Files
LEDMatrix/src/stock_news_manager.py

243 lines
9.9 KiB
Python

import time
import logging
import requests
import json
import random
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os
import urllib.parse
import re
from src.config_manager import ConfigManager
from PIL import Image, ImageDraw
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StockNewsManager:
def __init__(self, config: Dict[str, Any], display_manager):
self.config = config
self.config_manager = ConfigManager()
self.display_manager = display_manager
self.stocks_config = config.get('stocks', {})
self.stock_news_config = config.get('stock_news', {})
self.last_update = 0
self.news_data = {}
self.current_news_group = 0 # Track which group of headlines we're showing
self.scroll_position = 0
# Get scroll settings from config with faster defaults
self.scroll_speed = self.stock_news_config.get('scroll_speed', 1)
self.scroll_delay = self.stock_news_config.get('scroll_delay', 0.001) # Default to 1ms instead of 50ms
# Log the actual values being used
logger.info(f"Scroll settings - Speed: {self.scroll_speed} pixels/frame, Delay: {self.scroll_delay*1000:.2f}ms")
# Initialize frame rate tracking
self.frame_count = 0
self.last_frame_time = time.time()
self.last_fps_log_time = time.time()
self.frame_times = [] # Keep track of recent frame times for average FPS
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Initialize with first update
self.update_news_data()
def _fetch_news_for_symbol(self, symbol: str) -> List[Dict[str, Any]]:
"""Fetch news headlines for a stock symbol."""
try:
# Using Yahoo Finance API to get news
encoded_symbol = urllib.parse.quote(symbol)
url = f"https://query1.finance.yahoo.com/v1/finance/search?q={encoded_symbol}&lang=en-US&region=US&quotesCount=0&newsCount={self.stock_news_config.get('max_headlines_per_symbol', 5)}"
response = requests.get(url, headers=self.headers, timeout=5)
if response.status_code != 200:
logger.error(f"Failed to fetch news for {symbol}: HTTP {response.status_code}")
return []
data = response.json()
news_items = data.get('news', [])
# Process and format news items
formatted_news = []
for item in news_items:
formatted_news.append({
"title": item.get('title', ''),
"publisher": item.get('publisher', ''),
"link": item.get('link', ''),
"published": item.get('providerPublishTime', 0)
})
logger.info(f"Fetched {len(formatted_news)} news items for {symbol}")
return formatted_news
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching news for {symbol}: {e}")
return []
except (ValueError, IndexError, KeyError) as e:
logger.error(f"Error parsing news data for {symbol}: {e}")
return []
except Exception as e:
logger.error(f"Unexpected error fetching news for {symbol}: {e}")
return []
def update_news_data(self):
"""Update news data for all configured stock symbols."""
current_time = time.time()
update_interval = self.stock_news_config.get('update_interval', 300) # Default to 5 minutes
# If not enough time has passed, keep using existing data
if current_time - self.last_update < update_interval:
return
# Get symbols from config
symbols = self.stocks_config.get('symbols', [])
if not symbols:
logger.warning("No stock symbols configured for news")
return
# Create temporary storage for new data
new_data = {}
success = False
for symbol in symbols:
# Add a small delay between requests to avoid rate limiting
time.sleep(random.uniform(0.1, 0.3))
news_items = self._fetch_news_for_symbol(symbol)
if news_items:
new_data[symbol] = news_items
success = True
if success:
# Only update the displayed data when we have new data
self.news_data = new_data
self.last_update = current_time
logger.info(f"Updated news data for {len(new_data)} symbols")
else:
logger.error("Failed to fetch news for any configured stocks")
def _create_text_image(self, text: str, color: Tuple[int, int, int] = (255, 255, 255)) -> Image.Image:
"""Create an image containing the text for efficient scrolling."""
# Get text dimensions
bbox = self.display_manager.draw.textbbox((0, 0), text, font=self.display_manager.small_font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# Create a new image with the text
text_image = Image.new('RGB', (text_width, self.display_manager.matrix.height), (0, 0, 0))
text_draw = ImageDraw.Draw(text_image)
# Draw the text centered vertically
y = (self.display_manager.matrix.height - text_height) // 2
text_draw.text((0, y), text, font=self.display_manager.small_font, fill=color)
return text_image
def _log_frame_rate(self):
"""Log frame rate statistics."""
current_time = time.time()
# Calculate instantaneous frame time
frame_time = current_time - self.last_frame_time
self.frame_times.append(frame_time)
# Keep only last 100 frames for average
if len(self.frame_times) > 100:
self.frame_times.pop(0)
# Log FPS every second
if current_time - self.last_fps_log_time >= 1.0:
avg_frame_time = sum(self.frame_times) / len(self.frame_times)
avg_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0
instant_fps = 1.0 / frame_time if frame_time > 0 else 0
logger.info(f"Frame stats - Avg FPS: {avg_fps:.1f}, Current FPS: {instant_fps:.1f}, Frame time: {frame_time*1000:.2f}ms")
self.last_fps_log_time = current_time
self.frame_count = 0
self.last_frame_time = current_time
self.frame_count += 1
def display_news(self):
"""Display news headlines by scrolling them across the screen."""
if not self.stock_news_config.get('enabled', False):
return
# Start update in background if needed
if time.time() - self.last_update >= self.stock_news_config.get('update_interval', 300):
self.update_news_data()
if not self.news_data:
logger.warning("No news data available to display")
return
# Get all news items from all symbols
all_news = []
for symbol, news_items in self.news_data.items():
for item in news_items:
all_news.append({
"symbol": symbol,
"title": item["title"],
"publisher": item["publisher"]
})
if not all_news:
return
# Get the number of headlines to show per rotation
headlines_per_rotation = self.stock_news_config.get('headlines_per_rotation', 2)
total_headlines = len(all_news)
# Calculate the starting index for the current group
start_idx = (self.current_news_group * headlines_per_rotation) % total_headlines
# Build the text for all headlines in this group
news_texts = []
for i in range(headlines_per_rotation):
idx = (start_idx + i) % total_headlines
news = all_news[idx]
news_texts.append(f"{news['symbol']}: {news['title']}")
# Join all headlines with a separator
separator = " - " # Visual separator between news items
news_text = separator.join(news_texts)
# Create a text image for efficient scrolling
text_image = self._create_text_image(news_text)
text_width = text_image.width
text_height = text_image.height
# Calculate display position
display_width = self.display_manager.matrix.width
total_width = text_width + display_width
# Update scroll position
self.scroll_position = (self.scroll_position + self.scroll_speed) % total_width
# Clear the display
self.display_manager.clear()
# Calculate the visible portion of the text
visible_width = min(display_width, text_width - self.scroll_position)
if visible_width > 0:
# Crop the text image to show only the visible portion
visible_portion = text_image.crop((self.scroll_position, 0,
self.scroll_position + visible_width, text_height))
# Paste the visible portion onto the display
self.display_manager.image.paste(visible_portion, (0, 0))
# Update the display
self.display_manager.update_display()
# If we've completed a full scroll, move to the next group
if self.scroll_position == 0:
self.current_news_group = (self.current_news_group + 1) % ((total_headlines + headlines_per_rotation - 1) // headlines_per_rotation)
# Log frame rate
self._log_frame_rate()
return True