Files
LEDMatrix/src/stock_news_manager.py
Chuck 3d662baf54 Stocks (#2)
* Opening Bell

Introducing the Stock Ticker Feature

* Update stock_manager.py

Assume folder exists

* Update stock_manager.py

removing logos to focus on function for now

* Update stock_manager.py

parse yahoo scripts

* Update stock_manager.py

stock query update

* Update stock_manager.py

slow down stock display

* Update display_controller.py

adjust screen flow

* Update stock_manager.py

shipping features

* Update stock_manager.py

stock refresh in the background

* Customize Display timings

customize display timings

* Update stock_manager.py

stock font size change

* Sizing and Spacing

Changed font sizing on chart and clock spacing

* Update clock.py

Date format changes

* Update stock_manager.py

actually read stocks from config file

* Update stock_manager.py

add config manager

* readme update

readme update and formatting for better flow

* Update .gitignore

rename reference folder

* Update config.json

changed default stocks to test update implementation

* Stock News

Stock news Ticker

* Update config.json

increase scroll speed

* Scroll Performance

Tuning news scrolling performance

* updating scroll direction

orienting scroll direction

* News tuning

removed test files and increased scroll speed

* Create test_news_manager.py

need a test script to call upon

* Update test_news_manager.py

test script tuning

* troubleshooting test script

* Update test_news_manager.py

* Update config.json

scroll speed increases

* Update config.json

scroll tuning

* Update config.json

speeding up

* Update config.json

still making text faster

* Update config.json

Trying to tune scrolling

* Update config.json

testing crazy parameters

* Update test_news_manager.py

remove sleep delay

* scroll tuning

scroll tuning

* scroll logging and debugging

FPS counter and debug messages

* Update config.json

matrix speed tuning

* Update news_manager.py

News separator

* Update news_manager.py

separator character change

* Stock News manager Rename

rename stock news ticker to enable other news in the future

* Update display_controller.py

load config update

* Update stock_manager.py

remove redundant import

* Stock news settings

Stock news has more granular control

* Stock news joins the lineup

Stock News added to the display controller and drawing display instead of image

* Optimize scrolling text performance for news ticker

* Adjust matrix settings to reduce artifacting while maintaining performance

* changed float to integer

* Fix news ticker performance with simplified scrolling mechanism

* Fix stock news scrolling in test environment: - Optimize display manager settings for smooth scrolling - Add proper display initialization and cleanup in test script - Implement timing control to prevent display buffer overflow - Ensure consistent 1ms delay between updates for smooth scrolling

* Optimize stock news scrolling for better performance: - Use pre-rendered text image for efficient scrolling - Implement cropping and pasting for smoother animation - Remove unnecessary display operations and delays

* Optimize stock news display performance: - Cache text image to reduce rendering overhead - Improve frame creation and update logic - Optimize text wrapping for smoother scrolling - Remove unnecessary display clears

* Optimize stock news display in controller: - Remove global sleep delay - Allow news display to run at full speed - Keep slower update rates for other displays

---------

Signed-off-by: Chuck <33324927+ChuckBuilds@users.noreply.github.com>
2025-04-11 11:10:50 -05:00

268 lines
11 KiB
Python

import time
import logging
import requests
import json
import random
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os
import urllib.parse
import re
from src.config_manager import ConfigManager
from PIL import Image, ImageDraw
# Configure logging.
# NOTE(review): basicConfig runs at import time and targets the root logger;
# per the stdlib docs it does nothing if the root logger already has handlers,
# so an application that configures logging first keeps its own setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by StockNewsManager for all of its messages.
logger = logging.getLogger(__name__)
class StockNewsManager:
    """Fetch stock news headlines and scroll them across the display.

    Headlines are fetched per symbol from the Yahoo Finance search endpoint,
    grouped ``headlines_per_rotation`` at a time, rendered once into a cached
    image, and scrolled by cropping a window out of that image on every call
    to :meth:`display_news`.

    Settings are read from ``config['stock_news']`` (``enabled``,
    ``scroll_speed``, ``scroll_delay``, ``update_interval``,
    ``retry_interval``, ``max_headlines_per_symbol``,
    ``headlines_per_rotation``) and symbols from ``config['stocks']['symbols']``.

    The display manager must expose ``draw``, ``small_font``, ``matrix``
    (with ``width``/``height``), a writable ``image`` attribute and
    ``update_display()``; those are the only members this class touches.
    """

    # Seconds to wait before retrying after an update attempt that produced
    # no data. Without this, a failing fetch would be retried (and block) on
    # every rendered frame.
    DEFAULT_RETRY_INTERVAL = 30

    # Number of recent frame durations kept for the rolling FPS average.
    MAX_FRAME_SAMPLES = 100

    def __init__(self, config: Dict[str, Any], display_manager):
        """Store configuration, set up scroll/FPS state and fetch initial news.

        Args:
            config: Full application configuration dictionary.
            display_manager: Object that owns the matrix and drawing surface.
        """
        self.config = config
        self.config_manager = ConfigManager()
        self.display_manager = display_manager
        self.stocks_config = config.get('stocks', {})
        self.stock_news_config = config.get('stock_news', {})
        self.last_update = 0   # time of the last successful fetch
        self.last_attempt = 0  # time of the last fetch attempt (successful or not)
        self.news_data = {}
        self.current_news_group = 0  # Track which group of headlines we're showing
        self.scroll_position = 0
        self.cached_text_image = None  # Cache for the text image
        self.cached_text = None  # Cache for the text string

        # Get scroll settings from config with faster defaults
        self.scroll_speed = self.stock_news_config.get('scroll_speed', 1)
        self.scroll_delay = self.stock_news_config.get('scroll_delay', 0.001)  # Default to 1ms instead of 50ms

        # Log the actual values being used
        logger.info(f"Scroll settings - Speed: {self.scroll_speed} pixels/frame, Delay: {self.scroll_delay*1000:.2f}ms")

        # Initialize frame rate tracking
        self.frame_count = 0
        self.last_frame_time = time.time()
        self.last_fps_log_time = time.time()
        self.frame_times = []  # Keep track of recent frame times for average FPS

        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        # Initialize with first update
        self.update_news_data()

    def _fetch_news_for_symbol(self, symbol: str) -> List[Dict[str, Any]]:
        """Fetch news headlines for a stock symbol.

        Args:
            symbol: Ticker symbol to search for.

        Returns:
            A list of dicts with ``title``, ``publisher``, ``link`` and
            ``published`` keys. An empty list is returned on any HTTP,
            network or parsing failure (the failure is logged, not raised).
        """
        try:
            # safe='' so that characters such as '/' inside a symbol are
            # escaped too instead of leaking into the query string.
            encoded_symbol = urllib.parse.quote(str(symbol), safe='')
            max_headlines = self.stock_news_config.get('max_headlines_per_symbol', 5)
            url = (
                "https://query1.finance.yahoo.com/v1/finance/search"
                f"?q={encoded_symbol}&lang=en-US&region=US&quotesCount=0&newsCount={max_headlines}"
            )

            response = requests.get(url, headers=self.headers, timeout=5)
            if response.status_code != 200:
                logger.error(f"Failed to fetch news for {symbol}: HTTP {response.status_code}")
                return []

            data = response.json()
            # "news" may be missing or null in the payload.
            news_items = data.get('news') or []

            # Keep only the fields this class (or its callers) may need.
            formatted_news = [
                {
                    "title": item.get('title', ''),
                    "publisher": item.get('publisher', ''),
                    "link": item.get('link', ''),
                    "published": item.get('providerPublishTime', 0),
                }
                for item in news_items
            ]

            logger.info(f"Fetched {len(formatted_news)} news items for {symbol}")
            return formatted_news
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error fetching news for {symbol}: {e}")
            return []
        except (ValueError, IndexError, KeyError) as e:
            logger.error(f"Error parsing news data for {symbol}: {e}")
            return []
        except Exception as e:
            # Best-effort: a bad payload for one symbol must not stop the display.
            logger.error(f"Unexpected error fetching news for {symbol}: {e}")
            return []

    def update_news_data(self):
        """Update news data for all configured stock symbols.

        Does nothing when the last successful update is newer than
        ``update_interval`` seconds (default 300), or when the last attempt
        (successful or not) is newer than ``retry_interval`` seconds
        (default ``DEFAULT_RETRY_INTERVAL``). The second check keeps a failing
        fetch, or an empty symbol list, from being retried on every frame.

        This call is synchronous: it sleeps briefly between requests and
        performs one HTTP request per symbol.
        """
        current_time = time.time()
        update_interval = self.stock_news_config.get('update_interval', 300)  # Default to 5 minutes
        retry_interval = self.stock_news_config.get('retry_interval', self.DEFAULT_RETRY_INTERVAL)

        # If not enough time has passed, keep using existing data
        if current_time - self.last_update < update_interval:
            return
        if current_time - self.last_attempt < retry_interval:
            return
        self.last_attempt = current_time

        # Get symbols from config
        symbols = self.stocks_config.get('symbols', [])
        if not symbols:
            logger.warning("No stock symbols configured for news")
            return

        # Collect into temporary storage so the data being displayed is only
        # replaced once something was actually fetched.
        new_data = {}
        for index, symbol in enumerate(symbols):
            if index:
                # Add a small delay between requests to avoid rate limiting
                time.sleep(random.uniform(0.1, 0.3))
            news_items = self._fetch_news_for_symbol(symbol)
            if news_items:
                new_data[symbol] = news_items

        if new_data:
            self.news_data = new_data
            self.last_update = current_time
            logger.info(f"Updated news data for {len(new_data)} symbols")
        else:
            logger.error("Failed to fetch news for any configured stocks")

    def _create_text_image(self, text: str, color: Tuple[int, int, int] = (255, 255, 255)) -> "Image.Image":
        """Create an image containing the text for efficient scrolling.

        Args:
            text: The string to render on a single line.
            color: RGB fill colour for the text.

        Returns:
            An RGB image as wide as the rendered text and as tall as the
            matrix, with the text centred vertically on a black background.
        """
        font = self.display_manager.small_font
        matrix_height = self.display_manager.matrix.height

        # Get text dimensions
        left, top, right, bottom = self.display_manager.draw.textbbox((0, 0), text, font=font)
        text_width = right - left
        text_height = bottom - top

        text_image = Image.new('RGB', (text_width, matrix_height), (0, 0, 0))
        text_draw = ImageDraw.Draw(text_image)

        # The bounding box can start at a non-zero offset from the drawing
        # origin; compensate so the glyphs are neither clipped on the right
        # nor pushed below the vertical centre.
        y = (matrix_height - text_height) // 2
        text_draw.text((-left, y - top), text, font=font, fill=color)
        return text_image

    def _log_frame_rate(self):
        """Record the time since the previous frame and log FPS once a second."""
        current_time = time.time()

        # Calculate instantaneous frame time
        frame_time = current_time - self.last_frame_time
        self.frame_times.append(frame_time)

        # Keep only the most recent samples for the average
        if len(self.frame_times) > self.MAX_FRAME_SAMPLES:
            del self.frame_times[:-self.MAX_FRAME_SAMPLES]

        # Log FPS every second
        if current_time - self.last_fps_log_time >= 1.0:
            avg_frame_time = sum(self.frame_times) / len(self.frame_times)
            avg_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0
            instant_fps = 1.0 / frame_time if frame_time > 0 else 0
            logger.info(f"Frame stats - Avg FPS: {avg_fps:.1f}, Current FPS: {instant_fps:.1f}, Frame time: {frame_time*1000:.2f}ms")
            self.last_fps_log_time = current_time
            self.frame_count = 0

        self.last_frame_time = current_time
        self.frame_count += 1

    @staticmethod
    def _advance_scroll(position, speed, total_width):
        """Return ``(new_position, wrapped)`` after moving ``speed`` pixels.

        ``wrapped`` is True when the move reached or passed ``total_width``;
        the position then restarts at 0. Detecting the wrap explicitly
        (instead of testing ``position == 0`` after a modulo) keeps working
        when ``speed`` does not divide ``total_width``.
        """
        if total_width <= 0:
            return 0, True
        advanced = position + speed
        if advanced >= total_width:
            return 0, True
        return advanced % total_width, False

    @staticmethod
    def _visible_width(position, text_width, display_width):
        """Return how many text columns are on screen at ``position`` (>= 0)."""
        return max(0, min(display_width, text_width - position))

    @staticmethod
    def _group_count(total_headlines, headlines_per_rotation):
        """Return the number of headline groups in a full rotation (>= 1)."""
        per_rotation = max(1, headlines_per_rotation)
        return max(1, -(-total_headlines // per_rotation))

    def _collect_headlines(self) -> List[str]:
        """Flatten ``news_data`` into ``"SYMBOL: title"`` strings."""
        return [
            f"{symbol}: {item.get('title', '')}"
            for symbol, news_items in self.news_data.items()
            for item in news_items
        ]

    def display_news(self):
        """Display news headlines by scrolling them across the screen.

        Each call renders one frame: it refreshes the news data when due,
        builds the text for the current headline group (re-rendering the
        cached image only when that text changes), advances the scroll
        position by ``scroll_speed`` and pushes the frame to the display.
        When the text has fully scrolled off, blank frames are shown for one
        display width and the next headline group is selected.

        Returns:
            True when a frame was pushed to the display; None when the
            feature is disabled or there is nothing to show.
        """
        if not self.stock_news_config.get('enabled', False):
            return

        # Refresh when due. update_news_data() throttles itself, and it runs
        # synchronously, so a refresh briefly pauses scrolling.
        self.update_news_data()

        if not self.news_data:
            logger.warning("No news data available to display")
            return

        # Get all news items from all symbols
        headlines = self._collect_headlines()
        if not headlines:
            return

        # Number of headlines per rotation, never less than one so the
        # grouping arithmetic below cannot divide by zero.
        headlines_per_rotation = max(1, int(self.stock_news_config.get('headlines_per_rotation', 2)))
        total_headlines = len(headlines)

        # Build the text for all headlines in the current group
        start_idx = (self.current_news_group * headlines_per_rotation) % total_headlines
        separator = " - "  # Visual separator between news items
        news_text = separator.join(
            headlines[(start_idx + offset) % total_headlines]
            for offset in range(headlines_per_rotation)
        )

        # Only create new text image if the text has changed
        if news_text != self.cached_text:
            self.cached_text = news_text
            self.cached_text_image = self._create_text_image(news_text)
            self.scroll_position = 0  # Reset scroll position for new text

        if self.cached_text_image is None:
            return

        text_width = self.cached_text_image.width
        text_height = self.cached_text_image.height
        display_width = self.display_manager.matrix.width
        # One full pass = the text itself plus a display-wide blank gap.
        total_width = text_width + display_width

        # Update scroll position
        self.scroll_position, wrapped = self._advance_scroll(
            self.scroll_position, self.scroll_speed, total_width
        )

        # Always push a frame: during the gap (and on the wrap frame) it is
        # blank, so no stale sliver of text is left on the display.
        frame_image = Image.new('RGB', (display_width, text_height), (0, 0, 0))
        left = int(self.scroll_position)
        visible_width = 0 if wrapped else self._visible_width(left, text_width, display_width)
        if visible_width > 0:
            visible_portion = self.cached_text_image.crop(
                (left, 0, left + visible_width, text_height)
            )
            frame_image.paste(visible_portion, (0, 0))

        # Update the display with the new frame
        self.display_manager.image = frame_image
        self.display_manager.update_display()

        # If we've completed a full scroll, move to the next group
        if wrapped:
            group_count = self._group_count(total_headlines, headlines_per_rotation)
            self.current_news_group = (self.current_news_group + 1) % group_count

        # Log frame rate
        self._log_frame_rate()
        return True