* Opening Bell

Introducing the Stock Ticker Feature

* Update stock_manager.py

Assume folder exists

* Update stock_manager.py

removing logos to focus on function for now

* Update stock_manager.py

parse yahoo scripts

* Update stock_manager.py

stock query update

* Update stock_manager.py

slow down stock display

* Update display_controller.py

adjust screen flow

* Update stock_manager.py

shipping features

* Update stock_manager.py

stock refresh in the background

* Customize Display timings

customize display timings

* Update stock_manager.py

stock font size change

* Sizing and Spacing

CHanged font sizing on chart and clock spacing

* Update clock.py

Date format changes

* Update stock_manager.py

actually read stocks from config file

* Update stock_manager.py

add config manager

* readme update

readme update and formatting for better flow

* Update .gitignore

rename reference folder

* Update config.json

changed default stocks to test update implementation

* Stock News

Stock news Ticker

* Update config.json

increase scroll speed

* Scroll Performance

Tuning news scrolling performance

* updating scroll direction

orienting scroll direction

* News tuning

removed test files and increased scroll speed

* Create test_news_manager.py

need a test script to call upon

* Update test_news_manager.py

test script tuning

* troubleshooting test script

* Update test_news_manager.py

* Update config.json

scroll speed increases

* Update config.json

scroll tuning

* Update config.json

speeding up

* Update config.json

still making text faster

* Update config.json

Trying to tune scrolling

* Update config.json

testing crazy parameters

* Update test_news_manager.py

remove sleep delay

* scroll tuning

scroll tuning

* scroll logging and debugging

FPS counter and debug messages

* Update config.json

matrix speed tuning

* Update news_manager.py

News separator

* Update news_manager.py

separator character change

* Stock News manager Rename

rename stock news ticker to enable other news in the future

* Update display_controller.py

load config update

* Update stock_manager.py

remove redundant import

* Stock news settings

Stock news has more granular control

* Stock news joins the lineup

Stock News added to the display controller and drawing display instead of image
This commit is contained in:
Chuck
2025-04-10 22:16:38 -05:00
committed by GitHub
parent 6fb65346e7
commit b4c2fff9a8
9 changed files with 834 additions and 34 deletions

2
.gitignore vendored
View File

@@ -20,4 +20,4 @@ ENV/
*.swo *.swo
# Dependencies # Dependencies
sports-0.0.115/ sports-reference/

View File

@@ -16,8 +16,8 @@
"scan_mode": "progressive", "scan_mode": "progressive",
"pwm_bits": 8, "pwm_bits": 8,
"pwm_dither_bits": 1, "pwm_dither_bits": 1,
"pwm_lsb_nanoseconds": 130, "pwm_lsb_nanoseconds": 50,
"disable_hardware_pulsing": true, "disable_hardware_pulsing": false,
"inverse_colors": false, "inverse_colors": false,
"show_refresh_rate": true, "show_refresh_rate": true,
"limit_refresh_rate_hz": 100 "limit_refresh_rate_hz": 100
@@ -25,15 +25,40 @@
"runtime": { "runtime": {
"gpio_slowdown": 2 "gpio_slowdown": 2
}, },
"rotation_interval": 15 "display_durations": {
"clock": 15,
"weather": 15,
"stocks": 45,
"hourly_forecast": 15,
"daily_forecast": 15,
"stock_news": 30
}
}, },
"clock": { "clock": {
"enabled": true,
"format": "%H:%M:%S", "format": "%H:%M:%S",
"update_interval": 1 "update_interval": 1
}, },
"weather": { "weather": {
"enabled": true,
"update_interval": 300, "update_interval": 300,
"units": "imperial", "units": "imperial",
"display_format": "{temp}°F\n{condition}" "display_format": "{temp}°F\n{condition}"
},
"stocks": {
"enabled": true,
"update_interval": 60,
"symbols": [
"ASTS", "SCHD", "INTC", "NVDA", "T", "VOO", "SPYG", "SMCI"
],
"display_format": "{symbol}: ${price} ({change}%)"
},
"stock_news": {
"enabled": true,
"update_interval": 300,
"scroll_speed": 1,
"scroll_delay": 0.0001,
"max_headlines_per_symbol": 1,
"headlines_per_rotation": 2
} }
} }

51
integrate_news_ticker.py Normal file
View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python3
import time
import sys
import os
from src.config_manager import ConfigManager
from src.display_manager import DisplayManager
from src.stock_news_manager import StockNewsManager
from src.stock_manager import StockManager
def main():
"""Integrate news ticker with the existing display system."""
try:
# Load configuration
config_manager = ConfigManager()
config = config_manager.config
# Initialize display manager
display_manager = DisplayManager(config.get('display', {}))
# Initialize stock manager
stock_manager = StockManager(config, display_manager)
# Initialize news manager
news_manager = StockNewsManager(config, display_manager)
print("News ticker integration test started. Press Ctrl+C to exit.")
print("Displaying stock data and news headlines...")
# Display stock data and news headlines in a loop
while True:
# Display stock data
stock_manager.display_stocks()
# Display news headlines for a limited time (30 seconds)
start_time = time.time()
while time.time() - start_time < 30:
news_manager.display_news()
except KeyboardInterrupt:
print("\nTest interrupted by user.")
except Exception as e:
print(f"Error: {e}")
finally:
# Clean up
if 'display_manager' in locals():
display_manager.clear()
display_manager.update_display()
display_manager.cleanup()
if __name__ == "__main__":
main()

View File

@@ -89,15 +89,17 @@ class Clock:
# Get AM/PM # Get AM/PM
ampm = current.strftime('%p') ampm = current.strftime('%p')
# Format date with ordinal suffix # Format date with ordinal suffix - split into two lines
day_suffix = self._get_ordinal_suffix(current.day) day_suffix = self._get_ordinal_suffix(current.day)
date_str = current.strftime(f'%A, %B %-d{day_suffix}') # Full weekday on first line, full month and day on second line
weekday = current.strftime('%A')
date_str = current.strftime(f'%B %-d{day_suffix}')
return time_str, ampm, date_str return time_str, ampm, weekday, date_str
def display_time(self, force_clear: bool = False) -> None: def display_time(self, force_clear: bool = False) -> None:
"""Display the current time and date.""" """Display the current time and date."""
time_str, ampm, date_str = self.get_current_time() time_str, ampm, weekday, date_str = self.get_current_time()
# Only update if something has changed # Only update if something has changed
if time_str != self.last_time or date_str != self.last_date or force_clear: if time_str != self.last_time or date_str != self.last_date or force_clear:
@@ -111,7 +113,7 @@ class Clock:
# Draw time (large, centered, near top) # Draw time (large, centered, near top)
self.display_manager.draw_text( self.display_manager.draw_text(
time_str, time_str,
y=3, # Move down slightly from top y=2, # Move up slightly to make room for two lines of date
color=self.COLORS['time'], color=self.COLORS['time'],
small_font=False small_font=False
) )
@@ -122,15 +124,23 @@ class Clock:
self.display_manager.draw_text( self.display_manager.draw_text(
ampm, ampm,
x=ampm_x, x=ampm_x,
y=5, # Align with time y=4, # Align with time
color=self.COLORS['ampm'], color=self.COLORS['ampm'],
small_font=True small_font=True
) )
# Draw date (small, centered below time) # Draw weekday on first line (small font)
self.display_manager.draw_text(
weekday,
y=display_height - 18, # First line of date
color=self.COLORS['date'],
small_font=True
)
# Draw month and day on second line (small font)
self.display_manager.draw_text( self.display_manager.draw_text(
date_str, date_str,
y=display_height - 9, # Move up more from bottom y=display_height - 9, # Second line of date
color=self.COLORS['date'], color=self.COLORS['date'],
small_font=True small_font=True
) )

View File

@@ -7,11 +7,9 @@ class ConfigManager:
# Use current working directory as base # Use current working directory as base
self.config_path = config_path or "config/config.json" self.config_path = config_path or "config/config.json"
self.secrets_path = secrets_path or "config/config_secrets.json" self.secrets_path = secrets_path or "config/config_secrets.json"
self.config: Dict[str, Any] = {} self.config: Dict[str, Any] = {}
self.load_config()
def load_config(self) -> None: def load_config(self) -> Dict[str, Any]:
"""Load configuration from JSON files.""" """Load configuration from JSON files."""
try: try:
# Load main config # Load main config
@@ -26,10 +24,13 @@ class ConfigManager:
# Deep merge secrets into config # Deep merge secrets into config
self._deep_merge(self.config, secrets) self._deep_merge(self.config, secrets)
return self.config
except FileNotFoundError as e: except FileNotFoundError as e:
if str(e).find('config_secrets.json') == -1: # Only raise if main config is missing if str(e).find('config_secrets.json') == -1: # Only raise if main config is missing
print(f"Configuration file not found at {os.path.abspath(self.config_path)}") print(f"Configuration file not found at {os.path.abspath(self.config_path)}")
raise raise
return self.config
except json.JSONDecodeError: except json.JSONDecodeError:
print("Error parsing configuration file") print("Error parsing configuration file")
raise raise

View File

@@ -5,6 +5,8 @@ from src.clock import Clock
from src.weather_manager import WeatherManager from src.weather_manager import WeatherManager
from src.display_manager import DisplayManager from src.display_manager import DisplayManager
from src.config_manager import ConfigManager from src.config_manager import ConfigManager
from src.stock_manager import StockManager
from src.stock_news_manager import StockNewsManager
# Configure logging # Configure logging
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@@ -13,16 +15,37 @@ logger = logging.getLogger(__name__)
class DisplayController: class DisplayController:
def __init__(self): def __init__(self):
self.config_manager = ConfigManager() self.config_manager = ConfigManager()
self.config = self.config_manager.config self.config = self.config_manager.load_config()
self.display_manager = DisplayManager(self.config.get('display', {})) self.display_manager = DisplayManager(self.config.get('display', {}))
self.clock = Clock(display_manager=self.display_manager) self.clock = Clock(display_manager=self.display_manager)
self.weather = WeatherManager(self.config, self.display_manager) self.weather = WeatherManager(self.config, self.display_manager)
self.stocks = StockManager(self.config, self.display_manager)
self.news = StockNewsManager(self.config, self.display_manager)
self.current_display = 'clock' self.current_display = 'clock'
self.weather_mode = 'current' # current, hourly, or daily
self.last_switch = time.time() self.last_switch = time.time()
self.force_clear = True # Start with a clear screen self.force_clear = True # Start with a clear screen
self.update_interval = 0.5 # Slower updates for better stability self.update_interval = 0.5 # Slower updates for better stability
self.display_durations = self.config['display'].get('display_durations', {
'clock': 15,
'weather': 15,
'stocks': 45,
'hourly_forecast': 15,
'daily_forecast': 15,
'stock_news': 30
})
logger.info("DisplayController initialized with display_manager: %s", id(self.display_manager)) logger.info("DisplayController initialized with display_manager: %s", id(self.display_manager))
def get_current_duration(self) -> int:
"""Get the duration for the current display mode."""
if self.current_display == 'weather':
if self.weather_mode == 'hourly':
return self.display_durations.get('hourly_forecast', 15)
elif self.weather_mode == 'daily':
return self.display_durations.get('daily_forecast', 15)
return self.display_durations.get('weather', 15)
return self.display_durations.get(self.current_display, 15)
def run(self): def run(self):
"""Run the display controller, switching between displays.""" """Run the display controller, switching between displays."""
try: try:
@@ -30,32 +53,83 @@ class DisplayController:
current_time = time.time() current_time = time.time()
# Check if we need to switch display mode # Check if we need to switch display mode
if current_time - self.last_switch > self.config['display'].get('rotation_interval', 30): if current_time - self.last_switch > self.get_current_duration():
# Cycle through: clock -> current weather -> hourly forecast -> daily forecast # Find next enabled display mode
if self.current_display == 'clock': next_display = None
self.current_display = 'weather'
elif self.current_display == 'weather':
self.current_display = 'hourly'
elif self.current_display == 'hourly':
self.current_display = 'daily'
else: # daily
self.current_display = 'clock'
logger.info("Switching display to: %s", self.current_display) if self.current_display == 'clock':
if self.config.get('weather', {}).get('enabled', False):
next_display = 'weather'
self.weather_mode = 'current'
elif self.config.get('stocks', {}).get('enabled', False):
next_display = 'stocks'
elif self.config.get('stock_news', {}).get('enabled', False):
next_display = 'stock_news'
else:
next_display = 'clock'
elif self.current_display == 'weather':
if self.weather_mode == 'current':
next_display = 'weather'
self.weather_mode = 'hourly'
elif self.weather_mode == 'hourly':
next_display = 'weather'
self.weather_mode = 'daily'
else: # daily
if self.config.get('stocks', {}).get('enabled', False):
next_display = 'stocks'
elif self.config.get('stock_news', {}).get('enabled', False):
next_display = 'stock_news'
elif self.config.get('clock', {}).get('enabled', False):
next_display = 'clock'
else:
next_display = 'weather'
self.weather_mode = 'current'
elif self.current_display == 'stocks':
if self.config.get('stock_news', {}).get('enabled', False):
next_display = 'stock_news'
elif self.config.get('clock', {}).get('enabled', False):
next_display = 'clock'
elif self.config.get('weather', {}).get('enabled', False):
next_display = 'weather'
self.weather_mode = 'current'
else:
next_display = 'stocks'
else: # stock_news
if self.config.get('clock', {}).get('enabled', False):
next_display = 'clock'
elif self.config.get('weather', {}).get('enabled', False):
next_display = 'weather'
self.weather_mode = 'current'
elif self.config.get('stocks', {}).get('enabled', False):
next_display = 'stocks'
else:
next_display = 'stock_news'
# Update current display
self.current_display = next_display
logger.info(f"Switching display to: {self.current_display} {self.weather_mode if self.current_display == 'weather' else ''}")
self.last_switch = current_time self.last_switch = current_time
self.force_clear = True self.force_clear = True
self.display_manager.clear() # Ensure clean transition self.display_manager.clear() # Ensure clean transition
# Display current screen # Display current screen
try: try:
if self.current_display == 'clock': if self.current_display == 'clock' and self.config.get('clock', {}).get('enabled', False):
self.clock.display_time(force_clear=self.force_clear) self.clock.display_time(force_clear=self.force_clear)
elif self.current_display == 'weather': elif self.current_display == 'weather' and self.config.get('weather', {}).get('enabled', False):
self.weather.display_weather(force_clear=self.force_clear) if self.weather_mode == 'current':
elif self.current_display == 'hourly': self.weather.display_weather(force_clear=self.force_clear)
self.weather.display_hourly_forecast(force_clear=self.force_clear) elif self.weather_mode == 'hourly':
else: # daily self.weather.display_hourly_forecast(force_clear=self.force_clear)
self.weather.display_daily_forecast(force_clear=self.force_clear) else: # daily
self.weather.display_daily_forecast(force_clear=self.force_clear)
elif self.current_display == 'stocks' and self.config.get('stocks', {}).get('enabled', False):
self.stocks.display_stocks(force_clear=self.force_clear)
elif self.current_display == 'stock_news' and self.config.get('stock_news', {}).get('enabled', False):
self.news.display_news()
except Exception as e: except Exception as e:
logger.error(f"Error updating display: {e}") logger.error(f"Error updating display: {e}")
time.sleep(1) # Wait a bit before retrying time.sleep(1) # Wait a bit before retrying

346
src/stock_manager.py Normal file
View File

@@ -0,0 +1,346 @@
import time
import logging
import requests
import json
import random
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os
import urllib.parse
import re
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StockManager:
def __init__(self, config: Dict[str, Any], display_manager):
self.config = config
self.display_manager = display_manager
self.stocks_config = config.get('stocks', {})
self.last_update = 0
self.stock_data = {}
self.current_stock_index = 0
self.display_mode = 'info' # 'info' or 'chart'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Initialize with first update
self.update_stock_data()
def _get_stock_color(self, symbol: str) -> Tuple[int, int, int]:
"""Get color based on stock performance."""
if symbol not in self.stock_data:
return (255, 255, 255) # White for unknown
change = self.stock_data[symbol].get('change', 0)
if change > 0:
return (0, 255, 0) # Green for positive
elif change < 0:
return (255, 0, 0) # Red for negative
return (255, 255, 0) # Yellow for no change
def _extract_json_from_html(self, html: str) -> Dict:
"""Extract the JSON data from Yahoo Finance HTML."""
try:
# Look for the finance data in the HTML
patterns = [
r'root\.App\.main = (.*?);\s*</script>',
r'"QuotePageStore":\s*({.*?}),\s*"',
r'{"regularMarketPrice":.*?"regularMarketChangePercent".*?}'
]
for pattern in patterns:
match = re.search(pattern, html, re.DOTALL)
if match:
json_str = match.group(1)
try:
data = json.loads(json_str)
if isinstance(data, dict):
if 'context' in data:
# First pattern matched
context = data.get('context', {})
dispatcher = context.get('dispatcher', {})
stores = dispatcher.get('stores', {})
quote_data = stores.get('QuoteSummaryStore', {})
if quote_data:
return quote_data
else:
# Direct quote data
return data
except json.JSONDecodeError:
continue
# If we get here, try one last attempt to find the price data directly
price_match = re.search(r'"regularMarketPrice":{"raw":([\d.]+)', html)
change_match = re.search(r'"regularMarketChangePercent":{"raw":([-\d.]+)', html)
prev_close_match = re.search(r'"regularMarketPreviousClose":{"raw":([\d.]+)', html)
name_match = re.search(r'"shortName":"([^"]+)"', html)
if price_match:
return {
"price": {
"regularMarketPrice": {"raw": float(price_match.group(1))},
"regularMarketChangePercent": {"raw": float(change_match.group(1)) if change_match else 0},
"regularMarketPreviousClose": {"raw": float(prev_close_match.group(1)) if prev_close_match else 0},
"shortName": name_match.group(1) if name_match else None
}
}
return {}
except Exception as e:
logger.error(f"Error extracting JSON data: {e}")
return {}
def _fetch_stock_data(self, symbol: str) -> Dict[str, Any]:
"""Fetch stock data from Yahoo Finance public API."""
try:
# Use Yahoo Finance query1 API for chart data
encoded_symbol = urllib.parse.quote(symbol)
url = f"https://query1.finance.yahoo.com/v8/finance/chart/{encoded_symbol}"
params = {
'interval': '5m', # 5-minute intervals
'range': '1d' # 1 day of data
}
response = requests.get(url, headers=self.headers, params=params, timeout=5)
if response.status_code != 200:
logger.error(f"Failed to fetch data for {symbol}: HTTP {response.status_code}")
return None
data = response.json()
# Extract the relevant data from the response
chart_data = data.get('chart', {}).get('result', [{}])[0]
meta = chart_data.get('meta', {})
if not meta:
logger.error(f"No meta data found for {symbol}")
return None
current_price = meta.get('regularMarketPrice', 0)
prev_close = meta.get('previousClose', current_price)
# Get price history
timestamps = chart_data.get('timestamp', [])
indicators = chart_data.get('indicators', {}).get('quote', [{}])[0]
close_prices = indicators.get('close', [])
# Build price history
price_history = []
for i, ts in enumerate(timestamps):
if i < len(close_prices) and close_prices[i] is not None:
price_history.append({
'timestamp': datetime.fromtimestamp(ts),
'price': close_prices[i]
})
# Calculate change percentage
change_pct = ((current_price - prev_close) / prev_close) * 100 if prev_close > 0 else 0
# Get company name (symbol will be used if name not available)
name = meta.get('symbol', symbol)
logger.debug(f"Processed data for {symbol}: price={current_price}, change={change_pct}%")
return {
"symbol": symbol,
"name": name,
"price": current_price,
"change": change_pct,
"open": prev_close,
"price_history": price_history
}
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching data for {symbol}: {e}")
return None
except (ValueError, IndexError, KeyError) as e:
logger.error(f"Error parsing data for {symbol}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error fetching data for {symbol}: {e}")
return None
def _draw_chart(self, symbol: str, data: Dict[str, Any]):
"""Draw a price chart for the stock."""
if not data.get('price_history'):
return
# Clear the display
self.display_manager.clear()
# Draw the symbol at the top with small font
self.display_manager.draw_text(
symbol,
y=1, # Moved up slightly
color=(255, 255, 255),
small_font=True # Use small font
)
# Calculate chart dimensions
chart_height = 22 # Increased height since we're using smaller fonts
chart_y = 7 # Start closer to symbol due to smaller font
width = self.display_manager.matrix.width
# Get min and max prices for scaling
prices = [p['price'] for p in data['price_history']]
if not prices:
return
min_price = min(prices)
max_price = max(prices)
price_range = max_price - min_price
if price_range == 0:
return
# Draw chart points
points = []
color = self._get_stock_color(symbol)
for i, point in enumerate(data['price_history']):
x = int((i / len(data['price_history'])) * width)
y = chart_y + chart_height - int(((point['price'] - min_price) / price_range) * chart_height)
points.append((x, y))
# Draw lines between points
for i in range(len(points) - 1):
x1, y1 = points[i]
x2, y2 = points[i + 1]
self.display_manager.draw.line([x1, y1, x2, y2], fill=color, width=1)
# Draw current price at the bottom with small font
price_text = f"${data['price']:.2f} ({data['change']:+.1f}%)"
self.display_manager.draw_text(
price_text,
y=30, # Near bottom
color=color,
small_font=True # Use small font
)
# Update the display
self.display_manager.update_display()
def _reload_config(self):
"""Reload configuration from file."""
# Reset stock data if symbols have changed
new_symbols = set(self.stocks_config.get('symbols', []))
current_symbols = set(self.stock_data.keys())
if new_symbols != current_symbols:
self.stock_data = {}
self.current_stock_index = 0
self.last_update = 0 # Force immediate update
logger.info(f"Stock symbols changed. New symbols: {new_symbols}")
def update_stock_data(self):
"""Update stock data if enough time has passed."""
current_time = time.time()
update_interval = self.stocks_config.get('update_interval', 60)
# If not enough time has passed, keep using existing data
if current_time - self.last_update < update_interval + random.uniform(0, 2):
return
# Reload config to check for symbol changes
self._reload_config()
# Get symbols from config
symbols = self.stocks_config.get('symbols', [])
if not symbols:
logger.warning("No stock symbols configured")
return
# If symbols is a list of strings, convert to list of dicts
if isinstance(symbols[0], str):
symbols = [{"symbol": symbol} for symbol in symbols]
# Create temporary storage for new data
new_data = {}
success = False
for stock in symbols:
symbol = stock['symbol']
# Add a small delay between requests to avoid rate limiting
time.sleep(random.uniform(0.1, 0.3)) # Reduced delay
data = self._fetch_stock_data(symbol)
if data:
new_data[symbol] = data
success = True
logger.info(f"Updated {symbol}: ${data['price']:.2f} ({data['change']:+.2f}%)")
if success:
# Only update the displayed data when we have new data
self.stock_data.update(new_data)
self.last_update = current_time
else:
logger.error("Failed to fetch data for any configured stocks")
def display_stocks(self, force_clear: bool = False):
"""Display stock information on the LED matrix."""
if not self.stocks_config.get('enabled', False):
return
# Start update in background if needed
if time.time() - self.last_update >= self.stocks_config.get('update_interval', 60):
self.update_stock_data()
if not self.stock_data:
logger.warning("No stock data available to display")
return
# Get the current stock to display
symbols = list(self.stock_data.keys())
if not symbols:
return
current_symbol = symbols[self.current_stock_index]
data = self.stock_data[current_symbol]
# Toggle between info and chart display
if self.display_mode == 'info':
# Clear the display
self.display_manager.clear()
# Draw the stock symbol at the top with regular font
self.display_manager.draw_text(
data['symbol'],
y=2, # Near top
color=(255, 255, 255) # White for symbol
)
# Draw the price in the middle with small font
price_text = f"${data['price']:.2f}"
self.display_manager.draw_text(
price_text,
y=14, # Middle
color=(0, 255, 0) if data['change'] >= 0 else (255, 0, 0), # Green for up, red for down
small_font=True # Use small font
)
# Draw the change percentage at the bottom with small font
change_text = f"({data['change']:+.1f}%)"
self.display_manager.draw_text(
change_text,
y=24, # Near bottom
color=(0, 255, 0) if data['change'] >= 0 else (255, 0, 0), # Green for up, red for down
small_font=True # Use small font
)
# Update the display
self.display_manager.update_display()
# Switch to chart mode next time
self.display_mode = 'chart'
else: # chart mode
self._draw_chart(current_symbol, data)
# Switch back to info mode next time
self.display_mode = 'info'
# Add a delay to make each display visible
time.sleep(3)
# Move to next stock for next update
self.current_stock_index = (self.current_stock_index + 1) % len(symbols)
# If we've shown all stocks, signal completion by returning True
return self.current_stock_index == 0

244
src/stock_news_manager.py Normal file
View File

@@ -0,0 +1,244 @@
import time
import logging
import requests
import json
import random
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os
import urllib.parse
import re
from src.config_manager import ConfigManager
from PIL import Image, ImageDraw
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StockNewsManager:
def __init__(self, config: Dict[str, Any], display_manager):
self.config = config
self.config_manager = ConfigManager()
self.display_manager = display_manager
self.stocks_config = config.get('stocks', {})
self.stock_news_config = config.get('stock_news', {})
self.last_update = 0
self.news_data = {}
self.current_news_group = 0 # Track which group of headlines we're showing
self.scroll_position = 0
# Get scroll settings from config with faster defaults
self.scroll_speed = self.stock_news_config.get('scroll_speed', 1)
self.scroll_delay = self.stock_news_config.get('scroll_delay', 0.001) # Default to 1ms instead of 50ms
# Log the actual values being used
logger.info(f"Scroll settings - Speed: {self.scroll_speed} pixels/frame, Delay: {self.scroll_delay*1000:.2f}ms")
# Initialize frame rate tracking
self.frame_count = 0
self.last_frame_time = time.time()
self.last_fps_log_time = time.time()
self.frame_times = [] # Keep track of recent frame times for average FPS
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# Initialize with first update
self.update_news_data()
def _fetch_news_for_symbol(self, symbol: str) -> List[Dict[str, Any]]:
"""Fetch news headlines for a stock symbol."""
try:
# Using Yahoo Finance API to get news
encoded_symbol = urllib.parse.quote(symbol)
url = f"https://query1.finance.yahoo.com/v1/finance/search?q={encoded_symbol}&lang=en-US&region=US&quotesCount=0&newsCount={self.stock_news_config.get('max_headlines_per_symbol', 5)}"
response = requests.get(url, headers=self.headers, timeout=5)
if response.status_code != 200:
logger.error(f"Failed to fetch news for {symbol}: HTTP {response.status_code}")
return []
data = response.json()
news_items = data.get('news', [])
# Process and format news items
formatted_news = []
for item in news_items:
formatted_news.append({
"title": item.get('title', ''),
"publisher": item.get('publisher', ''),
"link": item.get('link', ''),
"published": item.get('providerPublishTime', 0)
})
logger.info(f"Fetched {len(formatted_news)} news items for {symbol}")
return formatted_news
except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching news for {symbol}: {e}")
return []
except (ValueError, IndexError, KeyError) as e:
logger.error(f"Error parsing news data for {symbol}: {e}")
return []
except Exception as e:
logger.error(f"Unexpected error fetching news for {symbol}: {e}")
return []
def update_news_data(self):
"""Update news data for all configured stock symbols."""
current_time = time.time()
update_interval = self.stock_news_config.get('update_interval', 300) # Default to 5 minutes
# If not enough time has passed, keep using existing data
if current_time - self.last_update < update_interval:
return
# Get symbols from config
symbols = self.stocks_config.get('symbols', [])
if not symbols:
logger.warning("No stock symbols configured for news")
return
# Create temporary storage for new data
new_data = {}
success = False
for symbol in symbols:
# Add a small delay between requests to avoid rate limiting
time.sleep(random.uniform(0.1, 0.3))
news_items = self._fetch_news_for_symbol(symbol)
if news_items:
new_data[symbol] = news_items
success = True
if success:
# Only update the displayed data when we have new data
self.news_data = new_data
self.last_update = current_time
logger.info(f"Updated news data for {len(new_data)} symbols")
else:
logger.error("Failed to fetch news for any configured stocks")
def _create_text_image(self, text: str, color: Tuple[int, int, int] = (255, 255, 255)) -> Image.Image:
"""Create an image containing the text for efficient scrolling."""
# Get text dimensions
bbox = self.display_manager.draw.textbbox((0, 0), text, font=self.display_manager.small_font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# Create a new image with the text
text_image = Image.new('RGB', (text_width, self.display_manager.matrix.height), (0, 0, 0))
text_draw = ImageDraw.Draw(text_image)
# Draw the text centered vertically
y = (self.display_manager.matrix.height - text_height) // 2
text_draw.text((0, y), text, font=self.display_manager.small_font, fill=color)
return text_image
def _log_frame_rate(self):
"""Log frame rate statistics."""
current_time = time.time()
# Calculate instantaneous frame time
frame_time = current_time - self.last_frame_time
self.frame_times.append(frame_time)
# Keep only last 100 frames for average
if len(self.frame_times) > 100:
self.frame_times.pop(0)
# Log FPS every second
if current_time - self.last_fps_log_time >= 1.0:
avg_frame_time = sum(self.frame_times) / len(self.frame_times)
avg_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0
instant_fps = 1.0 / frame_time if frame_time > 0 else 0
logger.info(f"Frame stats - Avg FPS: {avg_fps:.1f}, Current FPS: {instant_fps:.1f}, Frame time: {frame_time*1000:.2f}ms")
self.last_fps_log_time = current_time
self.frame_count = 0
self.last_frame_time = current_time
self.frame_count += 1
def display_news(self):
"""Display news headlines by scrolling them across the screen."""
if not self.stock_news_config.get('enabled', False):
return
# Start update in background if needed
if time.time() - self.last_update >= self.stock_news_config.get('update_interval', 300):
self.update_news_data()
if not self.news_data:
logger.warning("No news data available to display")
return
# Get all news items from all symbols
all_news = []
for symbol, news_items in self.news_data.items():
for item in news_items:
all_news.append({
"symbol": symbol,
"title": item["title"],
"publisher": item["publisher"]
})
if not all_news:
return
# Get the number of headlines to show per rotation
headlines_per_rotation = self.stock_news_config.get('headlines_per_rotation', 2)
total_headlines = len(all_news)
# Calculate the starting index for the current group
start_idx = (self.current_news_group * headlines_per_rotation) % total_headlines
# Build the text for all headlines in this group
news_texts = []
for i in range(headlines_per_rotation):
idx = (start_idx + i) % total_headlines
news = all_news[idx]
news_texts.append(f"{news['symbol']}: {news['title']}")
# Join all headlines with a separator
separator = " - " # Visual separator between news items
news_text = separator.join(news_texts)
# Clear the display
self.display_manager.clear()
# Calculate text width for scrolling
bbox = self.display_manager.draw.textbbox((0, 0), news_text, font=self.display_manager.small_font)
text_width = bbox[2] - bbox[0]
# Calculate scroll position
display_width = self.display_manager.matrix.width
total_width = text_width + display_width
# Update scroll position
self.scroll_position = (self.scroll_position + self.scroll_speed) % total_width
# Draw the text at the current scroll position
self.display_manager.draw_text(
news_text,
x=display_width - self.scroll_position,
y=None, # Center vertically
color=(255, 255, 255),
small_font=True
)
# Update the display
self.display_manager.update_display()
# If we've completed a full scroll, move to the next group
if self.scroll_position == 0:
self.current_news_group = (self.current_news_group + 1) % ((total_headlines + headlines_per_rotation - 1) // headlines_per_rotation)
# Small delay to control scroll speed
time.sleep(self.scroll_delay)
# Log frame rate
self._log_frame_rate()
return True

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import time
import sys
import os
from src.config_manager import ConfigManager
from src.display_manager import DisplayManager
from src.stock_news_manager import StockNewsManager
print(f"Current working directory: {os.getcwd()}")
def main():
"""Test the StockNewsManager class directly."""
try:
# Load configuration
config_manager = ConfigManager()
config = config_manager.load_config()
if not config:
print("Error: Failed to load configuration")
return
display_config = config.get('display')
if not display_config:
print("Error: No display configuration found")
return
# Initialize display manager
display_manager = DisplayManager(display_config)
# Initialize news manager with the loaded config
news_manager = StockNewsManager(config, display_manager)
print("Testing news display. Press Ctrl+C to exit.")
# Run the news display in a loop
while True:
news_manager.display_news()
except KeyboardInterrupt:
print("\nTest interrupted by user")
except Exception as e:
print(f"Error during test: {e}")
import traceback
traceback.print_exc()
finally:
print("Test completed")
if __name__ == "__main__":
main()