mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-04-10 13:02:59 +00:00
Opening Bell
Introducing the Stock Ticker Feature
This commit is contained in:
@@ -35,5 +35,13 @@
|
||||
"update_interval": 300,
|
||||
"units": "imperial",
|
||||
"display_format": "{temp}°F\n{condition}"
|
||||
},
|
||||
"stocks": {
|
||||
"enabled": true,
|
||||
"update_interval": 60,
|
||||
"symbols": [
|
||||
"AAPL", "MSFT", "GOOGL", "AMZN", "META", "TSLA", "NVDA", "JPM", "V", "WMT"
|
||||
],
|
||||
"display_format": "{symbol}: ${price} ({change}%)"
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ from src.clock import Clock
|
||||
from src.weather_manager import WeatherManager
|
||||
from src.display_manager import DisplayManager
|
||||
from src.config_manager import ConfigManager
|
||||
from src.stock_manager import StockManager
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
@@ -17,6 +18,7 @@ class DisplayController:
|
||||
self.display_manager = DisplayManager(self.config.get('display', {}))
|
||||
self.clock = Clock(display_manager=self.display_manager)
|
||||
self.weather = WeatherManager(self.config, self.display_manager)
|
||||
self.stocks = StockManager(self.config, self.display_manager)
|
||||
self.current_display = 'clock'
|
||||
self.last_switch = time.time()
|
||||
self.force_clear = True # Start with a clear screen
|
||||
@@ -31,14 +33,15 @@ class DisplayController:
|
||||
|
||||
# Check if we need to switch display mode
|
||||
if current_time - self.last_switch > self.config['display'].get('rotation_interval', 30):
|
||||
# Cycle through: clock -> current weather -> hourly forecast -> daily forecast
|
||||
# Cycle through: clock -> weather -> stocks
|
||||
if self.current_display == 'clock':
|
||||
self.current_display = 'weather'
|
||||
elif self.current_display == 'weather':
|
||||
self.current_display = 'hourly'
|
||||
elif self.current_display == 'hourly':
|
||||
self.current_display = 'daily'
|
||||
else: # daily
|
||||
if self.config.get('stocks', {}).get('enabled', False):
|
||||
self.current_display = 'stocks'
|
||||
else:
|
||||
self.current_display = 'clock'
|
||||
else: # stocks
|
||||
self.current_display = 'clock'
|
||||
|
||||
logger.info("Switching display to: %s", self.current_display)
|
||||
@@ -52,10 +55,8 @@ class DisplayController:
|
||||
self.clock.display_time(force_clear=self.force_clear)
|
||||
elif self.current_display == 'weather':
|
||||
self.weather.display_weather(force_clear=self.force_clear)
|
||||
elif self.current_display == 'hourly':
|
||||
self.weather.display_hourly_forecast(force_clear=self.force_clear)
|
||||
else: # daily
|
||||
self.weather.display_daily_forecast(force_clear=self.force_clear)
|
||||
else: # stocks
|
||||
self.stocks.display_stocks(force_clear=self.force_clear)
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating display: {e}")
|
||||
time.sleep(1) # Wait a bit before retrying
|
||||
|
||||
224
src/stock_manager.py
Normal file
224
src/stock_manager.py
Normal file
@@ -0,0 +1,224 @@
|
||||
import time
|
||||
import logging
|
||||
import requests
|
||||
import json
|
||||
import random
|
||||
from typing import Dict, Any, List, Tuple
|
||||
from datetime import datetime
|
||||
import os
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class StockManager:
|
||||
def __init__(self, config: Dict[str, Any], display_manager):
|
||||
self.config = config
|
||||
self.display_manager = display_manager
|
||||
self.stocks_config = config.get('stocks', {})
|
||||
self.last_update = 0
|
||||
self.stock_data = {}
|
||||
self.current_stock_index = 0
|
||||
self.base_url = "https://query2.finance.yahoo.com"
|
||||
self.logo_cache = {}
|
||||
|
||||
# Create logos directory if it doesn't exist
|
||||
self.logos_dir = "assets/logos/stocks"
|
||||
os.makedirs(self.logos_dir, exist_ok=True)
|
||||
|
||||
# Default colors for stocks
|
||||
self.default_colors = [
|
||||
(0, 255, 0), # Green
|
||||
(0, 255, 255), # Cyan
|
||||
(255, 255, 0), # Yellow
|
||||
(255, 165, 0), # Orange
|
||||
(128, 0, 128), # Purple
|
||||
(255, 0, 0), # Red
|
||||
(0, 0, 255), # Blue
|
||||
(255, 192, 203) # Pink
|
||||
]
|
||||
|
||||
def _get_stock_color(self, symbol: str) -> Tuple[int, int, int]:
|
||||
"""Get a consistent color for a stock symbol."""
|
||||
# Use the symbol as a seed for consistent color assignment
|
||||
random.seed(hash(symbol))
|
||||
color_index = random.randint(0, len(self.default_colors) - 1)
|
||||
random.seed() # Reset the seed
|
||||
return self.default_colors[color_index]
|
||||
|
||||
def _fetch_stock_data(self, symbol: str) -> Dict[str, Any]:
|
||||
"""Fetch stock data from Yahoo Finance API."""
|
||||
try:
|
||||
# Use Yahoo Finance API directly
|
||||
url = f"{self.base_url}/v8/finance/chart/{symbol}"
|
||||
params = {
|
||||
"interval": "1m",
|
||||
"period": "1d"
|
||||
}
|
||||
response = requests.get(url, params=params)
|
||||
data = response.json()
|
||||
|
||||
if "chart" not in data or "result" not in data["chart"] or not data["chart"]["result"]:
|
||||
logger.error(f"Invalid response for {symbol}: {data}")
|
||||
return None
|
||||
|
||||
result = data["chart"]["result"][0]
|
||||
meta = result["meta"]
|
||||
|
||||
# Extract price data
|
||||
price = meta.get("regularMarketPrice", 0)
|
||||
prev_close = meta.get("chartPreviousClose", 0)
|
||||
|
||||
# Calculate change percentage
|
||||
change_pct = 0
|
||||
if prev_close > 0:
|
||||
change_pct = ((price - prev_close) / prev_close) * 100
|
||||
|
||||
# Get company name if available
|
||||
company_name = meta.get("instrumentInfo", {}).get("longName", symbol)
|
||||
|
||||
return {
|
||||
"symbol": symbol,
|
||||
"name": company_name,
|
||||
"price": price,
|
||||
"change": change_pct,
|
||||
"open": prev_close
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching stock data for {symbol}: {e}")
|
||||
return None
|
||||
|
||||
def _download_logo(self, symbol: str) -> str:
|
||||
"""Download company logo for a stock symbol."""
|
||||
logo_path = os.path.join(self.logos_dir, f"{symbol}.png")
|
||||
|
||||
# If logo already exists, return the path
|
||||
if os.path.exists(logo_path):
|
||||
return logo_path
|
||||
|
||||
try:
|
||||
# Try to get logo from Yahoo Finance
|
||||
url = f"{self.base_url}/v8/finance/chart/{symbol}"
|
||||
params = {
|
||||
"interval": "1d",
|
||||
"period": "1d"
|
||||
}
|
||||
response = requests.get(url, params=params)
|
||||
data = response.json()
|
||||
|
||||
if "chart" in data and "result" in data["chart"] and data["chart"]["result"]:
|
||||
result = data["chart"]["result"][0]
|
||||
if "meta" in result and "logo_url" in result["meta"]:
|
||||
logo_url = result["meta"]["logo_url"]
|
||||
|
||||
# Download the logo
|
||||
logo_response = requests.get(logo_url)
|
||||
if logo_response.status_code == 200:
|
||||
with open(logo_path, "wb") as f:
|
||||
f.write(logo_response.content)
|
||||
logger.info(f"Downloaded logo for {symbol}")
|
||||
return logo_path
|
||||
|
||||
# If we couldn't get a logo, create a placeholder
|
||||
self._create_placeholder_logo(symbol, logo_path)
|
||||
return logo_path
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error downloading logo for {symbol}: {e}")
|
||||
# Create a placeholder logo
|
||||
self._create_placeholder_logo(symbol, logo_path)
|
||||
return logo_path
|
||||
|
||||
def _create_placeholder_logo(self, symbol: str, logo_path: str):
|
||||
"""Create a placeholder logo with the stock symbol."""
|
||||
try:
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
# Create a 32x32 image with a colored background
|
||||
color = self._get_stock_color(symbol)
|
||||
img = Image.new('RGB', (32, 32), color)
|
||||
draw = ImageDraw.Draw(img)
|
||||
|
||||
# Try to load a font, fall back to default if not available
|
||||
try:
|
||||
font = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
|
||||
except:
|
||||
font = ImageFont.load_default()
|
||||
|
||||
# Draw the symbol
|
||||
draw.text((4, 8), symbol, fill=(255, 255, 255), font=font)
|
||||
|
||||
# Save the image
|
||||
img.save(logo_path)
|
||||
logger.info(f"Created placeholder logo for {symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating placeholder logo for {symbol}: {e}")
|
||||
|
||||
def update_stock_data(self):
|
||||
"""Update stock data if enough time has passed."""
|
||||
current_time = time.time()
|
||||
if current_time - self.last_update < self.stocks_config.get('update_interval', 60):
|
||||
return
|
||||
|
||||
# Get symbols from config
|
||||
symbols = self.stocks_config.get('symbols', [])
|
||||
|
||||
# If symbols is a list of strings, convert to list of dicts
|
||||
if symbols and isinstance(symbols[0], str):
|
||||
symbols = [{"symbol": symbol} for symbol in symbols]
|
||||
|
||||
for stock in symbols:
|
||||
symbol = stock['symbol']
|
||||
data = self._fetch_stock_data(symbol)
|
||||
if data:
|
||||
# Add color if not specified
|
||||
if 'color' not in stock:
|
||||
data['color'] = self._get_stock_color(symbol)
|
||||
else:
|
||||
data['color'] = tuple(stock['color'])
|
||||
|
||||
# Download logo
|
||||
data['logo_path'] = self._download_logo(symbol)
|
||||
|
||||
self.stock_data[symbol] = data
|
||||
|
||||
self.last_update = current_time
|
||||
|
||||
def display_stocks(self, force_clear: bool = False):
|
||||
"""Display stock information on the LED matrix."""
|
||||
if not self.stocks_config.get('enabled', False):
|
||||
return
|
||||
|
||||
self.update_stock_data()
|
||||
|
||||
if not self.stock_data:
|
||||
logger.warning("No stock data available to display")
|
||||
return
|
||||
|
||||
# Clear the display if forced or if this is the first stock
|
||||
if force_clear or self.current_stock_index == 0:
|
||||
self.display_manager.clear()
|
||||
|
||||
# Get the current stock to display
|
||||
symbols = list(self.stock_data.keys())
|
||||
if not symbols:
|
||||
return
|
||||
|
||||
current_symbol = symbols[self.current_stock_index]
|
||||
data = self.stock_data[current_symbol]
|
||||
|
||||
# Format the display text
|
||||
display_format = self.stocks_config.get('display_format', "{symbol}: ${price} ({change}%)")
|
||||
display_text = display_format.format(
|
||||
symbol=data['symbol'],
|
||||
price=f"{data['price']:.2f}",
|
||||
change=f"{data['change']:+.2f}"
|
||||
)
|
||||
|
||||
# Draw the stock information
|
||||
self.display_manager.draw_text(display_text, color=data['color'])
|
||||
self.display_manager.update_display()
|
||||
|
||||
# Move to next stock for next update
|
||||
self.current_stock_index = (self.current_stock_index + 1) % len(symbols)
|
||||
Reference in New Issue
Block a user