Files
LEDMatrix/src/stock_news_manager.py
2025-09-12 17:15:45 -04:00

515 lines
23 KiB
Python

import time
import logging
import requests
import json
import random
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os
import urllib.parse
import re
from src.config_manager import ConfigManager
from PIL import Image, ImageDraw
from .cache_manager import CacheManager
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Import the API counter function from web interface
try:
from web_interface_v2 import increment_api_counter
except ImportError:
# Fallback if web interface is not available
def increment_api_counter(kind: str, count: int = 1):
pass
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StockNewsManager:
def __init__(self, config: Dict[str, Any], display_manager):
self.config = config
# Store reference to config instead of creating new ConfigManager
self.config_manager = None # Not used in this class
self.display_manager = display_manager
self.stocks_config = config.get('stocks', {})
self.stock_news_config = config.get('stock_news', {})
self.last_update = 0
self.news_data = {}
self.current_news_group = 0 # Track which group of headlines we're showing
self.scroll_position = 0
self.cached_text_image = None # Cache for the text image
self.cached_text = None # Cache for the text string
self.cache_manager = CacheManager()
# Get scroll settings from config with faster defaults
self.scroll_speed = self.stock_news_config.get('scroll_speed', 1)
self.scroll_delay = self.stock_news_config.get('scroll_delay', 0.005) # Default to 5ms for smoother scrolling
# Get headline settings from config
self.max_headlines_per_symbol = self.stock_news_config.get('max_headlines_per_symbol', 1)
self.headlines_per_rotation = self.stock_news_config.get('headlines_per_rotation', 2)
# Dynamic duration settings
self.dynamic_duration_enabled = self.stock_news_config.get('dynamic_duration', True)
self.min_duration = self.stock_news_config.get('min_duration', 30)
self.max_duration = self.stock_news_config.get('max_duration', 300)
self.duration_buffer = self.stock_news_config.get('duration_buffer', 0.1)
self.dynamic_duration = 60 # Default duration in seconds
self.total_scroll_width = 0 # Track total width for dynamic duration calculation
# Log the actual values being used
logger.info(f"Scroll settings - Speed: {self.scroll_speed} pixels/frame, Delay: {self.scroll_delay*1000:.2f}ms")
logger.info(f"Headline settings - Max per symbol: {self.max_headlines_per_symbol}, Per rotation: {self.headlines_per_rotation}")
# Initialize frame rate tracking
self.frame_count = 0
self.last_frame_time = time.time()
self.last_fps_log_time = time.time()
self.frame_times = [] # Keep track of recent frame times for average FPS
# Background image generation
self.background_image = None # The image being generated in background
self.is_generating_image = False # Flag to track if we're currently generating
self.last_generation_start = 0 # When we started generating
self.generation_timeout = 5 # Max seconds to spend generating
# Rotation tracking
self.all_news_items = [] # Store all available news items
self.current_rotation_index = 0 # Track which rotation we're on
self.rotation_complete = False # Flag to indicate when a full rotation is complete
self.headers = {
'User-Agent': 'LEDMatrix/1.0 (https://github.com/yourusername/LEDMatrix; contact@example.com)',
'Accept': 'application/json',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
# Set up session with retry logic
self.session = requests.Session()
retry_strategy = Retry(
total=5, # increased number of retries
backoff_factor=1, # increased backoff factor
status_forcelist=[429, 500, 502, 503, 504], # added 429 to retry list
allowed_methods=["GET", "HEAD", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
# Initialize with first update
self.update_news_data()
def _fetch_news(self, symbol: str) -> List[Dict[str, Any]]:
"""Fetch news data for a stock from Yahoo Finance."""
try:
# Use Yahoo Finance query1 API for news data
url = f"https://query1.finance.yahoo.com/v1/finance/search"
params = {
'q': symbol,
'lang': 'en-US',
'region': 'US',
'quotesCount': 0,
'newsCount': 10,
'enableFuzzyQuery': False,
'quotesQueryId': 'tss_match_phrase_query',
'multiQuoteQueryId': 'multi_quote_single_token_query',
'newsQueryId': 'news_cie_vespa',
'enableCb': True,
}
# Use session with retry logic
response = self.session.get(
url,
headers=self.headers,
params=params,
timeout=10, # Increased timeout
verify=True # Enable SSL verification
)
if response.status_code != 200:
logger.error(f"Failed to fetch news for {symbol}: HTTP {response.status_code}")
return []
data = response.json()
# Increment API counter for news data call
increment_api_counter('news', 1)
news_items = data.get('news', [])
processed_news = []
for item in news_items:
try:
processed_news.append({
'title': item.get('title', ''),
'link': item.get('link', ''),
'publisher': item.get('publisher', ''),
'published': datetime.fromtimestamp(item.get('providerPublishTime', 0)),
'summary': item.get('summary', '')
})
except (ValueError, TypeError) as e:
logger.error(f"Error processing news item for {symbol}: {e}")
continue
logger.debug(f"Fetched {len(processed_news)} news items for {symbol}")
return processed_news
except requests.exceptions.SSLError as e:
logger.error(f"SSL error fetching news for {symbol}: {e}")
return []
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching news for {symbol}: {e}")
return []
except (ValueError, KeyError) as e:
logger.error(f"Error parsing news data for {symbol}: {e}")
return []
except Exception as e:
logger.error(f"Unexpected error fetching news for {symbol}: {e}")
return []
def update_news_data(self):
"""Update news data from API."""
current_time = time.time()
update_interval = self.stock_news_config.get('update_interval', 3600)
# Check if we're currently scrolling and defer the update if so
if self.display_manager.is_currently_scrolling():
logger.debug("Stock news display is currently scrolling, deferring update")
self.display_manager.defer_update(self._perform_news_update, priority=2)
return
self._perform_news_update()
def _perform_news_update(self):
"""Internal method to perform the actual news update."""
current_time = time.time()
update_interval = self.stock_news_config.get('update_interval', 3600)
if current_time - self.last_update < update_interval:
return
try:
logger.debug("Updating stock news data")
symbols = self.stocks_config.get('symbols', [])
if not symbols:
logger.warning("No stock symbols configured for news")
return
# Fetch news for each symbol
for symbol in symbols:
try:
news = self._fetch_news_for_symbol(symbol)
if news:
self.news_data[symbol] = news
logger.debug(f"Updated news for {symbol}: {len(news)} headlines")
except Exception as e:
logger.error(f"Error fetching news for {symbol}: {e}")
self.last_update = current_time
# Clear cached text to force regeneration
self.cached_text = None
self.cached_text_image = None
except Exception as e:
logger.error(f"Error updating stock news data: {e}")
def _create_text_image(self, text: str, color: Tuple[int, int, int] = (255, 255, 255)) -> Image.Image:
"""Create an image containing the text for efficient scrolling."""
# Get text dimensions
bbox = self.display_manager.draw.textbbox((0, 0), text, font=self.display_manager.small_font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# Create a new image with the text
text_image = Image.new('RGB', (text_width, self.display_manager.matrix.height), (0, 0, 0))
text_draw = ImageDraw.Draw(text_image)
# Draw the text centered vertically
y = (self.display_manager.matrix.height - text_height) // 2
text_draw.text((0, y), text, font=self.display_manager.small_font, fill=color)
return text_image
def _log_frame_rate(self):
"""Log frame rate statistics."""
current_time = time.time()
# Calculate instantaneous frame time
frame_time = current_time - self.last_frame_time
self.frame_times.append(frame_time)
# Keep only last 100 frames for average
if len(self.frame_times) > 100:
self.frame_times.pop(0)
# Log FPS every second
if current_time - self.last_fps_log_time >= 1.0:
avg_frame_time = sum(self.frame_times) / len(self.frame_times)
avg_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0
instant_fps = 1.0 / frame_time if frame_time > 0 else 0
logger.info(f"Frame stats - Avg FPS: {avg_fps:.1f}, Current FPS: {instant_fps:.1f}, Frame time: {frame_time*1000:.2f}ms")
self.last_fps_log_time = current_time
self.frame_count = 0
self.last_frame_time = current_time
self.frame_count += 1
def _generate_background_image(self, all_news, width, height):
"""Generate the full image in the background without disrupting display."""
if self.is_generating_image:
# Check if we've been generating too long
if time.time() - self.last_generation_start > self.generation_timeout:
logger.warning("[StockNews] Background image generation timed out, resetting")
self.is_generating_image = False
self.background_image = None
return False
# Still generating, return False to indicate not ready
return False
# Start a new background generation
self.is_generating_image = True
self.last_generation_start = time.time()
try:
# Log the number of headlines being displayed
logger.info(f"[StockNews] Generating image for {len(all_news)} headlines")
# First, create all news images to calculate total width needed
news_images = []
total_width = 0
screen_width_gap = width # Use a full screen width as the gap
# Add initial gap
total_width += screen_width_gap
for news in all_news:
news_text = f"{news['symbol']}: {news['title']} "
news_image = self._create_text_image(news_text)
news_images.append(news_image)
# Add width of news image plus gap
total_width += news_image.width + screen_width_gap
# Create the full image with calculated width
full_image = Image.new('RGB', (total_width, height), (0, 0, 0))
draw = ImageDraw.Draw(full_image)
# Now paste all news images with proper spacing
current_x = screen_width_gap # Start after initial gap
for news_image in news_images:
# Paste this news image into the full image
full_image.paste(news_image, (current_x, 0))
# Move to next position: text width + screen width gap
current_x += news_image.width + screen_width_gap
# Store the generated image
self.background_image = full_image
self.is_generating_image = False
return True
except Exception as e:
logger.error(f"[StockNews] Error generating background image: {e}")
self.is_generating_image = False
return False
def display_news(self):
"""Display news headlines by scrolling them across the screen."""
if not self.stock_news_config.get('enabled', False):
return
# Start update in background if needed
if time.time() - self.last_update >= self.stock_news_config.get('update_interval', 300):
self.update_news_data()
if not self.news_data:
logger.warning("No news data available to display")
return
# Get all news items from all symbols, respecting max_headlines_per_symbol
if not self.all_news_items:
self.all_news_items = []
for symbol, news_items in self.news_data.items():
# Limit the number of headlines per symbol
limited_items = news_items[:self.max_headlines_per_symbol]
for item in limited_items:
self.all_news_items.append({
"symbol": symbol,
"title": item["title"],
"publisher": item["publisher"]
})
# Shuffle the news items for variety
random.shuffle(self.all_news_items)
logger.info(f"Prepared {len(self.all_news_items)} news items for rotation")
if not self.all_news_items:
return
# Get the current rotation of headlines
start_idx = self.current_rotation_index * self.headlines_per_rotation
end_idx = min(start_idx + self.headlines_per_rotation, len(self.all_news_items))
# If we've reached the end, shuffle and start over
if start_idx >= len(self.all_news_items):
self.current_rotation_index = 0
random.shuffle(self.all_news_items) # Reshuffle when we've shown all headlines
start_idx = 0
end_idx = min(self.headlines_per_rotation, len(self.all_news_items))
self.rotation_complete = True
logger.info("Completed a full rotation of news headlines, reshuffling for next round")
# Get the current batch of headlines
current_news = self.all_news_items[start_idx:end_idx]
# Define width and height here, so they are always available
width = self.display_manager.matrix.width
height = self.display_manager.matrix.height
# Check if we need to generate a new image
if self.cached_text_image is None or self.rotation_complete:
# Reset rotation complete flag
self.rotation_complete = False
# Try to generate the image in the background
if self._generate_background_image(current_news, width, height):
# If generation completed successfully, use the background image
self.cached_text_image = self.background_image
self.scroll_position = 0
self.background_image = None # Clear the background image
# Calculate total scroll width for dynamic duration
self.total_scroll_width = self.cached_text_image.width
self.calculate_dynamic_duration()
# Move to next rotation for next time
self.current_rotation_index += 1
else:
# If still generating or failed, show a simple message
self.display_manager.image.paste(Image.new('RGB', (width, height), (0, 0, 0)), (0, 0))
draw = ImageDraw.Draw(self.display_manager.image)
draw.text((width//4, height//2), "Loading news...", font=self.display_manager.small_font, fill=(255, 255, 255))
self.display_manager.update_display()
# Removed sleep delay to improve scrolling performance
return True
# --- Scrolling logic remains the same ---
if self.cached_text_image is None:
logger.warning("[StockNews] Cached image is None, cannot scroll.")
return False
total_width = self.cached_text_image.width
# If total_width is somehow less than screen width, don't scroll
if total_width <= width:
# Signal that we're not scrolling for this frame
self.display_manager.set_scrolling_state(False)
# Process any deferred updates
self.display_manager.process_deferred_updates()
self.display_manager.image.paste(self.cached_text_image, (0, 0))
self.display_manager.update_display()
time.sleep(self.stock_news_config.get('item_display_duration', 5)) # Hold static image
self.cached_text_image = None # Force recreation next cycle
return True
# Signal that we're scrolling
self.display_manager.set_scrolling_state(True)
# Process any deferred updates (though news is usually always scrolling)
self.display_manager.process_deferred_updates()
# Update scroll position
self.scroll_position += self.scroll_speed
if self.scroll_position >= total_width:
self.scroll_position = 0 # Wrap around
# When we wrap around, move to next rotation
self.cached_text_image = None
return True
# Calculate the visible portion
# Handle wrap-around drawing
visible_end = self.scroll_position + width
if visible_end <= total_width:
# Normal case: Paste single crop
visible_portion = self.cached_text_image.crop((
self.scroll_position, 0,
visible_end, height
))
self.display_manager.image.paste(visible_portion, (0, 0))
else:
# Wrap-around case: Paste two parts
width1 = total_width - self.scroll_position
width2 = width - width1
portion1 = self.cached_text_image.crop((self.scroll_position, 0, total_width, height))
portion2 = self.cached_text_image.crop((0, 0, width2, height))
self.display_manager.image.paste(portion1, (0, 0))
self.display_manager.image.paste(portion2, (width1, 0))
self.display_manager.update_display()
self._log_frame_rate()
time.sleep(self.scroll_delay)
return True
def calculate_dynamic_duration(self):
"""Calculate the exact time needed to display all news headlines"""
# If dynamic duration is disabled, use fixed duration from config
if not self.dynamic_duration_enabled:
self.dynamic_duration = self.stock_news_config.get('fixed_duration', 60)
logger.debug(f"Dynamic duration disabled, using fixed duration: {self.dynamic_duration}s")
return
if not self.total_scroll_width:
self.dynamic_duration = self.min_duration # Use configured minimum
return
try:
# Get display width (assume full width of display)
display_width = getattr(self.display_manager, 'width', 128) # Default to 128 if not available
# Calculate total scroll distance needed
# Text needs to scroll from right edge to completely off left edge
total_scroll_distance = display_width + self.total_scroll_width
# Calculate time based on scroll speed and delay
# scroll_speed = pixels per frame, scroll_delay = seconds per frame
frames_needed = total_scroll_distance / self.scroll_speed
total_time = frames_needed * self.scroll_delay
# Add buffer time for smooth cycling (configurable %)
buffer_time = total_time * self.duration_buffer
calculated_duration = int(total_time + buffer_time)
# Apply configured min/max limits
if calculated_duration < self.min_duration:
self.dynamic_duration = self.min_duration
logger.debug(f"Duration capped to minimum: {self.min_duration}s")
elif calculated_duration > self.max_duration:
self.dynamic_duration = self.max_duration
logger.debug(f"Duration capped to maximum: {self.max_duration}s")
else:
self.dynamic_duration = calculated_duration
logger.debug(f"Stock news dynamic duration calculation:")
logger.debug(f" Display width: {display_width}px")
logger.debug(f" Text width: {self.total_scroll_width}px")
logger.debug(f" Total scroll distance: {total_scroll_distance}px")
logger.debug(f" Frames needed: {frames_needed:.1f}")
logger.debug(f" Base time: {total_time:.2f}s")
logger.debug(f" Buffer time: {buffer_time:.2f}s ({self.duration_buffer*100}%)")
logger.debug(f" Calculated duration: {calculated_duration}s")
logger.debug(f" Final duration: {self.dynamic_duration}s")
except Exception as e:
logger.error(f"Error calculating dynamic duration: {e}")
self.dynamic_duration = self.min_duration # Use configured minimum as fallback
def get_dynamic_duration(self) -> int:
"""Get the calculated dynamic duration for display"""
return self.dynamic_duration