From 44b3853230f8a1f6058b22d5b08b415e280cb420 Mon Sep 17 00:00:00 2001 From: ChuckBuilds <33324927+ChuckBuilds@users.noreply.github.com> Date: Thu, 17 Apr 2025 09:25:48 -0500 Subject: [PATCH] added handling of stock images to reduce number of downloads --- src/stock_manager.py | 278 +++++++++++++++++-------------------------- 1 file changed, 112 insertions(+), 166 deletions(-) diff --git a/src/stock_manager.py b/src/stock_manager.py index 38a94667..447ebbd9 100644 --- a/src/stock_manager.py +++ b/src/stock_manager.py @@ -40,33 +40,29 @@ class StockManager: # Try to use the assets/stocks directory from the repository self.logo_dir = os.path.join('assets', 'stocks') - self.use_temp_dir = False - # Check if we can write to the logo directory + # Check if we can use the logo directory, otherwise use temporary try: if not os.path.exists(self.logo_dir): try: os.makedirs(self.logo_dir, mode=0o755, exist_ok=True) logger.info(f"Created logo directory: {self.logo_dir}") except (PermissionError, OSError) as e: - logger.warning(f"Cannot create logo directory: {str(e)}. Using temporary directory instead.") - self.use_temp_dir = True + logger.warning(f"Cannot create logo directory '{self.logo_dir}': {str(e)}. Using temporary directory.") + import tempfile + self.logo_dir = tempfile.mkdtemp(prefix='stock_logos_') elif not os.access(self.logo_dir, os.W_OK): - logger.warning(f"Cannot write to logo directory: {self.logo_dir}. Using temporary directory instead.") - self.use_temp_dir = True - - # If we need to use a temporary directory, create it - if self.use_temp_dir: + logger.warning(f"Cannot write to logo directory '{self.logo_dir}'. Using temporary directory.") import tempfile self.logo_dir = tempfile.mkdtemp(prefix='stock_logos_') - logger.info(f"Using temporary directory for logos: {self.logo_dir}") + + logger.info(f"Using logo directory: {self.logo_dir}") except Exception as e: logger.error(f"Error setting up logo directory: {str(e)}") # Fall back to using a temporary directory import tempfile self.logo_dir = tempfile.mkdtemp(prefix='stock_logos_') - self.use_temp_dir = True logger.info(f"Using temporary directory for logos: {self.logo_dir}") self.headers = { @@ -209,6 +205,35 @@ class StockManager: logger.error(f"Unexpected error fetching data for {symbol}: {e}") return None + def _fetch_logo_data(self, symbol: str) -> bytes | None: + """Fetch logo image data from various sources.""" + urls_to_try = [ + f"https://logo.clearbit.com/{symbol.lower()}.com", + f"https://storage.googleapis.com/iex/api/logos/{symbol}.png" + # Add more URLs here if needed + ] + + for url in urls_to_try: + try: + response = requests.get(url, headers=self.headers, timeout=5) + if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''): + try: + # Verify it's a valid image before returning bytes + from io import BytesIO + img = Image.open(BytesIO(response.content)) + img.verify() + logger.info(f"Successfully fetched logo data for {symbol} from {url}") + return response.content + except Exception as img_err: + logger.warning(f"Invalid image data from {url} for {symbol}: {img_err}") + continue # Try next URL + except requests.exceptions.RequestException as req_err: + logger.warning(f"Error fetching logo from {url} for {symbol}: {req_err}") + continue # Try next URL + + logger.warning(f"Could not fetch logo data for {symbol} from any source.") + return None + def _draw_chart(self, symbol: str, data: Dict[str, Any]): """Draw a price chart for the stock.""" if not data.get('price_history'): @@ -322,105 +347,46 @@ class StockManager: else: logger.error("Failed to fetch data for any configured stocks") - def _download_stock_logo(self, symbol: str) -> str: - """Download and save stock logo for a given symbol. + def _download_stock_logo(self, symbol: str) -> str | None: + """Attempt to download and save stock logo, returning the path if successful. + + Checks if logo already exists. If not, fetches data using + _fetch_logo_data and attempts to save it to self.logo_dir. Args: - symbol: Stock symbol (e.g., 'AAPL', 'MSFT') + symbol: Stock symbol. Returns: - Path to the saved logo image, or None if download failed + Path to the saved logo image if exists or saved successfully, + otherwise None. """ + filename = f"{symbol.lower()}.png" + filepath = os.path.join(self.logo_dir, filename) + + # Check if we already have the logo + if os.path.exists(filepath): + logger.debug(f"Found existing logo for {symbol} at {filepath}") + return filepath + + # If not found, try fetching the logo data + logger.info(f"Logo for {symbol} not found locally, attempting download.") + logo_bytes = self._fetch_logo_data(symbol) + + if not logo_bytes: + return None # Fetching failed + + # Try to save the fetched data try: - # Create a filename based on the symbol - filename = f"{symbol.lower()}.png" - filepath = os.path.join(self.logo_dir, filename) - - # Check if we already have the logo - if os.path.exists(filepath): - return filepath - - # Try to find logo from various sources - # 1. Yahoo Finance - yahoo_url = f"https://logo.clearbit.com/{symbol.lower()}.com" - - # 2. Alternative source if Yahoo fails - alt_url = f"https://storage.googleapis.com/iex/api/logos/{symbol}.png" - - # Try Yahoo first - response = requests.get(yahoo_url, headers=self.headers, timeout=5) - if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''): - try: - # Verify it's a valid image before saving - from io import BytesIO - img = Image.open(BytesIO(response.content)) - img.verify() # Verify it's a valid image - - # Try to save the file - try: - with open(filepath, 'wb') as f: - f.write(response.content) - logger.info(f"Downloaded logo for {symbol} from Yahoo") - return filepath - except PermissionError: - logger.warning(f"Permission denied when saving logo for {symbol}. Using in-memory logo instead.") - # Return a temporary path that won't be used for saving - return f"temp_{symbol.lower()}.png" - except Exception as e: - logger.warning(f"Invalid image data from Yahoo for {symbol}: {str(e)}") - - # Try alternative source - response = requests.get(alt_url, headers=self.headers, timeout=5) - if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''): - try: - # Verify it's a valid image before saving - from io import BytesIO - img = Image.open(BytesIO(response.content)) - img.verify() # Verify it's a valid image - - # Try to save the file - try: - with open(filepath, 'wb') as f: - f.write(response.content) - logger.info(f"Downloaded logo for {symbol} from alternative source") - return filepath - except PermissionError: - logger.warning(f"Permission denied when saving logo for {symbol}. Using in-memory logo instead.") - # Return a temporary path that won't be used for saving - return f"temp_{symbol.lower()}.png" - except Exception as e: - logger.warning(f"Invalid image data from alternative source for {symbol}: {str(e)}") - - # Try a third source - company.com domain - company_url = f"https://logo.clearbit.com/{symbol.lower()}.com" - if company_url != yahoo_url: # Avoid duplicate request - response = requests.get(company_url, headers=self.headers, timeout=5) - if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''): - try: - # Verify it's a valid image before saving - from io import BytesIO - img = Image.open(BytesIO(response.content)) - img.verify() # Verify it's a valid image - - # Try to save the file - try: - with open(filepath, 'wb') as f: - f.write(response.content) - logger.info(f"Downloaded logo for {symbol} from company domain") - return filepath - except PermissionError: - logger.warning(f"Permission denied when saving logo for {symbol}. Using in-memory logo instead.") - # Return a temporary path that won't be used for saving - return f"temp_{symbol.lower()}.png" - except Exception as e: - logger.warning(f"Invalid image data from company domain for {symbol}: {str(e)}") - - logger.warning(f"Could not download logo for {symbol}") - return None - + with open(filepath, 'wb') as f: + f.write(logo_bytes) + logger.info(f"Saved new logo for {symbol} to {filepath}") + return filepath # Return path on successful save + except (PermissionError, OSError) as e: + logger.warning(f"Failed to save logo for {symbol} to '{filepath}': {e}. Logo will be loaded in-memory.") + return None # Return None indicates save failure except Exception as e: - logger.error(f"Error downloading logo for {symbol}: {str(e)}") - return None + logger.error(f"Unexpected error saving logo for {symbol}: {e}") + return None # Return None indicates save failure def _get_stock_logo(self, symbol: str) -> Image.Image: """Get stock logo image, or create a text-based fallback. @@ -431,75 +397,55 @@ class StockManager: Returns: PIL Image of the logo or text-based fallback """ - # Try to download the logo if we don't have it + # Try to get the path to a saved logo (or save it) logo_path = self._download_stock_logo(symbol) + # If we have a path, try to load from disk if logo_path: try: - # Check if this is a temporary path (in-memory logo) - if logo_path.startswith("temp_"): - # For temporary paths, we need to download the logo again - # since we couldn't save it to disk - symbol_lower = symbol.lower() - yahoo_url = f"https://logo.clearbit.com/{symbol_lower}.com" - alt_url = f"https://storage.googleapis.com/iex/api/logos/{symbol}.png" - company_url = f"https://logo.clearbit.com/{symbol_lower}.com" - - # Try all sources - for url in [yahoo_url, alt_url, company_url]: - try: - response = requests.get(url, headers=self.headers, timeout=5) - if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''): - try: - # Create image from response content - from io import BytesIO - logo = Image.open(BytesIO(response.content)) - # Verify it's a valid image - logo.verify() - # Reopen after verify - logo = Image.open(BytesIO(response.content)) - - # Convert to RGBA if not already - if logo.mode != 'RGBA': - logo = logo.convert('RGBA') - - # Resize to fit in the display (assuming square logo) - max_size = min(int(self.display_manager.matrix.width / 1.5), - int(self.display_manager.matrix.height / 1.5)) - logo = logo.resize((max_size, max_size), Image.LANCZOS) - - return logo - except Exception as e: - logger.warning(f"Invalid image data from {url} for {symbol}: {str(e)}") - continue - except Exception as e: - logger.warning(f"Error downloading from {url} for {symbol}: {str(e)}") - continue - else: - # Normal case: open the saved logo file - try: - logo = Image.open(logo_path) - # Verify it's a valid image - logo.verify() - # Reopen after verify - logo = Image.open(logo_path) - - # Convert to RGBA if not already - if logo.mode != 'RGBA': - logo = logo.convert('RGBA') - - # Resize to fit in the display (assuming square logo) - max_size = min(int(self.display_manager.matrix.width / 1.5), - int(self.display_manager.matrix.height / 1.5)) - logo = logo.resize((max_size, max_size), Image.LANCZOS) - - return logo - except Exception as e: - logger.warning(f"Invalid image file for {symbol}: {str(e)}") + logo = Image.open(logo_path) + # Verify it's a valid image + logo.verify() + # Reopen after verify + logo = Image.open(logo_path) + + # Convert to RGBA if not already + if logo.mode != 'RGBA': + logo = logo.convert('RGBA') + + # Resize to fit in the display + max_size = min(int(self.display_manager.matrix.width / 1.5), + int(self.display_manager.matrix.height / 1.5)) + logo = logo.resize((max_size, max_size), Image.LANCZOS) + + return logo except Exception as e: - logger.error(f"Error processing logo for {symbol}: {str(e)}") + logger.warning(f"Error loading saved logo file '{logo_path}' for {symbol}: {e}. Attempting in-memory load.") + # If loading from disk fails, proceed to in-memory load below - # Fallback to text-based logo + # If logo_path is None (save failed) or loading from disk failed, try loading into memory + logger.info(f"Attempting to load logo for {symbol} directly into memory.") + logo_bytes = self._fetch_logo_data(symbol) + if logo_bytes: + try: + from io import BytesIO + logo = Image.open(BytesIO(logo_bytes)) + # Verify (redundant but safe) + logo.verify() + logo = Image.open(BytesIO(logo_bytes)) + + if logo.mode != 'RGBA': + logo = logo.convert('RGBA') + + max_size = min(int(self.display_manager.matrix.width / 1.5), + int(self.display_manager.matrix.height / 1.5)) + logo = logo.resize((max_size, max_size), Image.LANCZOS) + return logo + except Exception as e: + logger.error(f"Error processing in-memory logo data for {symbol}: {e}") + + # Fallback if all attempts fail + logger.warning(f"Failed to obtain logo for {symbol} from disk or download.") return None def _create_stock_display(self, symbol: str, price: float, change: float, change_percent: float) -> Image.Image: