Merge cursor/modernize-and-enhance-led-matrix-web-interface-24d0 into development

This commit is contained in:
Chuck
2025-07-31 22:27:30 -05:00
29 changed files with 7803 additions and 146 deletions

View File

@@ -436,7 +436,11 @@ class CacheManager:
try:
config = self.config_manager.get_config()
sport_config = config.get(f"{sport_key}_scoreboard", {})
# For MiLB, look for "milb" config instead of "milb_scoreboard"
if sport_key == 'milb':
sport_config = config.get("milb", {})
else:
sport_config = config.get(f"{sport_key}_scoreboard", {})
return sport_config.get("live_update_interval", 60) # Default to 60 seconds
except Exception as e:
self.logger.warning(f"Could not get live_update_interval for {sport_key}: {e}")

View File

@@ -33,6 +33,7 @@ from src.calendar_manager import CalendarManager
from src.text_display import TextDisplay
from src.music_manager import MusicManager
from src.of_the_day_manager import OfTheDayManager
from src.news_manager import NewsManager
# Get logger without configuring
logger = logging.getLogger(__name__)
@@ -61,9 +62,11 @@ class DisplayController:
self.youtube = YouTubeDisplay(self.display_manager, self.config) if self.config.get('youtube', {}).get('enabled', False) else None
self.text_display = TextDisplay(self.display_manager, self.config) if self.config.get('text_display', {}).get('enabled', False) else None
self.of_the_day = OfTheDayManager(self.display_manager, self.config) if self.config.get('of_the_day', {}).get('enabled', False) else None
self.news_manager = NewsManager(self.config, self.display_manager) if self.config.get('news_manager', {}).get('enabled', False) else None
logger.info(f"Calendar Manager initialized: {'Object' if self.calendar else 'None'}")
logger.info(f"Text Display initialized: {'Object' if self.text_display else 'None'}")
logger.info(f"OfTheDay Manager initialized: {'Object' if self.of_the_day else 'None'}")
logger.info(f"News Manager initialized: {'Object' if self.news_manager else 'None'}")
logger.info("Display modes initialized in %.3f seconds", time.time() - init_time)
# Initialize Music Manager
@@ -255,6 +258,7 @@ class DisplayController:
if self.youtube: self.available_modes.append('youtube')
if self.text_display: self.available_modes.append('text_display')
if self.of_the_day: self.available_modes.append('of_the_day')
if self.news_manager: self.available_modes.append('news_manager')
if self.music_manager:
self.available_modes.append('music')
# Add NHL display modes if enabled
@@ -292,6 +296,9 @@ class DisplayController:
# Set initial display to first available mode (clock)
self.current_mode_index = 0
self.current_display_mode = self.available_modes[0] if self.available_modes else 'none'
# Reset logged duration when mode is initialized
if hasattr(self, '_last_logged_duration'):
delattr(self, '_last_logged_duration')
self.last_switch = time.time()
self.force_clear = True
self.update_interval = 0.01 # Reduced from 0.1 to 0.01 for smoother scrolling
@@ -439,6 +446,20 @@ class DisplayController:
"""Get the duration for the current display mode."""
mode_key = self.current_display_mode
# Handle dynamic duration for news manager
if mode_key == 'news_manager' and self.news_manager:
try:
dynamic_duration = self.news_manager.get_dynamic_duration()
# Only log if duration has changed or we haven't logged this duration yet
if not hasattr(self, '_last_logged_duration') or self._last_logged_duration != dynamic_duration:
logger.info(f"Using dynamic duration for news_manager: {dynamic_duration} seconds")
self._last_logged_duration = dynamic_duration
return dynamic_duration
except Exception as e:
logger.error(f"Error getting dynamic duration for news_manager: {e}")
# Fall back to configured duration
return self.display_durations.get(mode_key, 60)
# Simplify weather key handling
if mode_key.startswith('weather_'):
return self.display_durations.get(mode_key, 15)
@@ -461,6 +482,8 @@ class DisplayController:
if self.youtube: self.youtube.update()
if self.text_display: self.text_display.update()
if self.of_the_day: self.of_the_day.update(time.time())
# News manager fetches data when displayed, not during updates
# if self.news_manager: self.news_manager.fetch_news_data()
# Update NHL managers
if self.nhl_live: self.nhl_live.update()
@@ -514,39 +537,31 @@ class DisplayController:
tuple[bool, str]: (has_live_games, sport_type)
sport_type will be 'nhl', 'nba', 'mlb', 'milb', 'soccer' or None
"""
# Prioritize sports (e.g., Soccer > NHL > NBA > MLB)
live_checks = {
'nhl': self.nhl_live and self.nhl_live.live_games and len(self.nhl_live.live_games) > 0,
'nba': self.nba_live and self.nba_live.live_games and len(self.nba_live.live_games) > 0,
'mlb': self.mlb_live and self.mlb_live.live_games and len(self.mlb_live.live_games) > 0,
'milb': self.milb_live and self.milb_live.live_games and len(self.milb_live.live_games) > 0,
'nfl': self.nfl_live and self.nfl_live.live_games and len(self.nfl_live.live_games) > 0,
# ... other sports
}
# Only include sports that are enabled in config
live_checks = {}
if 'nhl_scoreboard' in self.config and self.config['nhl_scoreboard'].get('enabled', False):
live_checks['nhl'] = self.nhl_live and self.nhl_live.live_games
if 'nba_scoreboard' in self.config and self.config['nba_scoreboard'].get('enabled', False):
live_checks['nba'] = self.nba_live and self.nba_live.live_games
if 'mlb' in self.config and self.config['mlb'].get('enabled', False):
live_checks['mlb'] = self.mlb_live and self.mlb_live.live_games
if 'milb' in self.config and self.config['milb'].get('enabled', False):
live_checks['milb'] = self.milb_live and self.milb_live.live_games
if 'nfl_scoreboard' in self.config and self.config['nfl_scoreboard'].get('enabled', False):
live_checks['nfl'] = self.nfl_live and self.nfl_live.live_games
if 'soccer_scoreboard' in self.config and self.config['soccer_scoreboard'].get('enabled', False):
live_checks['soccer'] = self.soccer_live and self.soccer_live.live_games
if 'ncaa_fb_scoreboard' in self.config and self.config['ncaa_fb_scoreboard'].get('enabled', False):
live_checks['ncaa_fb'] = self.ncaa_fb_live and self.ncaa_fb_live.live_games
if 'ncaa_baseball_scoreboard' in self.config and self.config['ncaa_baseball_scoreboard'].get('enabled', False):
live_checks['ncaa_baseball'] = self.ncaa_baseball_live and self.ncaa_baseball_live.live_games
if 'ncaam_basketball_scoreboard' in self.config and self.config['ncaam_basketball_scoreboard'].get('enabled', False):
live_checks['ncaam_basketball'] = self.ncaam_basketball_live and self.ncaam_basketball_live.live_games
for sport, has_live_games in live_checks.items():
if has_live_games:
logger.debug(f"{sport.upper()} live games available")
return True, sport
if 'ncaa_fb_scoreboard' in self.config and self.config['ncaa_fb_scoreboard'].get('enabled', False):
if self.ncaa_fb_live and self.ncaa_fb_live.live_games and len(self.ncaa_fb_live.live_games) > 0:
logger.debug("NCAA FB live games available")
return True, 'ncaa_fb'
if 'ncaa_baseball_scoreboard' in self.config and self.config['ncaa_baseball_scoreboard'].get('enabled', False):
if self.ncaa_baseball_live and self.ncaa_baseball_live.live_games and len(self.ncaa_baseball_live.live_games) > 0:
logger.debug("NCAA Baseball live games available")
return True, 'ncaa_baseball'
if 'ncaam_basketball_scoreboard' in self.config and self.config['ncaam_basketball_scoreboard'].get('enabled', False):
if self.ncaam_basketball_live and self.ncaam_basketball_live.live_games and len(self.ncaam_basketball_live.live_games) > 0:
logger.debug("NCAA Men's Basketball live games available")
return True, 'ncaam_basketball'
# Add more sports checks here (e.g., MLB, Soccer)
if 'mlb' in self.config and self.config['mlb'].get('enabled', False):
if self.mlb_live and self.mlb_live.live_games and len(self.mlb_live.live_games) > 0:
return True, 'mlb'
return False, None
@@ -758,33 +773,50 @@ class DisplayController:
def _update_live_modes_in_rotation(self):
"""Add or remove live modes from available_modes based on live_priority and live games."""
# Helper to add/remove live modes for all sports
def update_mode(mode_name, manager, live_priority):
# If manager is None (sport disabled), remove the mode from rotation
if manager is None:
def update_mode(mode_name, manager, live_priority, sport_enabled):
# Only process if the sport is enabled in config
if not sport_enabled:
# If sport is disabled, ensure the mode is removed from rotation
if mode_name in self.available_modes:
self.available_modes.remove(mode_name)
logger.debug(f"Removed {mode_name} from rotation (manager is None)")
return
if not live_priority:
live_games = getattr(manager, 'live_games', None)
if live_games and len(live_games) > 0: # Check if there are actually live games
# Only add to rotation if manager exists and has live games
if manager and getattr(manager, 'live_games', None):
live_games = getattr(manager, 'live_games', None)
if mode_name not in self.available_modes:
self.available_modes.append(mode_name)
logger.debug(f"Added {mode_name} to rotation (found {len(live_games)} live games)")
else:
if mode_name in self.available_modes:
self.available_modes.remove(mode_name)
logger.debug(f"Removed {mode_name} from rotation (no live games)")
update_mode('nhl_live', getattr(self, 'nhl_live', None), self.nhl_live_priority)
update_mode('nba_live', getattr(self, 'nba_live', None), self.nba_live_priority)
update_mode('mlb_live', getattr(self, 'mlb_live', None), self.mlb_live_priority)
update_mode('milb_live', getattr(self, 'milb_live', None), self.milb_live_priority)
update_mode('soccer_live', getattr(self, 'soccer_live', None), self.soccer_live_priority)
update_mode('nfl_live', getattr(self, 'nfl_live', None), self.nfl_live_priority)
update_mode('ncaa_fb_live', getattr(self, 'ncaa_fb_live', None), self.ncaa_fb_live_priority)
update_mode('ncaa_baseball_live', getattr(self, 'ncaa_baseball_live', None), self.ncaa_baseball_live_priority)
update_mode('ncaam_basketball_live', getattr(self, 'ncaam_basketball_live', None), self.ncaam_basketball_live_priority)
else:
# For live_priority=True, never add to regular rotation
# These modes are only used for live priority takeover
if mode_name in self.available_modes:
self.available_modes.remove(mode_name)
# Check if each sport is enabled before processing
nhl_enabled = self.config.get('nhl_scoreboard', {}).get('enabled', False)
nba_enabled = self.config.get('nba_scoreboard', {}).get('enabled', False)
mlb_enabled = self.config.get('mlb', {}).get('enabled', False)
milb_enabled = self.config.get('milb', {}).get('enabled', False)
soccer_enabled = self.config.get('soccer_scoreboard', {}).get('enabled', False)
nfl_enabled = self.config.get('nfl_scoreboard', {}).get('enabled', False)
ncaa_fb_enabled = self.config.get('ncaa_fb_scoreboard', {}).get('enabled', False)
ncaa_baseball_enabled = self.config.get('ncaa_baseball_scoreboard', {}).get('enabled', False)
ncaam_basketball_enabled = self.config.get('ncaam_basketball_scoreboard', {}).get('enabled', False)
update_mode('nhl_live', getattr(self, 'nhl_live', None), self.nhl_live_priority, nhl_enabled)
update_mode('nba_live', getattr(self, 'nba_live', None), self.nba_live_priority, nba_enabled)
update_mode('mlb_live', getattr(self, 'mlb_live', None), self.mlb_live_priority, mlb_enabled)
update_mode('milb_live', getattr(self, 'milb_live', None), self.milb_live_priority, milb_enabled)
update_mode('soccer_live', getattr(self, 'soccer_live', None), self.soccer_live_priority, soccer_enabled)
update_mode('nfl_live', getattr(self, 'nfl_live', None), self.nfl_live_priority, nfl_enabled)
update_mode('ncaa_fb_live', getattr(self, 'ncaa_fb_live', None), self.ncaa_fb_live_priority, ncaa_fb_enabled)
update_mode('ncaa_baseball_live', getattr(self, 'ncaa_baseball_live', None), self.ncaa_baseball_live_priority, ncaa_baseball_enabled)
update_mode('ncaam_basketball_live', getattr(self, 'ncaam_basketball_live', None), self.ncaam_basketball_live_priority, ncaam_basketball_enabled)
def run(self):
"""Run the display controller, switching between displays."""
@@ -848,6 +880,9 @@ class DisplayController:
if self.current_display_mode != new_mode:
logger.info(f"Switching to only active live sport: {new_mode} from {self.current_display_mode}")
self.current_display_mode = new_mode
# Reset logged duration when mode changes
if hasattr(self, '_last_logged_duration'):
delattr(self, '_last_logged_duration')
self.force_clear = True
else:
self.force_clear = False
@@ -865,9 +900,13 @@ class DisplayController:
if live_priority_takeover:
new_mode = f"{live_priority_sport}_live"
if self.current_display_mode != new_mode:
logger.info(f"Live priority takeover: Switching to {new_mode} from {self.current_display_mode}")
if previous_mode_before_switch == 'music' and self.music_manager:
self.music_manager.deactivate_music_display()
self.current_display_mode = new_mode
# Reset logged duration when mode changes
if hasattr(self, '_last_logged_duration'):
delattr(self, '_last_logged_duration')
self.force_clear = True
self.last_switch = current_time
manager_to_display = getattr(self, f"{live_priority_sport}_live", None)
@@ -879,7 +918,19 @@ class DisplayController:
# No live_priority takeover, regular rotation
needs_switch = False
if self.current_display_mode.endswith('_live'):
needs_switch = True
# For live modes without live_priority, check if duration has elapsed
if current_time - self.last_switch >= self.get_current_duration():
needs_switch = True
self.current_mode_index = (self.current_mode_index + 1) % len(self.available_modes)
new_mode_after_timer = self.available_modes[self.current_mode_index]
if previous_mode_before_switch == 'music' and self.music_manager and new_mode_after_timer != 'music':
self.music_manager.deactivate_music_display()
if self.current_display_mode != new_mode_after_timer:
logger.info(f"Switching to {new_mode_after_timer} from {self.current_display_mode}")
self.current_display_mode = new_mode_after_timer
# Reset logged duration when mode changes
if hasattr(self, '_last_logged_duration'):
delattr(self, '_last_logged_duration')
elif current_time - self.last_switch >= self.get_current_duration():
if self.current_display_mode == 'calendar' and self.calendar:
self.calendar.advance_event()
@@ -890,7 +941,12 @@ class DisplayController:
new_mode_after_timer = self.available_modes[self.current_mode_index]
if previous_mode_before_switch == 'music' and self.music_manager and new_mode_after_timer != 'music':
self.music_manager.deactivate_music_display()
if self.current_display_mode != new_mode_after_timer:
logger.info(f"Switching to {new_mode_after_timer} from {self.current_display_mode}")
self.current_display_mode = new_mode_after_timer
# Reset logged duration when mode changes
if hasattr(self, '_last_logged_duration'):
delattr(self, '_last_logged_duration')
if needs_switch:
self.force_clear = True
self.last_switch = current_time
@@ -919,46 +975,71 @@ class DisplayController:
manager_to_display = self.text_display
elif self.current_display_mode == 'of_the_day' and self.of_the_day:
manager_to_display = self.of_the_day
elif self.current_display_mode == 'news_manager' and self.news_manager:
manager_to_display = self.news_manager
elif self.current_display_mode == 'nhl_recent' and self.nhl_recent:
manager_to_display = self.nhl_recent
elif self.current_display_mode == 'nhl_upcoming' and self.nhl_upcoming:
manager_to_display = self.nhl_upcoming
elif self.current_display_mode == 'nhl_live' and self.nhl_live:
manager_to_display = self.nhl_live
elif self.current_display_mode == 'nba_recent' and self.nba_recent:
manager_to_display = self.nba_recent
elif self.current_display_mode == 'nba_upcoming' and self.nba_upcoming:
manager_to_display = self.nba_upcoming
elif self.current_display_mode == 'nba_live' and self.nba_live:
manager_to_display = self.nba_live
elif self.current_display_mode == 'mlb_recent' and self.mlb_recent:
manager_to_display = self.mlb_recent
elif self.current_display_mode == 'mlb_upcoming' and self.mlb_upcoming:
manager_to_display = self.mlb_upcoming
elif self.current_display_mode == 'mlb_live' and self.mlb_live:
manager_to_display = self.mlb_live
elif self.current_display_mode == 'milb_recent' and self.milb_recent:
manager_to_display = self.milb_recent
elif self.current_display_mode == 'milb_upcoming' and self.milb_upcoming:
manager_to_display = self.milb_upcoming
elif self.current_display_mode == 'milb_live' and self.milb_live:
manager_to_display = self.milb_live
elif self.current_display_mode == 'soccer_recent' and self.soccer_recent:
manager_to_display = self.soccer_recent
elif self.current_display_mode == 'soccer_upcoming' and self.soccer_upcoming:
manager_to_display = self.soccer_upcoming
elif self.current_display_mode == 'soccer_live' and self.soccer_live:
manager_to_display = self.soccer_live
elif self.current_display_mode == 'nfl_recent' and self.nfl_recent:
manager_to_display = self.nfl_recent
elif self.current_display_mode == 'nfl_upcoming' and self.nfl_upcoming:
manager_to_display = self.nfl_upcoming
elif self.current_display_mode == 'nfl_live' and self.nfl_live:
manager_to_display = self.nfl_live
elif self.current_display_mode == 'ncaa_fb_recent' and self.ncaa_fb_recent:
manager_to_display = self.ncaa_fb_recent
elif self.current_display_mode == 'ncaa_fb_upcoming' and self.ncaa_fb_upcoming:
manager_to_display = self.ncaa_fb_upcoming
elif self.current_display_mode == 'ncaa_fb_live' and self.ncaa_fb_live:
manager_to_display = self.ncaa_fb_live
elif self.current_display_mode == 'ncaa_baseball_recent' and self.ncaa_baseball_recent:
manager_to_display = self.ncaa_baseball_recent
elif self.current_display_mode == 'ncaa_baseball_upcoming' and self.ncaa_baseball_upcoming:
manager_to_display = self.ncaa_baseball_upcoming
elif self.current_display_mode == 'ncaa_baseball_live' and self.ncaa_baseball_live:
manager_to_display = self.ncaa_baseball_live
elif self.current_display_mode == 'ncaam_basketball_recent' and self.ncaam_basketball_recent:
manager_to_display = self.ncaam_basketball_recent
elif self.current_display_mode == 'ncaam_basketball_upcoming' and self.ncaam_basketball_upcoming:
manager_to_display = self.ncaam_basketball_upcoming
elif self.current_display_mode == 'ncaam_basketball_live' and self.ncaam_basketball_live:
manager_to_display = self.ncaam_basketball_live
# --- Perform Display Update ---
try:
# Log which display is being shown
if self.current_display_mode != getattr(self, '_last_logged_mode', None):
logger.info(f"Showing {self.current_display_mode}")
self._last_logged_mode = self.current_display_mode
if self.current_display_mode == 'music' and self.music_manager:
# Call MusicManager's display method
self.music_manager.display(force_clear=self.force_clear)
@@ -991,6 +1072,8 @@ class DisplayController:
manager_to_display.display() # Assumes internal clearing
elif self.current_display_mode == 'of_the_day':
manager_to_display.display(force_clear=self.force_clear)
elif self.current_display_mode == 'news_manager':
manager_to_display.display_news()
elif self.current_display_mode == 'nfl_live' and self.nfl_live:
self.nfl_live.display(force_clear=self.force_clear)
elif self.current_display_mode == 'ncaa_fb_live' and self.ncaa_fb_live:

View File

@@ -37,82 +37,136 @@ class DisplayManager:
def _setup_matrix(self):
"""Initialize the RGB matrix with configuration settings."""
setup_start = time.time()
options = RGBMatrixOptions()
# Hardware configuration
hardware_config = self.config.get('display', {}).get('hardware', {})
runtime_config = self.config.get('display', {}).get('runtime', {})
# Basic hardware settings
options.rows = hardware_config.get('rows', 32)
options.cols = hardware_config.get('cols', 64)
options.chain_length = hardware_config.get('chain_length', 2)
options.parallel = hardware_config.get('parallel', 1)
options.hardware_mapping = hardware_config.get('hardware_mapping', 'adafruit-hat-pwm')
# Performance and stability settings
options.brightness = hardware_config.get('brightness', 90)
options.pwm_bits = hardware_config.get('pwm_bits', 10)
options.pwm_lsb_nanoseconds = hardware_config.get('pwm_lsb_nanoseconds', 150)
options.led_rgb_sequence = hardware_config.get('led_rgb_sequence', 'RGB')
options.pixel_mapper_config = hardware_config.get('pixel_mapper_config', '')
options.row_address_type = hardware_config.get('row_address_type', 0)
options.multiplexing = hardware_config.get('multiplexing', 0)
options.disable_hardware_pulsing = hardware_config.get('disable_hardware_pulsing', False)
options.show_refresh_rate = hardware_config.get('show_refresh_rate', False)
options.limit_refresh_rate_hz = hardware_config.get('limit_refresh_rate_hz', 90)
options.gpio_slowdown = runtime_config.get('gpio_slowdown', 2)
# Additional settings from config
if 'scan_mode' in hardware_config:
options.scan_mode = hardware_config.get('scan_mode')
if 'pwm_dither_bits' in hardware_config:
options.pwm_dither_bits = hardware_config.get('pwm_dither_bits')
if 'inverse_colors' in hardware_config:
options.inverse_colors = hardware_config.get('inverse_colors')
# Initialize the matrix
self.matrix = RGBMatrix(options=options)
# Create double buffer for smooth updates
self.offscreen_canvas = self.matrix.CreateFrameCanvas()
self.current_canvas = self.matrix.CreateFrameCanvas()
# Create image with full chain width
self.image = Image.new('RGB', (self.matrix.width, self.matrix.height))
self.draw = ImageDraw.Draw(self.image)
# Initialize font with Press Start 2P
try:
self.font = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
logger.info("Initial Press Start 2P font loaded successfully")
options = RGBMatrixOptions()
# Hardware configuration
hardware_config = self.config.get('display', {}).get('hardware', {})
runtime_config = self.config.get('display', {}).get('runtime', {})
# Basic hardware settings
options.rows = hardware_config.get('rows', 32)
options.cols = hardware_config.get('cols', 64)
options.chain_length = hardware_config.get('chain_length', 2)
options.parallel = hardware_config.get('parallel', 1)
options.hardware_mapping = hardware_config.get('hardware_mapping', 'adafruit-hat-pwm')
# Performance and stability settings
options.brightness = hardware_config.get('brightness', 90)
options.pwm_bits = hardware_config.get('pwm_bits', 10)
options.pwm_lsb_nanoseconds = hardware_config.get('pwm_lsb_nanoseconds', 150)
options.led_rgb_sequence = hardware_config.get('led_rgb_sequence', 'RGB')
options.pixel_mapper_config = hardware_config.get('pixel_mapper_config', '')
options.row_address_type = hardware_config.get('row_address_type', 0)
options.multiplexing = hardware_config.get('multiplexing', 0)
options.disable_hardware_pulsing = hardware_config.get('disable_hardware_pulsing', False)
options.show_refresh_rate = hardware_config.get('show_refresh_rate', False)
options.limit_refresh_rate_hz = hardware_config.get('limit_refresh_rate_hz', 90)
options.gpio_slowdown = runtime_config.get('gpio_slowdown', 2)
# Additional settings from config
if 'scan_mode' in hardware_config:
options.scan_mode = hardware_config.get('scan_mode')
if 'pwm_dither_bits' in hardware_config:
options.pwm_dither_bits = hardware_config.get('pwm_dither_bits')
if 'inverse_colors' in hardware_config:
options.inverse_colors = hardware_config.get('inverse_colors')
logger.info(f"Initializing RGB Matrix with settings: rows={options.rows}, cols={options.cols}, chain_length={options.chain_length}, parallel={options.parallel}, hardware_mapping={options.hardware_mapping}")
# Initialize the matrix
self.matrix = RGBMatrix(options=options)
logger.info("RGB Matrix initialized successfully")
# Create double buffer for smooth updates
self.offscreen_canvas = self.matrix.CreateFrameCanvas()
self.current_canvas = self.matrix.CreateFrameCanvas()
logger.info("Frame canvases created successfully")
# Create image with full chain width
self.image = Image.new('RGB', (self.matrix.width, self.matrix.height))
self.draw = ImageDraw.Draw(self.image)
logger.info(f"Image canvas created with dimensions: {self.matrix.width}x{self.matrix.height}")
# Initialize font with Press Start 2P
try:
self.font = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
logger.info("Initial Press Start 2P font loaded successfully")
except Exception as e:
logger.error(f"Failed to load initial font: {e}")
self.font = ImageFont.load_default()
# Draw a test pattern
self._draw_test_pattern()
except Exception as e:
logger.error(f"Failed to load initial font: {e}")
self.font = ImageFont.load_default()
# Draw a test pattern
self._draw_test_pattern()
logger.error(f"Failed to initialize RGB Matrix: {e}", exc_info=True)
# Create a fallback image for web preview
self.matrix = None
self.image = Image.new('RGB', (128, 32)) # Default size
self.draw = ImageDraw.Draw(self.image)
self.draw.text((10, 10), "Matrix Error", fill=(255, 0, 0))
logger.error(f"Matrix initialization failed, using fallback mode. Error: {e}")
raise
@property
def width(self):
"""Get the display width."""
if hasattr(self, 'matrix') and self.matrix is not None:
return self.matrix.width
elif hasattr(self, 'image'):
return self.image.width
else:
return 128 # Default fallback width
@property
def height(self):
"""Get the display height."""
if hasattr(self, 'matrix') and self.matrix is not None:
return self.matrix.height
elif hasattr(self, 'image'):
return self.image.height
else:
return 32 # Default fallback height
def _draw_test_pattern(self):
"""Draw a test pattern to verify the display is working."""
self.clear()
# Draw a red rectangle border
self.draw.rectangle([0, 0, self.matrix.width-1, self.matrix.height-1], outline=(255, 0, 0))
# Draw a diagonal line
self.draw.line([0, 0, self.matrix.width-1, self.matrix.height-1], fill=(0, 255, 0))
# Draw some text - changed from "TEST" to "Initializing" with smaller font
self.draw.text((10, 10), "Initializing", font=self.font, fill=(0, 0, 255))
# Update the display once after everything is drawn
self.update_display()
time.sleep(0.5) # Reduced from 1 second to 0.5 seconds for faster animation
try:
self.clear()
if self.matrix is None:
# Fallback mode - just draw on the image
self.draw.rectangle([0, 0, self.image.width-1, self.image.height-1], outline=(255, 0, 0))
self.draw.line([0, 0, self.image.width-1, self.image.height-1], fill=(0, 255, 0))
self.draw.text((10, 10), "Simulation", font=self.font, fill=(0, 0, 255))
logger.info("Drew test pattern in fallback mode")
return
# Draw a red rectangle border
self.draw.rectangle([0, 0, self.matrix.width-1, self.matrix.height-1], outline=(255, 0, 0))
# Draw a diagonal line
self.draw.line([0, 0, self.matrix.width-1, self.matrix.height-1], fill=(0, 255, 0))
# Draw some text - changed from "TEST" to "Initializing" with smaller font
self.draw.text((10, 10), "Initializing", font=self.font, fill=(0, 0, 255))
# Update the display once after everything is drawn
self.update_display()
time.sleep(0.5) # Reduced from 1 second to 0.5 seconds for faster animation
except Exception as e:
logger.error(f"Error drawing test pattern: {e}", exc_info=True)
def update_display(self):
"""Update the display using double buffering with proper sync."""
try:
if self.matrix is None:
# Fallback mode - no actual hardware to update
logger.debug("Update display called in fallback mode (no hardware)")
return
# Copy the current image to the offscreen canvas
self.offscreen_canvas.SetImage(self.image)
@@ -127,6 +181,13 @@ class DisplayManager:
def clear(self):
"""Clear the display completely."""
try:
if self.matrix is None:
# Fallback mode - just clear the image
self.image = Image.new('RGB', (self.image.width, self.image.height))
self.draw = ImageDraw.Draw(self.image)
logger.debug("Cleared display in fallback mode")
return
# Create a new black image
self.image = Image.new('RGB', (self.matrix.width, self.matrix.height))
self.draw = ImageDraw.Draw(self.image)

404
src/layout_manager.py Normal file
View File

@@ -0,0 +1,404 @@
"""
Layout Manager for LED Matrix Display
Handles custom layouts, element positioning, and display composition.
"""
import json
import os
import logging
from typing import Dict, List, Any, Tuple
from datetime import datetime
from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__)
class LayoutManager:
def __init__(self, display_manager=None, config_path="config/custom_layouts.json"):
self.display_manager = display_manager
self.config_path = config_path
self.layouts = self.load_layouts()
self.current_layout = None
def load_layouts(self) -> Dict[str, Any]:
"""Load saved layouts from file."""
try:
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
return json.load(f)
return {}
except Exception as e:
logger.error(f"Error loading layouts: {e}")
return {}
def save_layouts(self) -> bool:
"""Save layouts to file."""
try:
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
with open(self.config_path, 'w') as f:
json.dump(self.layouts, f, indent=2)
return True
except Exception as e:
logger.error(f"Error saving layouts: {e}")
return False
def create_layout(self, name: str, elements: List[Dict], description: str = "") -> bool:
"""Create a new layout."""
try:
self.layouts[name] = {
'elements': elements,
'description': description,
'created': datetime.now().isoformat(),
'modified': datetime.now().isoformat()
}
return self.save_layouts()
except Exception as e:
logger.error(f"Error creating layout '{name}': {e}")
return False
def update_layout(self, name: str, elements: List[Dict], description: str = None) -> bool:
"""Update an existing layout."""
try:
if name not in self.layouts:
return False
self.layouts[name]['elements'] = elements
self.layouts[name]['modified'] = datetime.now().isoformat()
if description is not None:
self.layouts[name]['description'] = description
return self.save_layouts()
except Exception as e:
logger.error(f"Error updating layout '{name}': {e}")
return False
def delete_layout(self, name: str) -> bool:
"""Delete a layout."""
try:
if name in self.layouts:
del self.layouts[name]
return self.save_layouts()
return False
except Exception as e:
logger.error(f"Error deleting layout '{name}': {e}")
return False
def get_layout(self, name: str) -> Dict[str, Any]:
"""Get a specific layout."""
return self.layouts.get(name, {})
def list_layouts(self) -> List[str]:
"""Get list of all layout names."""
return list(self.layouts.keys())
def set_current_layout(self, name: str) -> bool:
"""Set the current active layout."""
if name in self.layouts:
self.current_layout = name
return True
return False
def render_layout(self, layout_name: str = None, data_context: Dict = None) -> bool:
"""Render a layout to the display."""
if not self.display_manager:
logger.error("No display manager available")
return False
layout_name = layout_name or self.current_layout
if not layout_name or layout_name not in self.layouts:
logger.error(f"Layout '{layout_name}' not found")
return False
try:
# Clear the display
self.display_manager.clear()
# Get layout elements
elements = self.layouts[layout_name]['elements']
# Render each element
for element in elements:
self.render_element(element, data_context or {})
# Update the display
self.display_manager.update_display()
return True
except Exception as e:
logger.error(f"Error rendering layout '{layout_name}': {e}")
return False
def render_element(self, element: Dict, data_context: Dict) -> None:
"""Render a single element."""
element_type = element.get('type')
x = element.get('x', 0)
y = element.get('y', 0)
properties = element.get('properties', {})
try:
if element_type == 'text':
self._render_text_element(x, y, properties, data_context)
elif element_type == 'weather_icon':
self._render_weather_icon_element(x, y, properties, data_context)
elif element_type == 'rectangle':
self._render_rectangle_element(x, y, properties)
elif element_type == 'line':
self._render_line_element(x, y, properties)
elif element_type == 'clock':
self._render_clock_element(x, y, properties)
elif element_type == 'data_text':
self._render_data_text_element(x, y, properties, data_context)
else:
logger.warning(f"Unknown element type: {element_type}")
except Exception as e:
logger.error(f"Error rendering element {element_type}: {e}")
def _render_text_element(self, x: int, y: int, properties: Dict, data_context: Dict) -> None:
"""Render a text element."""
text = properties.get('text', 'Sample Text')
color = tuple(properties.get('color', [255, 255, 255]))
font_size = properties.get('font_size', 'normal')
# Support template variables in text
text = self._process_template_text(text, data_context)
# Select font
if font_size == 'small':
font = self.display_manager.small_font
elif font_size == 'large':
font = self.display_manager.regular_font
else:
font = self.display_manager.regular_font
self.display_manager.draw_text(text, x, y, color, font=font)
def _render_weather_icon_element(self, x: int, y: int, properties: Dict, data_context: Dict) -> None:
"""Render a weather icon element."""
condition = properties.get('condition', 'sunny')
size = properties.get('size', 16)
# Use weather data from context if available
if 'weather' in data_context and 'condition' in data_context['weather']:
condition = data_context['weather']['condition'].lower()
self.display_manager.draw_weather_icon(condition, x, y, size)
def _render_rectangle_element(self, x: int, y: int, properties: Dict) -> None:
"""Render a rectangle element."""
width = properties.get('width', 10)
height = properties.get('height', 10)
color = tuple(properties.get('color', [255, 255, 255]))
filled = properties.get('filled', False)
if filled:
self.display_manager.draw.rectangle(
[x, y, x + width, y + height],
fill=color
)
else:
self.display_manager.draw.rectangle(
[x, y, x + width, y + height],
outline=color
)
def _render_line_element(self, x: int, y: int, properties: Dict) -> None:
"""Render a line element."""
x2 = properties.get('x2', x + 10)
y2 = properties.get('y2', y)
color = tuple(properties.get('color', [255, 255, 255]))
width = properties.get('width', 1)
self.display_manager.draw.line([x, y, x2, y2], fill=color, width=width)
def _render_clock_element(self, x: int, y: int, properties: Dict) -> None:
"""Render a clock element."""
format_str = properties.get('format', '%H:%M')
color = tuple(properties.get('color', [255, 255, 255]))
current_time = datetime.now().strftime(format_str)
self.display_manager.draw_text(current_time, x, y, color)
def _render_data_text_element(self, x: int, y: int, properties: Dict, data_context: Dict) -> None:
"""Render a data-driven text element."""
data_key = properties.get('data_key', '')
format_str = properties.get('format', '{value}')
color = tuple(properties.get('color', [255, 255, 255]))
default_value = properties.get('default', 'N/A')
# Extract data from context
value = self._get_nested_value(data_context, data_key, default_value)
# Format the text
try:
text = format_str.format(value=value)
except:
text = str(value)
self.display_manager.draw_text(text, x, y, color)
def _process_template_text(self, text: str, data_context: Dict) -> str:
"""Process template variables in text."""
try:
# Simple template processing - replace {key} with values from context
for key, value in data_context.items():
placeholder = f"{{{key}}}"
if placeholder in text:
text = text.replace(placeholder, str(value))
return text
except Exception as e:
logger.error(f"Error processing template text: {e}")
return text
def _get_nested_value(self, data: Dict, key: str, default=None):
"""Get a nested value from a dictionary using dot notation."""
try:
keys = key.split('.')
value = data
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
def create_preset_layouts(self) -> None:
"""Create some preset layouts for common use cases."""
# Basic clock layout
clock_layout = [
{
'type': 'clock',
'x': 10,
'y': 10,
'properties': {
'format': '%H:%M',
'color': [255, 255, 255]
}
},
{
'type': 'clock',
'x': 10,
'y': 20,
'properties': {
'format': '%m/%d',
'color': [100, 100, 255]
}
}
]
self.create_layout('basic_clock', clock_layout, 'Simple clock with date')
# Weather layout
weather_layout = [
{
'type': 'weather_icon',
'x': 5,
'y': 5,
'properties': {
'condition': 'sunny',
'size': 20
}
},
{
'type': 'data_text',
'x': 30,
'y': 8,
'properties': {
'data_key': 'weather.temperature',
'format': '{value}°',
'color': [255, 200, 0],
'default': '--°'
}
},
{
'type': 'data_text',
'x': 30,
'y': 18,
'properties': {
'data_key': 'weather.condition',
'format': '{value}',
'color': [200, 200, 200],
'default': 'Unknown'
}
}
]
self.create_layout('weather_display', weather_layout, 'Weather icon with temperature and condition')
# Mixed dashboard layout
dashboard_layout = [
{
'type': 'clock',
'x': 2,
'y': 2,
'properties': {
'format': '%H:%M',
'color': [255, 255, 255]
}
},
{
'type': 'weather_icon',
'x': 50,
'y': 2,
'properties': {
'size': 16
}
},
{
'type': 'data_text',
'x': 70,
'y': 5,
'properties': {
'data_key': 'weather.temperature',
'format': '{value}°',
'color': [255, 200, 0],
'default': '--°'
}
},
{
'type': 'line',
'x': 0,
'y': 15,
'properties': {
'x2': 128,
'y2': 15,
'color': [100, 100, 100]
}
},
{
'type': 'data_text',
'x': 2,
'y': 18,
'properties': {
'data_key': 'stocks.AAPL.price',
'format': 'AAPL: ${value}',
'color': [0, 255, 0],
'default': 'AAPL: N/A'
}
}
]
self.create_layout('dashboard', dashboard_layout, 'Mixed dashboard with clock, weather, and stocks')
logger.info("Created preset layouts")
def get_layout_preview(self, layout_name: str) -> Dict[str, Any]:
"""Get a preview representation of a layout."""
if layout_name not in self.layouts:
return {}
layout = self.layouts[layout_name]
elements = layout['elements']
# Create a simple preview representation
preview = {
'name': layout_name,
'description': layout.get('description', ''),
'element_count': len(elements),
'elements': []
}
for element in elements:
preview['elements'].append({
'type': element.get('type'),
'position': f"({element.get('x', 0)}, {element.get('y', 0)})",
'properties': list(element.get('properties', {}).keys())
})
return preview

View File

@@ -317,7 +317,7 @@ class BaseMiLBManager:
def _fetch_milb_api_data(self, use_cache: bool = True) -> Dict[str, Any]:
"""Fetch MiLB game data from the MLB Stats API."""
cache_key = "milb_api_data"
cache_key = "milb_live_api_data"
if use_cache:
cached_data = self.cache_manager.get_with_auto_strategy(cache_key)
if cached_data:

565
src/news_manager.py Normal file
View File

@@ -0,0 +1,565 @@
import time
import logging
import requests
import xml.etree.ElementTree as ET
import json
import random
from typing import Dict, Any, List, Tuple, Optional
from datetime import datetime, timedelta
import os
import urllib.parse
import re
import html
from src.config_manager import ConfigManager
from PIL import Image, ImageDraw, ImageFont
from src.cache_manager import CacheManager
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class NewsManager:
def __init__(self, config: Dict[str, Any], display_manager):
self.config = config
self.config_manager = ConfigManager()
self.display_manager = display_manager
self.news_config = config.get('news_manager', {})
self.last_update = time.time() # Initialize to current time
self.news_data = {}
self.current_headline_index = 0
self.scroll_position = 0
self.scrolling_image = None # Pre-rendered image for smooth scrolling
self.cached_text = None
self.cache_manager = CacheManager()
self.current_headlines = []
self.headline_start_times = []
self.total_scroll_width = 0
self.headlines_displayed = set() # Track displayed headlines for rotation
self.dynamic_duration = 60 # Default duration in seconds
self.is_fetching = False # Flag to prevent multiple simultaneous fetches
# Default RSS feeds
self.default_feeds = {
'MLB': 'http://espn.com/espn/rss/mlb/news',
'NFL': 'http://espn.go.com/espn/rss/nfl/news',
'NCAA FB': 'https://www.espn.com/espn/rss/ncf/news',
'NHL': 'https://www.espn.com/espn/rss/nhl/news',
'NBA': 'https://www.espn.com/espn/rss/nba/news',
'TOP SPORTS': 'https://www.espn.com/espn/rss/news',
'BIG10': 'https://www.espn.com/blog/feed?blog=bigten',
'NCAA': 'https://www.espn.com/espn/rss/ncaa/news',
'Other': 'https://www.coveringthecorner.com/rss/current.xml'
}
# Get scroll settings from config
self.scroll_speed = self.news_config.get('scroll_speed', 2)
self.scroll_delay = self.news_config.get('scroll_delay', 0.02)
self.update_interval = self.news_config.get('update_interval', 300) # 5 minutes
# Get headline settings from config
self.headlines_per_feed = self.news_config.get('headlines_per_feed', 2)
self.enabled_feeds = self.news_config.get('enabled_feeds', ['NFL', 'NCAA FB'])
self.custom_feeds = self.news_config.get('custom_feeds', {})
# Rotation settings
self.rotation_enabled = self.news_config.get('rotation_enabled', True)
self.rotation_threshold = self.news_config.get('rotation_threshold', 3) # After 3 full cycles
self.rotation_count = 0
# Dynamic duration settings
self.dynamic_duration_enabled = self.news_config.get('dynamic_duration', True)
self.min_duration = self.news_config.get('min_duration', 30)
self.max_duration = self.news_config.get('max_duration', 300)
self.duration_buffer = self.news_config.get('duration_buffer', 0.1)
# Font settings
self.font_size = self.news_config.get('font_size', 12)
self.font_path = self.news_config.get('font_path', '/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf')
# Colors
self.text_color = tuple(self.news_config.get('text_color', [255, 255, 255]))
self.separator_color = tuple(self.news_config.get('separator_color', [255, 0, 0]))
# Initialize session with retry strategy
self.session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
logger.debug(f"NewsManager initialized with feeds: {self.enabled_feeds}")
logger.debug(f"Headlines per feed: {self.headlines_per_feed}")
logger.debug(f"Scroll settings - Speed: {self.scroll_speed} pixels/frame, Delay: {self.scroll_delay*1000:.2f}ms")
def parse_rss_feed(self, url: str, feed_name: str) -> List[Dict[str, Any]]:
"""Parse RSS feed and return list of headlines"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = self.session.get(url, headers=headers, timeout=10)
response.raise_for_status()
root = ET.fromstring(response.content)
headlines = []
# Handle different RSS formats
items = root.findall('.//item')
if not items:
items = root.findall('.//entry') # Atom feed format
for item in items[:self.headlines_per_feed * 2]: # Get extra to allow for filtering
title_elem = item.find('title')
if title_elem is not None:
title = html.unescape(title_elem.text or '').strip()
# Clean up title
title = re.sub(r'<[^>]+>', '', title) # Remove HTML tags
title = re.sub(r'\s+', ' ', title) # Normalize whitespace
if title and len(title) > 10: # Filter out very short titles
pub_date_elem = item.find('pubDate')
if pub_date_elem is None:
pub_date_elem = item.find('published') # Atom format
pub_date = pub_date_elem.text if pub_date_elem is not None else None
headlines.append({
'title': title,
'feed': feed_name,
'pub_date': pub_date,
'timestamp': datetime.now().isoformat()
})
logger.debug(f"Parsed {len(headlines)} headlines from {feed_name}")
return headlines[:self.headlines_per_feed]
except Exception as e:
logger.error(f"Error parsing RSS feed {feed_name} ({url}): {e}")
return []
def fetch_news_data(self):
"""Fetch news from all enabled feeds"""
try:
all_headlines = []
# Combine default and custom feeds
all_feeds = {**self.default_feeds, **self.custom_feeds}
for feed_name in self.enabled_feeds:
if feed_name in all_feeds:
url = all_feeds[feed_name]
headlines = self.parse_rss_feed(url, feed_name)
all_headlines.extend(headlines)
else:
logger.warning(f"Feed '{feed_name}' not found in available feeds")
# Store headlines by feed for rotation management
self.news_data = {}
for headline in all_headlines:
feed = headline['feed']
if feed not in self.news_data:
self.news_data[feed] = []
self.news_data[feed].append(headline)
# Prepare current headlines for display
self.prepare_headlines_for_display()
self.last_update = time.time()
logger.debug(f"Fetched {len(all_headlines)} total headlines from {len(self.enabled_feeds)} feeds")
except Exception as e:
logger.error(f"Error fetching news data: {e}")
def prepare_headlines_for_display(self):
"""Prepare headlines for scrolling display with rotation"""
if not self.news_data:
return
# Get headlines for display, applying rotation if enabled
display_headlines = []
for feed_name in self.enabled_feeds:
if feed_name in self.news_data:
feed_headlines = self.news_data[feed_name]
if self.rotation_enabled and len(feed_headlines) > self.headlines_per_feed:
# Rotate headlines to show different ones
start_idx = (self.rotation_count * self.headlines_per_feed) % len(feed_headlines)
selected = []
for i in range(self.headlines_per_feed):
idx = (start_idx + i) % len(feed_headlines)
selected.append(feed_headlines[idx])
display_headlines.extend(selected)
else:
display_headlines.extend(feed_headlines[:self.headlines_per_feed])
# Create scrolling text with separators
if display_headlines:
text_parts = []
for i, headline in enumerate(display_headlines):
feed_prefix = f"[{headline['feed']}] "
text_parts.append(feed_prefix + headline['title'])
# Join with separators and add spacing
separator = ""
self.cached_text = separator.join(text_parts) + "" # Add separator at end for smooth loop
# Calculate text dimensions for perfect scrolling
self.calculate_scroll_dimensions()
self.create_scrolling_image()
self.current_headlines = display_headlines
logger.debug(f"Prepared {len(display_headlines)} headlines for display")
def create_scrolling_image(self):
"""Create a pre-rendered image for smooth scrolling."""
if not self.cached_text:
self.scrolling_image = None
return
try:
font = ImageFont.truetype(self.font_path, self.font_size)
except Exception as e:
logger.warning(f"Failed to load custom font for pre-rendering: {e}. Using default.")
font = ImageFont.load_default()
height = self.display_manager.height
width = self.total_scroll_width
self.scrolling_image = Image.new('RGB', (width, height), (0, 0, 0))
draw = ImageDraw.Draw(self.scrolling_image)
text_height = self.font_size
y_pos = (height - text_height) // 2
draw.text((0, y_pos), self.cached_text, font=font, fill=self.text_color)
logger.debug("Pre-rendered scrolling news image created.")
def calculate_scroll_dimensions(self):
"""Calculate exact dimensions needed for smooth scrolling"""
if not self.cached_text:
return
try:
# Load font
try:
font = ImageFont.truetype(self.font_path, self.font_size)
logger.debug(f"Successfully loaded custom font: {self.font_path}")
except Exception as e:
logger.warning(f"Failed to load custom font '{self.font_path}': {e}. Using default font.")
font = ImageFont.load_default()
# Calculate text width
temp_img = Image.new('RGB', (1, 1))
temp_draw = ImageDraw.Draw(temp_img)
# Get text dimensions
bbox = temp_draw.textbbox((0, 0), self.cached_text, font=font)
self.total_scroll_width = bbox[2] - bbox[0]
# Calculate dynamic display duration
self.calculate_dynamic_duration()
logger.debug(f"Text width calculated: {self.total_scroll_width} pixels")
logger.debug(f"Dynamic duration calculated: {self.dynamic_duration} seconds")
except Exception as e:
logger.error(f"Error calculating scroll dimensions: {e}")
self.total_scroll_width = len(self.cached_text) * 8 # Fallback estimate
self.calculate_dynamic_duration()
def create_scrolling_image(self):
"""Create a pre-rendered image for smooth scrolling."""
if not self.cached_text:
self.scrolling_image = None
return
try:
font = ImageFont.truetype(self.font_path, self.font_size)
except Exception as e:
logger.warning(f"Failed to load custom font for pre-rendering: {e}. Using default.")
font = ImageFont.load_default()
height = self.display_manager.height
width = self.total_scroll_width
self.scrolling_image = Image.new('RGB', (width, height), (0, 0, 0))
draw = ImageDraw.Draw(self.scrolling_image)
text_height = self.font_size
y_pos = (height - text_height) // 2
draw.text((0, y_pos), self.cached_text, font=font, fill=self.text_color)
logger.debug("Pre-rendered scrolling news image created.")
def calculate_dynamic_duration(self):
"""Calculate the exact time needed to display all headlines"""
# If dynamic duration is disabled, use fixed duration from config
if not self.dynamic_duration_enabled:
self.dynamic_duration = self.news_config.get('fixed_duration', 60)
logger.debug(f"Dynamic duration disabled, using fixed duration: {self.dynamic_duration}s")
return
if not self.total_scroll_width:
self.dynamic_duration = self.min_duration # Use configured minimum
return
try:
# Get display width (assume full width of display)
display_width = getattr(self.display_manager, 'width', 128) # Default to 128 if not available
# Calculate total scroll distance needed
# Text needs to scroll from right edge to completely off left edge
total_scroll_distance = display_width + self.total_scroll_width
# Calculate time based on scroll speed and delay
# scroll_speed = pixels per frame, scroll_delay = seconds per frame
frames_needed = total_scroll_distance / self.scroll_speed
total_time = frames_needed * self.scroll_delay
# Add buffer time for smooth cycling (configurable %)
buffer_time = total_time * self.duration_buffer
calculated_duration = int(total_time + buffer_time)
# Apply configured min/max limits
if calculated_duration < self.min_duration:
self.dynamic_duration = self.min_duration
logger.debug(f"Duration capped to minimum: {self.min_duration}s")
elif calculated_duration > self.max_duration:
self.dynamic_duration = self.max_duration
logger.debug(f"Duration capped to maximum: {self.max_duration}s")
else:
self.dynamic_duration = calculated_duration
logger.debug(f"Dynamic duration calculation:")
logger.debug(f" Display width: {display_width}px")
logger.debug(f" Text width: {self.total_scroll_width}px")
logger.debug(f" Total scroll distance: {total_scroll_distance}px")
logger.debug(f" Frames needed: {frames_needed:.1f}")
logger.debug(f" Base time: {total_time:.2f}s")
logger.debug(f" Buffer time: {buffer_time:.2f}s ({self.duration_buffer*100}%)")
logger.debug(f" Calculated duration: {calculated_duration}s")
logger.debug(f" Final duration: {self.dynamic_duration}s")
except Exception as e:
logger.error(f"Error calculating dynamic duration: {e}")
self.dynamic_duration = self.min_duration # Use configured minimum as fallback
def should_update(self) -> bool:
"""Check if news data should be updated"""
return (time.time() - self.last_update) > self.update_interval
def get_news_display(self) -> Image.Image:
"""Generate the scrolling news ticker display by cropping the pre-rendered image."""
try:
if not self.scrolling_image:
logger.debug("No pre-rendered image available, showing loading image.")
return self.create_no_news_image()
width = self.display_manager.width
height = self.display_manager.height
# Use modulo for continuous scrolling
self.scroll_position = (self.scroll_position + self.scroll_speed) % self.total_scroll_width
# Crop the visible part of the image
x = self.scroll_position
visible_end = x + width
if visible_end <= self.total_scroll_width:
# No wrap-around needed
img = self.scrolling_image.crop((x, 0, visible_end, height))
else:
# Handle wrap-around
img = Image.new('RGB', (width, height))
width1 = self.total_scroll_width - x
portion1 = self.scrolling_image.crop((x, 0, self.total_scroll_width, height))
img.paste(portion1, (0, 0))
width2 = width - width1
portion2 = self.scrolling_image.crop((0, 0, width2, height))
img.paste(portion2, (width1, 0))
# Check for rotation when scroll completes a cycle
if self.scroll_position < self.scroll_speed: # Check if we just wrapped around
self.rotation_count += 1
if (self.rotation_enabled and
self.rotation_count >= self.rotation_threshold and
any(len(headlines) > self.headlines_per_feed for headlines in self.news_data.values())):
logger.info("News rotation threshold reached. Preparing new headlines.")
self.prepare_headlines_for_display()
self.rotation_count = 0
return img
except Exception as e:
logger.error(f"Error generating news display: {e}")
return self.create_error_image(str(e))
def create_no_news_image(self) -> Image.Image:
"""Create image when no news is available"""
width = self.display_manager.width
height = self.display_manager.height
img = Image.new('RGB', (width, height), (0, 0, 0))
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype(self.font_path, self.font_size)
logger.debug(f"Successfully loaded custom font: {self.font_path}")
except Exception as e:
logger.warning(f"Failed to load custom font '{self.font_path}': {e}. Using default font.")
font = ImageFont.load_default()
text = "Loading news..."
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = (width - text_width) // 2
y = (height - text_height) // 2
draw.text((x, y), text, font=font, fill=self.text_color)
return img
def create_error_image(self, error_msg: str) -> Image.Image:
"""Create image for error display"""
width = self.display_manager.width
height = self.display_manager.height
img = Image.new('RGB', (width, height), (0, 0, 0))
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype(self.font_path, max(8, self.font_size - 2))
logger.debug(f"Successfully loaded custom font: {self.font_path}")
except Exception as e:
logger.warning(f"Failed to load custom font '{self.font_path}': {e}. Using default font.")
font = ImageFont.load_default()
text = f"News Error: {error_msg[:50]}..."
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
x = max(0, (width - text_width) // 2)
y = (height - text_height) // 2
draw.text((x, y), text, font=font, fill=(255, 0, 0))
return img
def display_news(self, force_clear: bool = False):
"""Display method for news ticker - called by display controller"""
try:
# Only fetch data once when we start displaying
if not self.current_headlines and not self.is_fetching:
logger.debug("Initializing news display - fetching data")
self.is_fetching = True
try:
self.fetch_news_data()
finally:
self.is_fetching = False
# Get the current news display image
img = self.get_news_display()
# Set the image and update display
self.display_manager.image = img
self.display_manager.update_display()
# Add scroll delay to control speed
time.sleep(self.scroll_delay)
# Debug: log scroll position
if hasattr(self, 'scroll_position') and hasattr(self, 'total_scroll_width'):
logger.debug(f"Scroll position: {self.scroll_position}/{self.total_scroll_width}")
return True
except Exception as e:
logger.error(f"Error in news display: {e}")
# Create error image
error_img = self.create_error_image(str(e))
self.display_manager.image = error_img
self.display_manager.update_display()
return False
def run_news_display(self):
"""Standalone method to run news display in its own loop"""
try:
while True:
img = self.get_news_display()
self.display_manager.image = img
self.display_manager.update_display()
time.sleep(self.scroll_delay)
except KeyboardInterrupt:
logger.debug("News display interrupted by user")
except Exception as e:
logger.error(f"Error in news display loop: {e}")
def add_custom_feed(self, name: str, url: str):
"""Add a custom RSS feed"""
if name not in self.custom_feeds:
self.custom_feeds[name] = url
# Update config
if 'news_manager' not in self.config:
self.config['news_manager'] = {}
self.config['news_manager']['custom_feeds'] = self.custom_feeds
self.config_manager.save_config(self.config)
logger.debug(f"Added custom feed: {name} -> {url}")
def remove_custom_feed(self, name: str):
"""Remove a custom RSS feed"""
if name in self.custom_feeds:
del self.custom_feeds[name]
# Update config
self.config['news_manager']['custom_feeds'] = self.custom_feeds
self.config_manager.save_config(self.config)
logger.debug(f"Removed custom feed: {name}")
def set_enabled_feeds(self, feeds: List[str]):
"""Set which feeds are enabled"""
self.enabled_feeds = feeds
# Update config
if 'news_manager' not in self.config:
self.config['news_manager'] = {}
self.config['news_manager']['enabled_feeds'] = self.enabled_feeds
self.config_manager.save_config(self.config)
logger.debug(f"Updated enabled feeds: {self.enabled_feeds}")
# Refresh headlines
self.fetch_news_data()
def get_available_feeds(self) -> Dict[str, str]:
"""Get all available feeds (default + custom)"""
return {**self.default_feeds, **self.custom_feeds}
def get_feed_status(self) -> Dict[str, Any]:
"""Get status information about feeds"""
status = {
'enabled_feeds': self.enabled_feeds,
'available_feeds': list(self.get_available_feeds().keys()),
'headlines_per_feed': self.headlines_per_feed,
'last_update': self.last_update,
'total_headlines': sum(len(headlines) for headlines in self.news_data.values()),
'rotation_enabled': self.rotation_enabled,
'rotation_count': self.rotation_count,
'dynamic_duration': self.dynamic_duration
}
return status
def get_dynamic_duration(self) -> int:
"""Get the calculated dynamic duration for display"""
# For smooth scrolling, use a very short duration so display controller calls us frequently
# The scroll_speed controls how many pixels we move per call
# Return the current calculated duration without fetching data
return self.dynamic_duration # 0.1 second duration - display controller will call us 10 times per second