Files
LEDMatrix/src/stock_news_manager.py

420 lines
18 KiB
Python

import time
import logging
import requests
import json
import random
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os
import urllib.parse
import re
from src.config_manager import ConfigManager
from PIL import Image, ImageDraw
from .cache_manager import CacheManager
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StockNewsManager:
def __init__(self, config: Dict[str, Any], display_manager):
self.config = config
self.config_manager = ConfigManager()
self.display_manager = display_manager
self.stocks_config = config.get('stocks', {})
self.stock_news_config = config.get('stock_news', {})
self.last_update = 0
self.news_data = {}
self.current_news_group = 0 # Track which group of headlines we're showing
self.scroll_position = 0
self.cached_text_image = None # Cache for the text image
self.cached_text = None # Cache for the text string
self.cache_manager = CacheManager()
# Get scroll settings from config with faster defaults
self.scroll_speed = self.stock_news_config.get('scroll_speed', 1)
self.scroll_delay = self.stock_news_config.get('scroll_delay', 0.001) # Default to 1ms instead of 50ms
# Get headline settings from config
self.max_headlines_per_symbol = self.stock_news_config.get('max_headlines_per_symbol', 1)
self.headlines_per_rotation = self.stock_news_config.get('headlines_per_rotation', 2)
# Log the actual values being used
logger.info(f"Scroll settings - Speed: {self.scroll_speed} pixels/frame, Delay: {self.scroll_delay*1000:.2f}ms")
logger.info(f"Headline settings - Max per symbol: {self.max_headlines_per_symbol}, Per rotation: {self.headlines_per_rotation}")
# Initialize frame rate tracking
self.frame_count = 0
self.last_frame_time = time.time()
self.last_fps_log_time = time.time()
self.frame_times = [] # Keep track of recent frame times for average FPS
# Background image generation
self.background_image = None # The image being generated in background
self.is_generating_image = False # Flag to track if we're currently generating
self.last_generation_start = 0 # When we started generating
self.generation_timeout = 5 # Max seconds to spend generating
# Rotation tracking
self.all_news_items = [] # Store all available news items
self.current_rotation_index = 0 # Track which rotation we're on
self.rotation_complete = False # Flag to indicate when a full rotation is complete
self.headers = {
'User-Agent': 'LEDMatrix/1.0 (https://github.com/yourusername/LEDMatrix; contact@example.com)',
'Accept': 'application/json',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
# Set up session with retry logic
self.session = requests.Session()
retry_strategy = Retry(
total=5, # increased number of retries
backoff_factor=1, # increased backoff factor
status_forcelist=[429, 500, 502, 503, 504], # added 429 to retry list
allowed_methods=["GET", "HEAD", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
# Initialize with first update
self.update_news_data()
def _fetch_news(self, symbol: str) -> List[Dict[str, Any]]:
"""Fetch news data for a stock from Yahoo Finance."""
try:
# Use Yahoo Finance query1 API for news data
url = f"https://query1.finance.yahoo.com/v1/finance/search"
params = {
'q': symbol,
'lang': 'en-US',
'region': 'US',
'quotesCount': 0,
'newsCount': 10,
'enableFuzzyQuery': False,
'quotesQueryId': 'tss_match_phrase_query',
'multiQuoteQueryId': 'multi_quote_single_token_query',
'newsQueryId': 'news_cie_vespa',
'enableCb': True,
}
# Use session with retry logic
response = self.session.get(
url,
headers=self.headers,
params=params,
timeout=10, # Increased timeout
verify=True # Enable SSL verification
)
if response.status_code != 200:
logger.error(f"Failed to fetch news for {symbol}: HTTP {response.status_code}")
return []
data = response.json()
news_items = data.get('news', [])
processed_news = []
for item in news_items:
try:
processed_news.append({
'title': item.get('title', ''),
'link': item.get('link', ''),
'publisher': item.get('publisher', ''),
'published': datetime.fromtimestamp(item.get('providerPublishTime', 0)),
'summary': item.get('summary', '')
})
except (ValueError, TypeError) as e:
logger.error(f"Error processing news item for {symbol}: {e}")
continue
logger.debug(f"Fetched {len(processed_news)} news items for {symbol}")
return processed_news
except requests.exceptions.SSLError as e:
logger.error(f"SSL error fetching news for {symbol}: {e}")
return []
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching news for {symbol}: {e}")
return []
except (ValueError, KeyError) as e:
logger.error(f"Error parsing news data for {symbol}: {e}")
return []
except Exception as e:
logger.error(f"Unexpected error fetching news for {symbol}: {e}")
return []
def update_news_data(self):
"""Update news data for all configured stock symbols."""
current_time = time.time()
update_interval = self.stock_news_config.get('update_interval', 300)
# Check if we need to update based on time
if current_time - self.last_update > update_interval:
symbols = self.stocks_config.get('symbols', [])
if not symbols:
logger.warning("No stock symbols configured for news")
return
# Get cached data
cached_data = self.cache_manager.get_cached_data('stock_news')
# Update each symbol
new_data = {}
success = False
for symbol in symbols:
# Check if data has changed before fetching
if cached_data and symbol in cached_data:
current_state = cached_data[symbol]
if not self.cache_manager.has_data_changed('stock_news', current_state):
logger.info(f"News data hasn't changed for {symbol}, using existing data")
new_data[symbol] = current_state
success = True
continue
# Add a longer delay between requests to avoid rate limiting
time.sleep(random.uniform(1.0, 2.0)) # increased delay between requests
news_items = self._fetch_news(symbol)
if news_items:
new_data[symbol] = news_items
success = True
if success:
# Only update the displayed data when we have new data
self.news_data = new_data
self.last_update = current_time
logger.info(f"Updated news data for {len(new_data)} symbols")
else:
logger.error("Failed to fetch news for any configured stocks")
def _create_text_image(self, text: str, color: Tuple[int, int, int] = (255, 255, 255)) -> Image.Image:
"""Create an image containing the text for efficient scrolling."""
# Get text dimensions
bbox = self.display_manager.draw.textbbox((0, 0), text, font=self.display_manager.small_font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# Create a new image with the text
text_image = Image.new('RGB', (text_width, self.display_manager.matrix.height), (0, 0, 0))
text_draw = ImageDraw.Draw(text_image)
# Draw the text centered vertically
y = (self.display_manager.matrix.height - text_height) // 2
text_draw.text((0, y), text, font=self.display_manager.small_font, fill=color)
return text_image
def _log_frame_rate(self):
"""Log frame rate statistics."""
current_time = time.time()
# Calculate instantaneous frame time
frame_time = current_time - self.last_frame_time
self.frame_times.append(frame_time)
# Keep only last 100 frames for average
if len(self.frame_times) > 100:
self.frame_times.pop(0)
# Log FPS every second
if current_time - self.last_fps_log_time >= 1.0:
avg_frame_time = sum(self.frame_times) / len(self.frame_times)
avg_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0
instant_fps = 1.0 / frame_time if frame_time > 0 else 0
logger.info(f"Frame stats - Avg FPS: {avg_fps:.1f}, Current FPS: {instant_fps:.1f}, Frame time: {frame_time*1000:.2f}ms")
self.last_fps_log_time = current_time
self.frame_count = 0
self.last_frame_time = current_time
self.frame_count += 1
def _generate_background_image(self, all_news, width, height):
"""Generate the full image in the background without disrupting display."""
if self.is_generating_image:
# Check if we've been generating too long
if time.time() - self.last_generation_start > self.generation_timeout:
logger.warning("[StockNews] Background image generation timed out, resetting")
self.is_generating_image = False
self.background_image = None
return False
# Still generating, return False to indicate not ready
return False
# Start a new background generation
self.is_generating_image = True
self.last_generation_start = time.time()
try:
# Log the number of headlines being displayed
logger.info(f"[StockNews] Generating image for {len(all_news)} headlines")
# First, create all news images to calculate total width needed
news_images = []
total_width = 0
screen_width_gap = width # Use a full screen width as the gap
# Add initial gap
total_width += screen_width_gap
for news in all_news:
news_text = f"{news['symbol']}: {news['title']} "
news_image = self._create_text_image(news_text)
news_images.append(news_image)
# Add width of news image plus gap
total_width += news_image.width + screen_width_gap
# Create the full image with calculated width
full_image = Image.new('RGB', (total_width, height), (0, 0, 0))
draw = ImageDraw.Draw(full_image)
# Now paste all news images with proper spacing
current_x = screen_width_gap # Start after initial gap
for news_image in news_images:
# Paste this news image into the full image
full_image.paste(news_image, (current_x, 0))
# Move to next position: text width + screen width gap
current_x += news_image.width + screen_width_gap
# Store the generated image
self.background_image = full_image
self.is_generating_image = False
return True
except Exception as e:
logger.error(f"[StockNews] Error generating background image: {e}")
self.is_generating_image = False
return False
def display_news(self):
"""Display news headlines by scrolling them across the screen."""
if not self.stock_news_config.get('enabled', False):
return
# Start update in background if needed
if time.time() - self.last_update >= self.stock_news_config.get('update_interval', 300):
self.update_news_data()
if not self.news_data:
logger.warning("No news data available to display")
return
# Get all news items from all symbols, respecting max_headlines_per_symbol
if not self.all_news_items:
self.all_news_items = []
for symbol, news_items in self.news_data.items():
# Limit the number of headlines per symbol
limited_items = news_items[:self.max_headlines_per_symbol]
for item in limited_items:
self.all_news_items.append({
"symbol": symbol,
"title": item["title"],
"publisher": item["publisher"]
})
# Shuffle the news items for variety
random.shuffle(self.all_news_items)
logger.info(f"Prepared {len(self.all_news_items)} news items for rotation")
if not self.all_news_items:
return
# Get the current rotation of headlines
start_idx = self.current_rotation_index * self.headlines_per_rotation
end_idx = min(start_idx + self.headlines_per_rotation, len(self.all_news_items))
# If we've reached the end, shuffle and start over
if start_idx >= len(self.all_news_items):
self.current_rotation_index = 0
random.shuffle(self.all_news_items) # Reshuffle when we've shown all headlines
start_idx = 0
end_idx = min(self.headlines_per_rotation, len(self.all_news_items))
self.rotation_complete = True
logger.info("Completed a full rotation of news headlines, reshuffling for next round")
# Get the current batch of headlines
current_news = self.all_news_items[start_idx:end_idx]
# Define width and height here, so they are always available
width = self.display_manager.matrix.width
height = self.display_manager.matrix.height
# Check if we need to generate a new image
if self.cached_text_image is None or self.rotation_complete:
# Reset rotation complete flag
self.rotation_complete = False
# Try to generate the image in the background
if self._generate_background_image(current_news, width, height):
# If generation completed successfully, use the background image
self.cached_text_image = self.background_image
self.scroll_position = 0
self.background_image = None # Clear the background image
# Move to next rotation for next time
self.current_rotation_index += 1
else:
# If still generating or failed, show a simple message
self.display_manager.image.paste(Image.new('RGB', (width, height), (0, 0, 0)), (0, 0))
draw = ImageDraw.Draw(self.display_manager.image)
draw.text((width//4, height//2), "Loading news...", font=self.display_manager.small_font, fill=(255, 255, 255))
self.display_manager.update_display()
time.sleep(0.1) # Short delay to prevent CPU hogging
return True
# --- Scrolling logic remains the same ---
if self.cached_text_image is None:
logger.warning("[StockNews] Cached image is None, cannot scroll.")
return False
total_width = self.cached_text_image.width
# If total_width is somehow less than screen width, don't scroll
if total_width <= width:
self.display_manager.image.paste(self.cached_text_image, (0, 0))
self.display_manager.update_display()
time.sleep(self.stock_news_config.get('item_display_duration', 5)) # Hold static image
self.cached_text_image = None # Force recreation next cycle
return True
# Update scroll position
self.scroll_position += self.scroll_speed
if self.scroll_position >= total_width:
self.scroll_position = 0 # Wrap around
# When we wrap around, move to next rotation
self.cached_text_image = None
return True
# Calculate the visible portion
# Handle wrap-around drawing
visible_end = self.scroll_position + width
if visible_end <= total_width:
# Normal case: Paste single crop
visible_portion = self.cached_text_image.crop((
self.scroll_position, 0,
visible_end, height
))
self.display_manager.image.paste(visible_portion, (0, 0))
else:
# Wrap-around case: Paste two parts
width1 = total_width - self.scroll_position
width2 = width - width1
portion1 = self.cached_text_image.crop((self.scroll_position, 0, total_width, height))
portion2 = self.cached_text_image.crop((0, 0, width2, height))
self.display_manager.image.paste(portion1, (0, 0))
self.display_manager.image.paste(portion2, (width1, 0))
self.display_manager.update_display()
self._log_frame_rate()
time.sleep(self.scroll_delay)
return True