mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-04-12 05:42:59 +00:00
added handling of stock images to reduce number of downloads
This commit is contained in:
@@ -40,33 +40,29 @@ class StockManager:
|
|||||||
|
|
||||||
# Try to use the assets/stocks directory from the repository
|
# Try to use the assets/stocks directory from the repository
|
||||||
self.logo_dir = os.path.join('assets', 'stocks')
|
self.logo_dir = os.path.join('assets', 'stocks')
|
||||||
self.use_temp_dir = False
|
|
||||||
|
|
||||||
# Check if we can write to the logo directory
|
# Check if we can use the logo directory, otherwise use temporary
|
||||||
try:
|
try:
|
||||||
if not os.path.exists(self.logo_dir):
|
if not os.path.exists(self.logo_dir):
|
||||||
try:
|
try:
|
||||||
os.makedirs(self.logo_dir, mode=0o755, exist_ok=True)
|
os.makedirs(self.logo_dir, mode=0o755, exist_ok=True)
|
||||||
logger.info(f"Created logo directory: {self.logo_dir}")
|
logger.info(f"Created logo directory: {self.logo_dir}")
|
||||||
except (PermissionError, OSError) as e:
|
except (PermissionError, OSError) as e:
|
||||||
logger.warning(f"Cannot create logo directory: {str(e)}. Using temporary directory instead.")
|
logger.warning(f"Cannot create logo directory '{self.logo_dir}': {str(e)}. Using temporary directory.")
|
||||||
self.use_temp_dir = True
|
import tempfile
|
||||||
|
self.logo_dir = tempfile.mkdtemp(prefix='stock_logos_')
|
||||||
elif not os.access(self.logo_dir, os.W_OK):
|
elif not os.access(self.logo_dir, os.W_OK):
|
||||||
logger.warning(f"Cannot write to logo directory: {self.logo_dir}. Using temporary directory instead.")
|
logger.warning(f"Cannot write to logo directory '{self.logo_dir}'. Using temporary directory.")
|
||||||
self.use_temp_dir = True
|
|
||||||
|
|
||||||
# If we need to use a temporary directory, create it
|
|
||||||
if self.use_temp_dir:
|
|
||||||
import tempfile
|
import tempfile
|
||||||
self.logo_dir = tempfile.mkdtemp(prefix='stock_logos_')
|
self.logo_dir = tempfile.mkdtemp(prefix='stock_logos_')
|
||||||
logger.info(f"Using temporary directory for logos: {self.logo_dir}")
|
|
||||||
|
logger.info(f"Using logo directory: {self.logo_dir}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error setting up logo directory: {str(e)}")
|
logger.error(f"Error setting up logo directory: {str(e)}")
|
||||||
# Fall back to using a temporary directory
|
# Fall back to using a temporary directory
|
||||||
import tempfile
|
import tempfile
|
||||||
self.logo_dir = tempfile.mkdtemp(prefix='stock_logos_')
|
self.logo_dir = tempfile.mkdtemp(prefix='stock_logos_')
|
||||||
self.use_temp_dir = True
|
|
||||||
logger.info(f"Using temporary directory for logos: {self.logo_dir}")
|
logger.info(f"Using temporary directory for logos: {self.logo_dir}")
|
||||||
|
|
||||||
self.headers = {
|
self.headers = {
|
||||||
@@ -209,6 +205,35 @@ class StockManager:
|
|||||||
logger.error(f"Unexpected error fetching data for {symbol}: {e}")
|
logger.error(f"Unexpected error fetching data for {symbol}: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _fetch_logo_data(self, symbol: str) -> bytes | None:
|
||||||
|
"""Fetch logo image data from various sources."""
|
||||||
|
urls_to_try = [
|
||||||
|
f"https://logo.clearbit.com/{symbol.lower()}.com",
|
||||||
|
f"https://storage.googleapis.com/iex/api/logos/{symbol}.png"
|
||||||
|
# Add more URLs here if needed
|
||||||
|
]
|
||||||
|
|
||||||
|
for url in urls_to_try:
|
||||||
|
try:
|
||||||
|
response = requests.get(url, headers=self.headers, timeout=5)
|
||||||
|
if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''):
|
||||||
|
try:
|
||||||
|
# Verify it's a valid image before returning bytes
|
||||||
|
from io import BytesIO
|
||||||
|
img = Image.open(BytesIO(response.content))
|
||||||
|
img.verify()
|
||||||
|
logger.info(f"Successfully fetched logo data for {symbol} from {url}")
|
||||||
|
return response.content
|
||||||
|
except Exception as img_err:
|
||||||
|
logger.warning(f"Invalid image data from {url} for {symbol}: {img_err}")
|
||||||
|
continue # Try next URL
|
||||||
|
except requests.exceptions.RequestException as req_err:
|
||||||
|
logger.warning(f"Error fetching logo from {url} for {symbol}: {req_err}")
|
||||||
|
continue # Try next URL
|
||||||
|
|
||||||
|
logger.warning(f"Could not fetch logo data for {symbol} from any source.")
|
||||||
|
return None
|
||||||
|
|
||||||
def _draw_chart(self, symbol: str, data: Dict[str, Any]):
|
def _draw_chart(self, symbol: str, data: Dict[str, Any]):
|
||||||
"""Draw a price chart for the stock."""
|
"""Draw a price chart for the stock."""
|
||||||
if not data.get('price_history'):
|
if not data.get('price_history'):
|
||||||
@@ -322,105 +347,46 @@ class StockManager:
|
|||||||
else:
|
else:
|
||||||
logger.error("Failed to fetch data for any configured stocks")
|
logger.error("Failed to fetch data for any configured stocks")
|
||||||
|
|
||||||
def _download_stock_logo(self, symbol: str) -> str:
|
def _download_stock_logo(self, symbol: str) -> str | None:
|
||||||
"""Download and save stock logo for a given symbol.
|
"""Attempt to download and save stock logo, returning the path if successful.
|
||||||
|
|
||||||
|
Checks if logo already exists. If not, fetches data using
|
||||||
|
_fetch_logo_data and attempts to save it to self.logo_dir.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
symbol: Stock symbol (e.g., 'AAPL', 'MSFT')
|
symbol: Stock symbol.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Path to the saved logo image, or None if download failed
|
Path to the saved logo image if exists or saved successfully,
|
||||||
|
otherwise None.
|
||||||
"""
|
"""
|
||||||
|
filename = f"{symbol.lower()}.png"
|
||||||
|
filepath = os.path.join(self.logo_dir, filename)
|
||||||
|
|
||||||
|
# Check if we already have the logo
|
||||||
|
if os.path.exists(filepath):
|
||||||
|
logger.debug(f"Found existing logo for {symbol} at {filepath}")
|
||||||
|
return filepath
|
||||||
|
|
||||||
|
# If not found, try fetching the logo data
|
||||||
|
logger.info(f"Logo for {symbol} not found locally, attempting download.")
|
||||||
|
logo_bytes = self._fetch_logo_data(symbol)
|
||||||
|
|
||||||
|
if not logo_bytes:
|
||||||
|
return None # Fetching failed
|
||||||
|
|
||||||
|
# Try to save the fetched data
|
||||||
try:
|
try:
|
||||||
# Create a filename based on the symbol
|
with open(filepath, 'wb') as f:
|
||||||
filename = f"{symbol.lower()}.png"
|
f.write(logo_bytes)
|
||||||
filepath = os.path.join(self.logo_dir, filename)
|
logger.info(f"Saved new logo for {symbol} to {filepath}")
|
||||||
|
return filepath # Return path on successful save
|
||||||
# Check if we already have the logo
|
except (PermissionError, OSError) as e:
|
||||||
if os.path.exists(filepath):
|
logger.warning(f"Failed to save logo for {symbol} to '{filepath}': {e}. Logo will be loaded in-memory.")
|
||||||
return filepath
|
return None # Return None indicates save failure
|
||||||
|
|
||||||
# Try to find logo from various sources
|
|
||||||
# 1. Yahoo Finance
|
|
||||||
yahoo_url = f"https://logo.clearbit.com/{symbol.lower()}.com"
|
|
||||||
|
|
||||||
# 2. Alternative source if Yahoo fails
|
|
||||||
alt_url = f"https://storage.googleapis.com/iex/api/logos/{symbol}.png"
|
|
||||||
|
|
||||||
# Try Yahoo first
|
|
||||||
response = requests.get(yahoo_url, headers=self.headers, timeout=5)
|
|
||||||
if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''):
|
|
||||||
try:
|
|
||||||
# Verify it's a valid image before saving
|
|
||||||
from io import BytesIO
|
|
||||||
img = Image.open(BytesIO(response.content))
|
|
||||||
img.verify() # Verify it's a valid image
|
|
||||||
|
|
||||||
# Try to save the file
|
|
||||||
try:
|
|
||||||
with open(filepath, 'wb') as f:
|
|
||||||
f.write(response.content)
|
|
||||||
logger.info(f"Downloaded logo for {symbol} from Yahoo")
|
|
||||||
return filepath
|
|
||||||
except PermissionError:
|
|
||||||
logger.warning(f"Permission denied when saving logo for {symbol}. Using in-memory logo instead.")
|
|
||||||
# Return a temporary path that won't be used for saving
|
|
||||||
return f"temp_{symbol.lower()}.png"
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Invalid image data from Yahoo for {symbol}: {str(e)}")
|
|
||||||
|
|
||||||
# Try alternative source
|
|
||||||
response = requests.get(alt_url, headers=self.headers, timeout=5)
|
|
||||||
if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''):
|
|
||||||
try:
|
|
||||||
# Verify it's a valid image before saving
|
|
||||||
from io import BytesIO
|
|
||||||
img = Image.open(BytesIO(response.content))
|
|
||||||
img.verify() # Verify it's a valid image
|
|
||||||
|
|
||||||
# Try to save the file
|
|
||||||
try:
|
|
||||||
with open(filepath, 'wb') as f:
|
|
||||||
f.write(response.content)
|
|
||||||
logger.info(f"Downloaded logo for {symbol} from alternative source")
|
|
||||||
return filepath
|
|
||||||
except PermissionError:
|
|
||||||
logger.warning(f"Permission denied when saving logo for {symbol}. Using in-memory logo instead.")
|
|
||||||
# Return a temporary path that won't be used for saving
|
|
||||||
return f"temp_{symbol.lower()}.png"
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Invalid image data from alternative source for {symbol}: {str(e)}")
|
|
||||||
|
|
||||||
# Try a third source - company.com domain
|
|
||||||
company_url = f"https://logo.clearbit.com/{symbol.lower()}.com"
|
|
||||||
if company_url != yahoo_url: # Avoid duplicate request
|
|
||||||
response = requests.get(company_url, headers=self.headers, timeout=5)
|
|
||||||
if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''):
|
|
||||||
try:
|
|
||||||
# Verify it's a valid image before saving
|
|
||||||
from io import BytesIO
|
|
||||||
img = Image.open(BytesIO(response.content))
|
|
||||||
img.verify() # Verify it's a valid image
|
|
||||||
|
|
||||||
# Try to save the file
|
|
||||||
try:
|
|
||||||
with open(filepath, 'wb') as f:
|
|
||||||
f.write(response.content)
|
|
||||||
logger.info(f"Downloaded logo for {symbol} from company domain")
|
|
||||||
return filepath
|
|
||||||
except PermissionError:
|
|
||||||
logger.warning(f"Permission denied when saving logo for {symbol}. Using in-memory logo instead.")
|
|
||||||
# Return a temporary path that won't be used for saving
|
|
||||||
return f"temp_{symbol.lower()}.png"
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Invalid image data from company domain for {symbol}: {str(e)}")
|
|
||||||
|
|
||||||
logger.warning(f"Could not download logo for {symbol}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error downloading logo for {symbol}: {str(e)}")
|
logger.error(f"Unexpected error saving logo for {symbol}: {e}")
|
||||||
return None
|
return None # Return None indicates save failure
|
||||||
|
|
||||||
def _get_stock_logo(self, symbol: str) -> Image.Image:
|
def _get_stock_logo(self, symbol: str) -> Image.Image:
|
||||||
"""Get stock logo image, or create a text-based fallback.
|
"""Get stock logo image, or create a text-based fallback.
|
||||||
@@ -431,75 +397,55 @@ class StockManager:
|
|||||||
Returns:
|
Returns:
|
||||||
PIL Image of the logo or text-based fallback
|
PIL Image of the logo or text-based fallback
|
||||||
"""
|
"""
|
||||||
# Try to download the logo if we don't have it
|
# Try to get the path to a saved logo (or save it)
|
||||||
logo_path = self._download_stock_logo(symbol)
|
logo_path = self._download_stock_logo(symbol)
|
||||||
|
|
||||||
|
# If we have a path, try to load from disk
|
||||||
if logo_path:
|
if logo_path:
|
||||||
try:
|
try:
|
||||||
# Check if this is a temporary path (in-memory logo)
|
logo = Image.open(logo_path)
|
||||||
if logo_path.startswith("temp_"):
|
# Verify it's a valid image
|
||||||
# For temporary paths, we need to download the logo again
|
logo.verify()
|
||||||
# since we couldn't save it to disk
|
# Reopen after verify
|
||||||
symbol_lower = symbol.lower()
|
logo = Image.open(logo_path)
|
||||||
yahoo_url = f"https://logo.clearbit.com/{symbol_lower}.com"
|
|
||||||
alt_url = f"https://storage.googleapis.com/iex/api/logos/{symbol}.png"
|
|
||||||
company_url = f"https://logo.clearbit.com/{symbol_lower}.com"
|
|
||||||
|
|
||||||
# Try all sources
|
# Convert to RGBA if not already
|
||||||
for url in [yahoo_url, alt_url, company_url]:
|
if logo.mode != 'RGBA':
|
||||||
try:
|
logo = logo.convert('RGBA')
|
||||||
response = requests.get(url, headers=self.headers, timeout=5)
|
|
||||||
if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''):
|
|
||||||
try:
|
|
||||||
# Create image from response content
|
|
||||||
from io import BytesIO
|
|
||||||
logo = Image.open(BytesIO(response.content))
|
|
||||||
# Verify it's a valid image
|
|
||||||
logo.verify()
|
|
||||||
# Reopen after verify
|
|
||||||
logo = Image.open(BytesIO(response.content))
|
|
||||||
|
|
||||||
# Convert to RGBA if not already
|
# Resize to fit in the display
|
||||||
if logo.mode != 'RGBA':
|
max_size = min(int(self.display_manager.matrix.width / 1.5),
|
||||||
logo = logo.convert('RGBA')
|
int(self.display_manager.matrix.height / 1.5))
|
||||||
|
logo = logo.resize((max_size, max_size), Image.LANCZOS)
|
||||||
|
|
||||||
# Resize to fit in the display (assuming square logo)
|
return logo
|
||||||
max_size = min(int(self.display_manager.matrix.width / 1.5),
|
|
||||||
int(self.display_manager.matrix.height / 1.5))
|
|
||||||
logo = logo.resize((max_size, max_size), Image.LANCZOS)
|
|
||||||
|
|
||||||
return logo
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Invalid image data from {url} for {symbol}: {str(e)}")
|
|
||||||
continue
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Error downloading from {url} for {symbol}: {str(e)}")
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
# Normal case: open the saved logo file
|
|
||||||
try:
|
|
||||||
logo = Image.open(logo_path)
|
|
||||||
# Verify it's a valid image
|
|
||||||
logo.verify()
|
|
||||||
# Reopen after verify
|
|
||||||
logo = Image.open(logo_path)
|
|
||||||
|
|
||||||
# Convert to RGBA if not already
|
|
||||||
if logo.mode != 'RGBA':
|
|
||||||
logo = logo.convert('RGBA')
|
|
||||||
|
|
||||||
# Resize to fit in the display (assuming square logo)
|
|
||||||
max_size = min(int(self.display_manager.matrix.width / 1.5),
|
|
||||||
int(self.display_manager.matrix.height / 1.5))
|
|
||||||
logo = logo.resize((max_size, max_size), Image.LANCZOS)
|
|
||||||
|
|
||||||
return logo
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Invalid image file for {symbol}: {str(e)}")
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing logo for {symbol}: {str(e)}")
|
logger.warning(f"Error loading saved logo file '{logo_path}' for {symbol}: {e}. Attempting in-memory load.")
|
||||||
|
# If loading from disk fails, proceed to in-memory load below
|
||||||
|
|
||||||
# Fallback to text-based logo
|
# If logo_path is None (save failed) or loading from disk failed, try loading into memory
|
||||||
|
logger.info(f"Attempting to load logo for {symbol} directly into memory.")
|
||||||
|
logo_bytes = self._fetch_logo_data(symbol)
|
||||||
|
if logo_bytes:
|
||||||
|
try:
|
||||||
|
from io import BytesIO
|
||||||
|
logo = Image.open(BytesIO(logo_bytes))
|
||||||
|
# Verify (redundant but safe)
|
||||||
|
logo.verify()
|
||||||
|
logo = Image.open(BytesIO(logo_bytes))
|
||||||
|
|
||||||
|
if logo.mode != 'RGBA':
|
||||||
|
logo = logo.convert('RGBA')
|
||||||
|
|
||||||
|
max_size = min(int(self.display_manager.matrix.width / 1.5),
|
||||||
|
int(self.display_manager.matrix.height / 1.5))
|
||||||
|
logo = logo.resize((max_size, max_size), Image.LANCZOS)
|
||||||
|
return logo
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error processing in-memory logo data for {symbol}: {e}")
|
||||||
|
|
||||||
|
# Fallback if all attempts fail
|
||||||
|
logger.warning(f"Failed to obtain logo for {symbol} from disk or download.")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _create_stock_display(self, symbol: str, price: float, change: float, change_percent: float) -> Image.Image:
|
def _create_stock_display(self, symbol: str, price: float, change: float, change_percent: float) -> Image.Image:
|
||||||
|
|||||||
Reference in New Issue
Block a user