Created Base Sports Classes (#39)

* rebase

* Update NFL and NCAA FB fetch

* update FB updates

* kinda working, kinda broken

* Fixed and update loggers

* move to individual files

* timeout updates

* seems to work well

* Leaderboard overestimates time

* ignore that

* minor syntax updates

* More consolidation but i broke something

* fixed

* Hockey seems to work

* Fix my changes to logo downloader

* even more consolidation

* fixes

* more cleanup

* inheritance stuff

* Change football to ESPN down text, it does what ur already doing. Change color to red on Red ZOne

* Fix leaderboard

* Update football.py

Signed-off-by: Alex Resnick <adr8292@gmail.com>

* Minor fixes

* don't want that

* background fetch

* whoops

---------

Signed-off-by: Alex Resnick <adr8292@gmail.com>
Co-authored-by: Alex Resnick <adr8282@gmail.com>
Co-authored-by: ChuckBuilds <33324927+ChuckBuilds@users.noreply.github.com>
This commit is contained in:
Alex Resnick
2025-09-24 16:10:41 -05:00
committed by GitHub
parent 42e14f99b0
commit 76a9e98ba7
14 changed files with 2553 additions and 4322 deletions

1
.gitignore vendored
View File

@@ -22,6 +22,7 @@ ENV/
.idea/
*.swp
*.swo
emulator_config.json
# Cache directory
cache/

Binary file not shown.

Before

Width:  |  Height:  |  Size: 651 B

After

Width:  |  Height:  |  Size: 16 KiB

View File

@@ -328,7 +328,6 @@
"live_update_interval": 30,
"live_odds_update_interval": 3600,
"odds_update_interval": 3600,
"season_cache_duration_seconds": 86400,
"recent_games_to_show": 1,
"upcoming_games_to_show": 1,
"show_favorite_teams_only": true,
@@ -410,7 +409,6 @@
"live_update_interval": 30,
"live_odds_update_interval": 3600,
"odds_update_interval": 3600,
"season_cache_duration_seconds": 86400,
"recent_games_to_show": 1,
"upcoming_games_to_show": 1,
"show_favorite_teams_only": true,

View File

@@ -27,7 +27,7 @@ import json
import queue
from concurrent.futures import ThreadPoolExecutor, Future
import weakref
from src.cache_manager import CacheManager
# Configure logging
logger = logging.getLogger(__name__)
@@ -78,7 +78,7 @@ class BackgroundDataService:
with intelligent caching, retry logic, and progress tracking.
"""
def __init__(self, cache_manager, max_workers: int = 3, request_timeout: int = 30):
def __init__(self, cache_manager: CacheManager, max_workers: int = 3, request_timeout: int = 30):
"""
Initialize the background data service.

View File

@@ -0,0 +1,545 @@
from typing import Dict, Any, Optional, List
from src.display_manager import DisplayManager
from src.cache_manager import CacheManager
from datetime import datetime, timezone, timedelta
import logging
from PIL import Image, ImageDraw, ImageFont
import time
import pytz
from src.base_classes.sports import SportsCore
import requests
class Football(SportsCore):
def __init__(self, config: Dict[str, Any], display_manager: DisplayManager, cache_manager: CacheManager, logger: logging.Logger, sport_key: str):
super().__init__(config, display_manager, cache_manager, logger, sport_key)
def _fetch_game_odds(self, _: Dict) -> None:
pass
def _fetch_odds(self, game: Dict, league: str) -> None:
super()._fetch_odds(game, "football", league)
def _extract_game_details(self, game_event: Dict) -> Optional[Dict]:
"""Extract relevant game details from ESPN NCAA FB API response."""
details, home_team, away_team, status, situation = self._extract_game_details_common(game_event)
if details is None or home_team is None or away_team is None or status is None:
return
try:
competition = game_event["competitions"][0]
status = competition["status"]
# --- Football Specific Details (Likely same for NFL/NCAAFB) ---
down_distance_text = ""
possession_indicator = None # Default to None
scoring_event = "" # Track scoring events
home_timeouts = 0
away_timeouts = 0
is_redzone = False
posession = None
if situation and status["type"]["state"] == "in":
# down = situation.get("down")
down_distance_text = situation.get("shortDownDistanceText")
# long_text = situation.get("downDistanceText")
# distance = situation.get("distance")
# Detect scoring events from status detail
status_detail = status["type"].get("detail", "").lower()
status_short = status["type"].get("shortDetail", "").lower()
is_redzone = situation.get("isRedZone")
posession = situation.get("possession")
# Check for scoring events in status text
if any(keyword in status_detail for keyword in ["touchdown", "td"]):
scoring_event = "TOUCHDOWN"
elif any(keyword in status_detail for keyword in ["field goal", "fg"]):
scoring_event = "FIELD GOAL"
elif any(keyword in status_detail for keyword in ["extra point", "pat", "point after"]):
scoring_event = "PAT"
elif any(keyword in status_short for keyword in ["touchdown", "td"]):
scoring_event = "TOUCHDOWN"
elif any(keyword in status_short for keyword in ["field goal", "fg"]):
scoring_event = "FIELD GOAL"
elif any(keyword in status_short for keyword in ["extra point", "pat"]):
scoring_event = "PAT"
# Determine possession based on team ID
possession_team_id = situation.get("possession")
if possession_team_id:
if possession_team_id == home_team.get("id"):
possession_indicator = "home"
elif possession_team_id == away_team.get("id"):
possession_indicator = "away"
home_timeouts = situation.get("homeTimeouts", 3) # Default to 3 if not specified
away_timeouts = situation.get("awayTimeouts", 3) # Default to 3 if not specified
# Format period/quarter
period = status.get("period", 0)
period_text = ""
if status["type"]["state"] == "in":
if period == 0: period_text = "Start" # Before kickoff
elif period == 1: period_text = "Q1"
elif period == 2: period_text = "Q2"
elif period == 3: period_text = "Q3" # Fixed: period 3 is 3rd quarter, not halftime
elif period == 4: period_text = "Q4"
elif period > 4: period_text = "OT" # OT starts after Q4
elif status["type"]["state"] == "halftime" or status["type"]["name"] == "STATUS_HALFTIME": # Check explicit halftime state
period_text = "HALF"
elif status["type"]["state"] == "post":
if period > 4 : period_text = "Final/OT"
else: period_text = "Final"
elif status["type"]["state"] == "pre":
period_text = details.get("game_time", "") # Show time for upcoming
# Timeouts (assuming max 3 per half, not carried over well in standard API)
# API often provides 'timeouts' directly under team, but reset logic is tricky
# We might need to simplify this or just use a fixed display if API is unreliable
# For upcoming games, we'll show based on number of games, not time window
# For recent games, we'll show based on number of games, not time window
is_within_window = True # Always include games, let the managers filter by count
details.update({
"period": period,
"period_text": period_text, # Formatted quarter/status
"clock": status.get("displayClock", "0:00"),
"home_timeouts": home_timeouts,
"away_timeouts": away_timeouts,
"down_distance_text": down_distance_text, # Added Down/Distance
"is_redzone": is_redzone,
"possession": posession, # ID of team with possession
"possession_indicator": possession_indicator, # Added for easy home/away check
"scoring_event": scoring_event, # Track scoring events (TOUCHDOWN, FIELD GOAL, PAT)
})
# Basic validation (can be expanded)
if not details['home_abbr'] or not details['away_abbr']:
self.logger.warning(f"Missing team abbreviation in event: {details['id']}")
return None
self.logger.debug(f"Extracted: {details['away_abbr']}@{details['home_abbr']}, Status: {status['type']['name']}, Live: {details['is_live']}, Final: {details['is_final']}, Upcoming: {details['is_upcoming']}")
return details
except Exception as e:
# Log the problematic event structure if possible
logging.error(f"Error extracting game details: {e} from event: {game_event.get('id')}", exc_info=True)
return None
def _fetch_todays_games(self, league: str) -> Optional[Dict]:
"""Fetch only today's games for live updates (not entire season)."""
return super()._fetch_todays_games("football", league)
def _get_weeks_data(self, league: str) -> Optional[Dict]:
"""
Get partial data for immediate display while background fetch is in progress.
This fetches current/recent games only for quick response.
"""
return super()._get_weeks_data("football", league)
class FootballLive(Football):
def __init__(self, config: Dict[str, Any], display_manager: DisplayManager, cache_manager: CacheManager, logger: logging.Logger, sport_key: str):
super().__init__(config, display_manager, cache_manager, logger, sport_key)
self.update_interval = self.mode_config.get("live_update_interval", 15)
self.no_data_interval = 300
self.last_update = 0
self.live_games = []
self.current_game_index = 0
self.last_game_switch = 0
self.game_display_duration = self.mode_config.get("live_game_duration", 20)
self.last_display_update = 0
self.last_log_time = 0
self.log_interval = 300
def update(self):
"""Update live game data and handle game switching."""
if not self.is_enabled:
return
# Define current_time and interval before the problematic line (originally line 455)
# Ensure 'import time' is present at the top of the file.
current_time = time.time()
# Define interval using a pattern similar to NFLLiveManager's update method.
# Uses getattr for robustness, assuming attributes for live_games, test_mode,
# no_data_interval, and update_interval are available on self.
_live_games_attr = getattr(self, 'live_games', [])
_test_mode_attr = getattr(self, 'test_mode', False) # test_mode is often from a base class or config
_no_data_interval_attr = getattr(self, 'no_data_interval', 300) # Default similar to NFLLiveManager
_update_interval_attr = getattr(self, 'update_interval', 15) # Default similar to NFLLiveManager
interval = _no_data_interval_attr if not _live_games_attr and not _test_mode_attr else _update_interval_attr
# Original line from traceback (line 455), now with variables defined:
if current_time - self.last_update >= interval:
self.last_update = current_time
# Fetch rankings if enabled
if self.show_ranking:
self._fetch_team_rankings()
if self.test_mode:
# Simulate clock running down in test mode
if self.current_game and self.current_game["is_live"]:
try:
minutes, seconds = map(int, self.current_game["clock"].split(':'))
seconds -= 1
if seconds < 0:
seconds = 59
minutes -= 1
if minutes < 0:
# Simulate end of quarter/game
if self.current_game["period"] < 4: # Q4 is period 4
self.current_game["period"] += 1
# Update period_text based on new period
if self.current_game["period"] == 1: self.current_game["period_text"] = "Q1"
elif self.current_game["period"] == 2: self.current_game["period_text"] = "Q2"
elif self.current_game["period"] == 3: self.current_game["period_text"] = "Q3"
elif self.current_game["period"] == 4: self.current_game["period_text"] = "Q4"
# Reset clock for next quarter (e.g., 15:00)
minutes, seconds = 15, 0
else:
# Simulate game end
self.current_game["is_live"] = False
self.current_game["is_final"] = True
self.current_game["period_text"] = "Final"
minutes, seconds = 0, 0
self.current_game["clock"] = f"{minutes:02d}:{seconds:02d}"
# Simulate down change occasionally
if seconds % 15 == 0:
self.current_game["down_distance_text"] = f"{['1st','2nd','3rd','4th'][seconds % 4]} & {seconds % 10 + 1}"
self.current_game["status_text"] = f"{self.current_game['period_text']} {self.current_game['clock']}"
# Display update handled by main loop or explicit call if needed immediately
# self.display(force_clear=True) # Only if immediate update is desired here
except ValueError:
self.logger.warning("Test mode: Could not parse clock") # Changed log prefix
# No actual display call here, let main loop handle it
else:
# Fetch live game data
data = self._fetch_data()
new_live_games = []
if data and "events" in data:
for event in data["events"]:
details = self._extract_game_details(event)
if details and (details["is_live"] or details["is_halftime"]):
# If show_favorite_teams_only is true, only add if it's a favorite.
# Otherwise, add all games.
if self.mode_config.get("show_favorite_teams_only", False):
if details["home_abbr"] in self.favorite_teams or details["away_abbr"] in self.favorite_teams:
if self.show_odds:
self._fetch_game_odds(details)
new_live_games.append(details)
else:
if self.show_odds:
self._fetch_game_odds(details)
new_live_games.append(details)
# Log changes or periodically
current_time_for_log = time.time() # Use a consistent time for logging comparison
should_log = (
current_time_for_log - self.last_log_time >= self.log_interval or
len(new_live_games) != len(self.live_games) or
any(g1['id'] != g2.get('id') for g1, g2 in zip(self.live_games, new_live_games)) or # Check if game IDs changed
(not self.live_games and new_live_games) # Log if games appeared
)
if should_log:
if new_live_games:
filter_text = "favorite teams" if self.mode_config.get("show_favorite_teams_only", False) else "all teams"
self.logger.info(f"Found {len(new_live_games)} live/halftime games for {filter_text}.")
for game_info in new_live_games: # Renamed game to game_info
self.logger.info(f" - {game_info['away_abbr']}@{game_info['home_abbr']} ({game_info.get('status_text', 'N/A')})")
else:
filter_text = "favorite teams" if self.mode_config.get("show_favorite_teams_only", False) else "criteria"
self.logger.info(f"No live/halftime games found for {filter_text}.")
self.last_log_time = current_time_for_log
# Update game list and current game
if new_live_games:
# Check if the games themselves changed, not just scores/time
new_game_ids = {g['id'] for g in new_live_games}
current_game_ids = {g['id'] for g in self.live_games}
if new_game_ids != current_game_ids:
self.live_games = sorted(new_live_games, key=lambda g: g.get('start_time_utc') or datetime.now(timezone.utc)) # Sort by start time
# Reset index if current game is gone or list is new
if not self.current_game or self.current_game['id'] not in new_game_ids:
self.current_game_index = 0
self.current_game = self.live_games[0] if self.live_games else None
self.last_game_switch = current_time
else:
# Find current game's new index if it still exists
try:
self.current_game_index = next(i for i, g in enumerate(self.live_games) if g['id'] == self.current_game['id'])
self.current_game = self.live_games[self.current_game_index] # Update current_game with fresh data
except StopIteration: # Should not happen if check above passed, but safety first
self.current_game_index = 0
self.current_game = self.live_games[0]
self.last_game_switch = current_time
else:
# Just update the data for the existing games
temp_game_dict = {g['id']: g for g in new_live_games}
self.live_games = [temp_game_dict.get(g['id'], g) for g in self.live_games] # Update in place
if self.current_game:
self.current_game = temp_game_dict.get(self.current_game['id'], self.current_game)
# Display update handled by main loop based on interval
else:
# No live games found
if self.live_games: # Were there games before?
self.logger.info("Live games previously showing have ended or are no longer live.") # Changed log prefix
self.live_games = []
self.current_game = None
self.current_game_index = 0
else:
# Error fetching data or no events
if self.live_games: # Were there games before?
self.logger.warning("Could not fetch update; keeping existing live game data for now.") # Changed log prefix
else:
self.logger.warning("Could not fetch data and no existing live games.") # Changed log prefix
self.current_game = None # Clear current game if fetch fails and no games were active
# Handle game switching (outside test mode check)
if not self.test_mode and len(self.live_games) > 1 and (current_time - self.last_game_switch) >= self.game_display_duration:
self.current_game_index = (self.current_game_index + 1) % len(self.live_games)
self.current_game = self.live_games[self.current_game_index]
self.last_game_switch = current_time
self.logger.info(f"Switched live view to: {self.current_game['away_abbr']}@{self.current_game['home_abbr']}") # Changed log prefix
# Force display update via flag or direct call if needed, but usually let main loop handle
def _draw_scorebug_layout(self, game: Dict, force_clear: bool = False) -> None:
"""Draw the detailed scorebug layout for a live NCAA FB game.""" # Updated docstring
try:
main_img = Image.new('RGBA', (self.display_width, self.display_height), (0, 0, 0, 255))
overlay = Image.new('RGBA', (self.display_width, self.display_height), (0, 0, 0, 0))
draw_overlay = ImageDraw.Draw(overlay) # Draw text elements on overlay first
home_logo = self._load_and_resize_logo(game["home_id"], game["home_abbr"], game["home_logo_path"], game.get("home_logo_url"))
away_logo = self._load_and_resize_logo(game["away_id"], game["away_abbr"], game["away_logo_path"], game.get("away_logo_url"))
if not home_logo or not away_logo:
self.logger.error(f"Failed to load logos for live game: {game.get('id')}") # Changed log prefix
# Draw placeholder text if logos fail
draw_final = ImageDraw.Draw(main_img.convert('RGB'))
self._draw_text_with_outline(draw_final, "Logo Error", (5,5), self.fonts['status'])
self.display_manager.image.paste(main_img.convert('RGB'), (0, 0))
self.display_manager.update_display()
return
center_y = self.display_height // 2
# Draw logos (shifted slightly more inward than NHL perhaps)
home_x = self.display_width - home_logo.width + 10 #adjusted from 18 # Adjust position as needed
home_y = center_y - (home_logo.height // 2)
main_img.paste(home_logo, (home_x, home_y), home_logo)
away_x = -10 #adjusted from 18 # Adjust position as needed
away_y = center_y - (away_logo.height // 2)
main_img.paste(away_logo, (away_x, away_y), away_logo)
# --- Draw Text Elements on Overlay ---
# Note: Rankings are now handled in the records/rankings section below
# Scores (centered, slightly above bottom)
home_score = str(game.get("home_score", "0"))
away_score = str(game.get("away_score", "0"))
score_text = f"{away_score}-{home_score}"
score_width = draw_overlay.textlength(score_text, font=self.fonts['score'])
score_x = (self.display_width - score_width) // 2
score_y = (self.display_height // 2) - 3 #centered #from 14 # Position score higher
self._draw_text_with_outline(draw_overlay, score_text, (score_x, score_y), self.fonts['score'])
# Period/Quarter and Clock (Top center)
period_clock_text = f"{game.get('period_text', '')} {game.get('clock', '')}".strip()
if game.get("is_halftime"): period_clock_text = "Halftime" # Override for halftime
status_width = draw_overlay.textlength(period_clock_text, font=self.fonts['time'])
status_x = (self.display_width - status_width) // 2
status_y = 1 # Position at top
self._draw_text_with_outline(draw_overlay, period_clock_text, (status_x, status_y), self.fonts['time'])
# Down & Distance or Scoring Event (Below Period/Clock)
scoring_event = game.get("scoring_event", "")
down_distance = game.get("down_distance_text", "")
# Show scoring event if detected, otherwise show down & distance
if scoring_event and game.get("is_live"):
# Display scoring event with special formatting
event_width = draw_overlay.textlength(scoring_event, font=self.fonts['detail'])
event_x = (self.display_width - event_width) // 2
event_y = (self.display_height) - 7
# Color coding for different scoring events
if scoring_event == "TOUCHDOWN":
event_color = (255, 215, 0) # Gold
elif scoring_event == "FIELD GOAL":
event_color = (0, 255, 0) # Green
elif scoring_event == "PAT":
event_color = (255, 165, 0) # Orange
else:
event_color = (255, 255, 255) # White
self._draw_text_with_outline(draw_overlay, scoring_event, (event_x, event_y), self.fonts['detail'], fill=event_color)
elif down_distance and game.get("is_live"): # Only show if live and available
dd_width = draw_overlay.textlength(down_distance, font=self.fonts['detail'])
dd_x = (self.display_width - dd_width) // 2
dd_y = (self.display_height)- 7 # Top of D&D text
down_color = (200, 200, 0) if not game.get("is_redzone", False) else (255,0,0) # Yellowish text
self._draw_text_with_outline(draw_overlay, down_distance, (dd_x, dd_y), self.fonts['detail'], fill=down_color)
# Possession Indicator (small football icon)
possession = game.get("possession_indicator")
if possession: # Only draw if possession is known
ball_radius_x = 3 # Wider for football shape
ball_radius_y = 2 # Shorter for football shape
ball_color = (139, 69, 19) # Brown color for the football
lace_color = (255, 255, 255) # White for laces
# Approximate height of the detail font (4x6 font at size 6 is roughly 6px tall)
detail_font_height_approx = 6
ball_y_center = dd_y + (detail_font_height_approx // 2) # Center ball vertically with D&D text
possession_ball_padding = 3 # Pixels between D&D text and ball
if possession == "away":
# Position ball to the left of D&D text
ball_x_center = dd_x - possession_ball_padding - ball_radius_x
elif possession == "home":
# Position ball to the right of D&D text
ball_x_center = dd_x + dd_width + possession_ball_padding + ball_radius_x
else:
ball_x_center = 0 # Should not happen / no indicator
if ball_x_center > 0: # Draw if position is valid
# Draw the football shape (ellipse)
draw_overlay.ellipse(
(ball_x_center - ball_radius_x, ball_y_center - ball_radius_y, # x0, y0
ball_x_center + ball_radius_x, ball_y_center + ball_radius_y), # x1, y1
fill=ball_color, outline=(0,0,0)
)
# Draw a simple horizontal lace
draw_overlay.line(
(ball_x_center - 1, ball_y_center, ball_x_center + 1, ball_y_center),
fill=lace_color, width=1
)
# Timeouts (Bottom corners) - 3 small bars per team
timeout_bar_width = 4
timeout_bar_height = 2
timeout_spacing = 1
timeout_y = self.display_height - timeout_bar_height - 1 # Bottom edge
# Away Timeouts (Bottom Left)
away_timeouts_remaining = game.get("away_timeouts", 0)
for i in range(3):
to_x = 2 + i * (timeout_bar_width + timeout_spacing)
color = (255, 255, 255) if i < away_timeouts_remaining else (80, 80, 80) # White if available, gray if used
draw_overlay.rectangle([to_x, timeout_y, to_x + timeout_bar_width, timeout_y + timeout_bar_height], fill=color, outline=(0,0,0))
# Home Timeouts (Bottom Right)
home_timeouts_remaining = game.get("home_timeouts", 0)
for i in range(3):
to_x = self.display_width - 2 - timeout_bar_width - (2-i) * (timeout_bar_width + timeout_spacing)
color = (255, 255, 255) if i < home_timeouts_remaining else (80, 80, 80) # White if available, gray if used
draw_overlay.rectangle([to_x, timeout_y, to_x + timeout_bar_width, timeout_y + timeout_bar_height], fill=color, outline=(0,0,0))
# Draw odds if available
if 'odds' in game and game['odds']:
self._draw_dynamic_odds(draw_overlay, game['odds'], self.display_width, self.display_height)
# Draw records or rankings if enabled
if self.show_records or self.show_ranking:
try:
record_font = ImageFont.truetype("assets/fonts/4x6-font.ttf", 6)
self.logger.debug(f"Loaded 6px record font successfully")
except IOError:
record_font = ImageFont.load_default()
self.logger.warning(f"Failed to load 6px font, using default font (size: {record_font.size})")
# Get team abbreviations
away_abbr = game.get('away_abbr', '')
home_abbr = game.get('home_abbr', '')
record_bbox = draw_overlay.textbbox((0,0), "0-0", font=record_font)
record_height = record_bbox[3] - record_bbox[1]
record_y = self.display_height - record_height - 4
self.logger.debug(f"Record positioning: height={record_height}, record_y={record_y}, display_height={self.display_height}")
# Display away team info
if away_abbr:
if self.show_ranking and self.show_records:
# When both rankings and records are enabled, rankings replace records completely
rankings = self._fetch_team_rankings()
away_rank = rankings.get(away_abbr, 0)
if away_rank > 0:
away_text = f"#{away_rank}"
else:
# Show nothing for unranked teams when rankings are prioritized
away_text = ''
elif self.show_ranking:
# Show ranking only if available
rankings = self._fetch_team_rankings()
away_rank = rankings.get(away_abbr, 0)
if away_rank > 0:
away_text = f"#{away_rank}"
else:
away_text = ''
elif self.show_records:
# Show record only when rankings are disabled
away_text = game.get('away_record', '')
else:
away_text = ''
if away_text:
away_record_x = 3
self.logger.debug(f"Drawing away ranking '{away_text}' at ({away_record_x}, {record_y}) with font size {record_font.size if hasattr(record_font, 'size') else 'unknown'}")
self._draw_text_with_outline(draw_overlay, away_text, (away_record_x, record_y), record_font)
# Display home team info
if home_abbr:
if self.show_ranking and self.show_records:
# When both rankings and records are enabled, rankings replace records completely
rankings = self._fetch_team_rankings()
home_rank = rankings.get(home_abbr, 0)
if home_rank > 0:
home_text = f"#{home_rank}"
else:
# Show nothing for unranked teams when rankings are prioritized
home_text = ''
elif self.show_ranking:
# Show ranking only if available
rankings = self._fetch_team_rankings()
home_rank = rankings.get(home_abbr, 0)
if home_rank > 0:
home_text = f"#{home_rank}"
else:
home_text = ''
elif self.show_records:
# Show record only when rankings are disabled
home_text = game.get('home_record', '')
else:
home_text = ''
if home_text:
home_record_bbox = draw_overlay.textbbox((0,0), home_text, font=record_font)
home_record_width = home_record_bbox[2] - home_record_bbox[0]
home_record_x = self.display_width - home_record_width - 3
self.logger.debug(f"Drawing home ranking '{home_text}' at ({home_record_x}, {record_y}) with font size {record_font.size if hasattr(record_font, 'size') else 'unknown'}")
self._draw_text_with_outline(draw_overlay, home_text, (home_record_x, record_y), record_font)
# Composite the text overlay onto the main image
main_img = Image.alpha_composite(main_img, overlay)
main_img = main_img.convert('RGB') # Convert for display
# Display the final image
self.display_manager.image.paste(main_img, (0, 0))
self.display_manager.update_display() # Update display here for live
except Exception as e:
self.logger.error(f"Error displaying live Football game: {e}", exc_info=True) # Changed log prefix

328
src/base_classes/hockey.py Normal file
View File

@@ -0,0 +1,328 @@
from typing import Dict, Any, Optional
from src.display_manager import DisplayManager
from src.cache_manager import CacheManager
from datetime import datetime, timezone
import logging
from PIL import Image, ImageDraw, ImageFont
import time
from src.base_classes.sports import SportsCore
class Hockey(SportsCore):
def __init__(self, config: Dict[str, Any], display_manager: DisplayManager, cache_manager: CacheManager, logger: logging.Logger, sport_key: str):
super().__init__(config, display_manager, cache_manager, logger, sport_key)
def _fetch_odds(self, game: Dict, league: str) -> None:
super()._fetch_odds(game, "hockey", league)
def _extract_game_details(self, game_event: Dict) -> Optional[Dict]:
"""Extract relevant game details from ESPN NCAA FB API response."""
# --- THIS METHOD MAY NEED ADJUSTMENTS FOR NCAA FB API DIFFERENCES ---
details, home_team, away_team, status, situation = self._extract_game_details_common(game_event)
if details is None or home_team is None or away_team is None or status is None:
return
try:
competition = game_event["competitions"][0]
status = competition["status"]
if situation and status["type"]["state"] == "in":
# Detect scoring events from status detail
status_detail = status["type"].get("detail", "").lower()
status_short = status["type"].get("shortDetail", "").lower()
# Format period/quarter
period = status.get("period", 0)
period_text = ""
if status["type"]["state"] == "in":
if period == 0: period_text = "Start" # Before kickoff
elif period == 1: period_text = "P1"
elif period == 2: period_text = "P2"
elif period == 3: period_text = "P3" # Fixed: period 3 is 3rd quarter, not halftime
elif period > 3: period_text = f"OT {period - 3}" # OT starts after P3
elif status["type"]["state"] == "post":
if period > 3 :
period_text = "Final/OT"
else:
period_text = "Final"
elif status["type"]["state"] == "pre":
period_text = details.get("game_time", "") # Show time for upcoming
details.update({
"period": period,
"period_text": period_text, # Formatted quarter/status
"clock": status.get("displayClock", "0:00")
})
# Basic validation (can be expanded)
if not details['home_abbr'] or not details['away_abbr']:
self.logger.warning(f"Missing team abbreviation in event: {details['id']}")
return None
self.logger.debug(f"Extracted: {details['away_abbr']}@{details['home_abbr']}, Status: {status['type']['name']}, Live: {details['is_live']}, Final: {details['is_final']}, Upcoming: {details['is_upcoming']}")
return details
except Exception as e:
# Log the problematic event structure if possible
logging.error(f"Error extracting game details: {e} from event: {game_event.get('id')}", exc_info=True)
return None
class HockeyLive(Hockey):
def __init__(self, config: Dict[str, Any], display_manager: DisplayManager, cache_manager: CacheManager, logger: logging.Logger, sport_key: str):
super().__init__(config, display_manager, cache_manager, logger, sport_key)
self.update_interval = self.mode_config.get("live_update_interval", 15)
self.no_data_interval = 300
self.last_update = 0
self.live_games = []
self.current_game_index = 0
self.last_game_switch = 0
self.game_display_duration = self.mode_config.get("live_game_duration", 20)
self.last_display_update = 0
self.last_log_time = 0
self.log_interval = 300
def update(self):
"""Update live game data."""
if not self.is_enabled: return
current_time = time.time()
interval = self.no_data_interval if not self.live_games else self.update_interval
if current_time - self.last_update >= interval:
self.last_update = current_time
if self.test_mode:
# For testing, we'll just update the clock to show it's working
if self.current_game:
minutes = int(self.current_game["clock"].split(":")[0])
seconds = int(self.current_game["clock"].split(":")[1])
seconds -= 1
if seconds < 0:
seconds = 59
minutes -= 1
if minutes < 0:
minutes = 19
if self.current_game["period"] < 3:
self.current_game["period"] += 1
else:
self.current_game["period"] = 1
self.current_game["clock"] = f"{minutes:02d}:{seconds:02d}"
# Always update display in test mode
self.display(force_clear=True)
else:
# Fetch live game data from ESPN API
data = self._fetch_data()
if data and "events" in data:
# Find all live games involving favorite teams
new_live_games = []
for event in data["events"]:
details = self._extract_game_details(event)
if details and details["is_live"]:
self._fetch_odds(details)
new_live_games.append(details)
# Filter for favorite teams only if the config is set
if self.mode_config.get("show_favorite_teams_only", False):
new_live_games = [game for game in new_live_games
if game['home_abbr'] in self.favorite_teams or
game['away_abbr'] in self.favorite_teams]
# Only log if there's a change in games or enough time has passed
should_log = (
current_time - self.last_log_time >= self.log_interval or
len(new_live_games) != len(self.live_games) or
not self.live_games # Log if we had no games before
)
if should_log:
if new_live_games:
filter_text = "favorite teams" if self.mode_config.get("show_favorite_teams_only", False) else "all teams"
self.logger.info(f"[NCAAMH] Found {len(new_live_games)} live games involving {filter_text}")
for game in new_live_games:
self.logger.info(f"[NCAAMH] Live game: {game['away_abbr']} vs {game['home_abbr']} - Period {game['period']}, {game['clock']}")
else:
filter_text = "favorite teams" if self.mode_config.get("show_favorite_teams_only", False) else "criteria"
self.logger.info(f"[NCAAMH] No live games found matching {filter_text}")
self.last_log_time = current_time
if new_live_games:
# Update the current game with the latest data
for new_game in new_live_games:
if self.current_game and (
(new_game["home_abbr"] == self.current_game["home_abbr"] and
new_game["away_abbr"] == self.current_game["away_abbr"]) or
(new_game["home_abbr"] == self.current_game["away_abbr"] and
new_game["away_abbr"] == self.current_game["home_abbr"])
):
self.current_game = new_game
break
# Only update the games list if we have new games
if not self.live_games or set(game["away_abbr"] + game["home_abbr"] for game in new_live_games) != set(game["away_abbr"] + game["home_abbr"] for game in self.live_games):
self.live_games = new_live_games
# If we don't have a current game or it's not in the new list, start from the beginning
if not self.current_game or self.current_game not in self.live_games:
self.current_game_index = 0
self.current_game = self.live_games[0]
self.last_game_switch = current_time
# Update display if data changed, limit rate
if current_time - self.last_display_update >= 1.0:
# self.display(force_clear=True) # REMOVED: DisplayController handles this
self.last_display_update = current_time
else:
# No live games found
self.live_games = []
self.current_game = None
# Check if it's time to switch games
if len(self.live_games) > 1 and (current_time - self.last_game_switch) >= self.game_display_duration:
self.current_game_index = (self.current_game_index + 1) % len(self.live_games)
self.current_game = self.live_games[self.current_game_index]
self.last_game_switch = current_time
# self.display(force_clear=True) # REMOVED: DisplayController handles this
self.last_display_update = current_time # Track time for potential display update
def _draw_scorebug_layout(self, game: Dict, force_clear: bool = False) -> None:
"""Draw the detailed scorebug layout for a live NCAA FB game.""" # Updated docstring
try:
main_img = Image.new('RGBA', (self.display_width, self.display_height), (0, 0, 0, 255))
overlay = Image.new('RGBA', (self.display_width, self.display_height), (0, 0, 0, 0))
draw_overlay = ImageDraw.Draw(overlay) # Draw text elements on overlay first
home_logo = self._load_and_resize_logo(game["home_id"], game["home_abbr"], game["home_logo_path"], game.get("home_logo_url"))
away_logo = self._load_and_resize_logo(game["away_id"], game["away_abbr"], game["away_logo_path"], game.get("away_logo_url"))
if not home_logo or not away_logo:
self.logger.error(f"Failed to load logos for live game: {game.get('id')}") # Changed log prefix
# Draw placeholder text if logos fail
draw_final = ImageDraw.Draw(main_img.convert('RGB'))
self._draw_text_with_outline(draw_final, "Logo Error", (5,5), self.fonts['status'])
self.display_manager.image.paste(main_img.convert('RGB'), (0, 0))
self.display_manager.update_display()
return
center_y = self.display_height // 2
# Draw logos (shifted slightly more inward than NHL perhaps)
home_x = self.display_width - home_logo.width + 10 #adjusted from 18 # Adjust position as needed
home_y = center_y - (home_logo.height // 2)
main_img.paste(home_logo, (home_x, home_y), home_logo)
away_x = -10 #adjusted from 18 # Adjust position as needed
away_y = center_y - (away_logo.height // 2)
main_img.paste(away_logo, (away_x, away_y), away_logo)
# --- Draw Text Elements on Overlay ---
# Note: Rankings are now handled in the records/rankings section below
# Scores (centered, slightly above bottom)
home_score = str(game.get("home_score", "0"))
away_score = str(game.get("away_score", "0"))
score_text = f"{away_score}-{home_score}"
score_width = draw_overlay.textlength(score_text, font=self.fonts['score'])
score_x = (self.display_width - score_width) // 2
score_y = (self.display_height // 2) - 3 #centered #from 14 # Position score higher
self._draw_text_with_outline(draw_overlay, score_text, (score_x, score_y), self.fonts['score'])
# Period/Quarter and Clock (Top center)
period_clock_text = f"{game.get('period_text', '')} {game.get('clock', '')}".strip()
if game.get("is_halftime"): period_clock_text = "Halftime" # Override for halftime
status_width = draw_overlay.textlength(period_clock_text, font=self.fonts['time'])
status_x = (self.display_width - status_width) // 2
status_y = 1 # Position at top
self._draw_text_with_outline(draw_overlay, period_clock_text, (status_x, status_y), self.fonts['time'])
# Draw odds if available
if 'odds' in game and game['odds']:
self._draw_dynamic_odds(draw_overlay, game['odds'], self.display_width, self.display_height)
# Draw records or rankings if enabled
if self.show_records or self.show_ranking:
try:
record_font = ImageFont.truetype("assets/fonts/4x6-font.ttf", 6)
self.logger.debug(f"Loaded 6px record font successfully")
except IOError:
record_font = ImageFont.load_default()
self.logger.warning(f"Failed to load 6px font, using default font (size: {record_font.size})")
# Get team abbreviations
away_abbr = game.get('away_abbr', '')
home_abbr = game.get('home_abbr', '')
record_bbox = draw_overlay.textbbox((0,0), "0-0", font=record_font)
record_height = record_bbox[3] - record_bbox[1]
record_y = self.display_height - record_height - 4
self.logger.debug(f"Record positioning: height={record_height}, record_y={record_y}, display_height={self.display_height}")
# Display away team info
if away_abbr:
if self.show_ranking and self.show_records:
# When both rankings and records are enabled, rankings replace records completely
rankings = self._fetch_team_rankings()
away_rank = rankings.get(away_abbr, 0)
if away_rank > 0:
away_text = f"#{away_rank}"
else:
# Show nothing for unranked teams when rankings are prioritized
away_text = ''
elif self.show_ranking:
# Show ranking only if available
rankings = self._fetch_team_rankings()
away_rank = rankings.get(away_abbr, 0)
if away_rank > 0:
away_text = f"#{away_rank}"
else:
away_text = ''
elif self.show_records:
# Show record only when rankings are disabled
away_text = game.get('away_record', '')
else:
away_text = ''
if away_text:
away_record_x = 3
self.logger.debug(f"Drawing away ranking '{away_text}' at ({away_record_x}, {record_y}) with font size {record_font.size if hasattr(record_font, 'size') else 'unknown'}")
self._draw_text_with_outline(draw_overlay, away_text, (away_record_x, record_y), record_font)
# Display home team info
if home_abbr:
if self.show_ranking and self.show_records:
# When both rankings and records are enabled, rankings replace records completely
rankings = self._fetch_team_rankings()
home_rank = rankings.get(home_abbr, 0)
if home_rank > 0:
home_text = f"#{home_rank}"
else:
# Show nothing for unranked teams when rankings are prioritized
home_text = ''
elif self.show_ranking:
# Show ranking only if available
rankings = self._fetch_team_rankings()
home_rank = rankings.get(home_abbr, 0)
if home_rank > 0:
home_text = f"#{home_rank}"
else:
home_text = ''
elif self.show_records:
# Show record only when rankings are disabled
home_text = game.get('home_record', '')
else:
home_text = ''
if home_text:
home_record_bbox = draw_overlay.textbbox((0,0), home_text, font=record_font)
home_record_width = home_record_bbox[2] - home_record_bbox[0]
home_record_x = self.display_width - home_record_width - 3
self.logger.debug(f"Drawing home ranking '{home_text}' at ({home_record_x}, {record_y}) with font size {record_font.size if hasattr(record_font, 'size') else 'unknown'}")
self._draw_text_with_outline(draw_overlay, home_text, (home_record_x, record_y), record_font)
# Composite the text overlay onto the main image
main_img = Image.alpha_composite(main_img, overlay)
main_img = main_img.convert('RGB') # Convert for display
# Display the final image
self.display_manager.image.paste(main_img, (0, 0))
self.display_manager.update_display() # Update display here for live
except Exception as e:
self.logger.error(f"Error displaying live Hockey game: {e}", exc_info=True) # Changed log prefix

1135
src/base_classes/sports.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,6 @@
import time
import logging
import sys
import pytz
from typing import Dict, Any, List
from datetime import datetime, time as time_obj
@@ -324,7 +323,7 @@ class DisplayController:
# Set initial display to first available mode (clock)
self.current_mode_index = 0
self.current_display_mode = self.available_modes[0] if self.available_modes else 'none'
self.current_display_mode = "none"
# Reset logged duration when mode is initialized
if hasattr(self, '_last_logged_duration'):
delattr(self, '_last_logged_duration')
@@ -501,25 +500,21 @@ class DisplayController:
return self.display_durations.get(mode_key, 60)
# Handle dynamic duration for stocks
if mode_key == 'stocks' and self.stocks:
elif mode_key == 'stocks' and self.stocks:
try:
dynamic_duration = self.stocks.get_dynamic_duration()
# Only log if duration has changed or we haven't logged this duration yet
if not hasattr(self, '_last_logged_duration') or self._last_logged_duration != dynamic_duration:
logger.info(f"Using dynamic duration for stocks: {dynamic_duration} seconds")
self._last_logged_duration = dynamic_duration
# Debug: Always log the current dynamic duration value
logger.debug(f"Stocks dynamic duration check: {dynamic_duration}s")
return dynamic_duration
except Exception as e:
logger.error(f"Error getting dynamic duration for stocks: {e}")
# Fall back to configured duration
fallback_duration = self.display_durations.get(mode_key, 60)
logger.debug(f"Using fallback duration for stocks: {fallback_duration}s")
return fallback_duration
return self.display_durations.get(mode_key, 60)
# Handle dynamic duration for stock_news
if mode_key == 'stock_news' and self.news:
elif mode_key == 'stock_news' and self.news:
try:
dynamic_duration = self.news.get_dynamic_duration()
# Only log if duration has changed or we haven't logged this duration yet
@@ -533,7 +528,7 @@ class DisplayController:
return self.display_durations.get(mode_key, 60)
# Handle dynamic duration for odds_ticker
if mode_key == 'odds_ticker' and self.odds_ticker:
elif mode_key == 'odds_ticker' and self.odds_ticker:
try:
dynamic_duration = self.odds_ticker.get_dynamic_duration()
# Only log if duration has changed or we haven't logged this duration yet
@@ -546,23 +541,22 @@ class DisplayController:
# Fall back to configured duration
return self.display_durations.get(mode_key, 60)
# Handle leaderboard duration (user choice between fixed or dynamic)
if mode_key == 'leaderboard' and self.leaderboard:
# Handle dynamic duration for leaderboard
elif mode_key == 'leaderboard' and self.leaderboard:
try:
duration = self.leaderboard.get_duration()
mode_type = "dynamic" if self.leaderboard.dynamic_duration else "fixed"
dynamic_duration = self.leaderboard.get_dynamic_duration()
# Only log if duration has changed or we haven't logged this duration yet
if not hasattr(self, '_last_logged_leaderboard_duration') or self._last_logged_leaderboard_duration != duration:
logger.info(f"Using leaderboard {mode_type} duration: {duration} seconds")
self._last_logged_leaderboard_duration = duration
return duration
if not hasattr(self, '_last_logged_leaderboard_duration') or self._last_logged_leaderboard_duration != dynamic_duration:
logger.info(f"Using dynamic duration for leaderboard: {dynamic_duration} seconds")
self._last_logged_leaderboard_duration = dynamic_duration
return dynamic_duration
except Exception as e:
logger.error(f"Error getting duration for leaderboard: {e}")
logger.error(f"Error getting dynamic duration for leaderboard: {e}")
# Fall back to configured duration
return self.display_durations.get(mode_key, 600)
return self.display_durations.get(mode_key, 60)
# Simplify weather key handling
if mode_key.startswith('weather_'):
elif mode_key.startswith('weather_'):
return self.display_durations.get(mode_key, 15)
# duration_key = mode_key.split('_', 1)[1]
# if duration_key == 'current': duration_key = 'weather_current' # Keep specific keys
@@ -581,8 +575,6 @@ class DisplayController:
# Defer updates for modules that might cause lag during scrolling
if self.odds_ticker:
self.display_manager.defer_update(self.odds_ticker.update, priority=1)
if self.leaderboard:
self.display_manager.defer_update(self.leaderboard.update, priority=1)
if self.stocks:
self.display_manager.defer_update(self.stocks.update_stock_data, priority=2)
if self.news:
@@ -600,55 +592,6 @@ class DisplayController:
self.display_manager.defer_update(self.nfl_recent.update, priority=3)
if hasattr(self, 'nfl_upcoming') and self.nfl_upcoming:
self.display_manager.defer_update(self.nfl_upcoming.update, priority=3)
# Defer other sport manager updates
if hasattr(self, 'nhl_live') and self.nhl_live:
self.display_manager.defer_update(self.nhl_live.update, priority=3)
if hasattr(self, 'nhl_recent') and self.nhl_recent:
self.display_manager.defer_update(self.nhl_recent.update, priority=3)
if hasattr(self, 'nhl_upcoming') and self.nhl_upcoming:
self.display_manager.defer_update(self.nhl_upcoming.update, priority=3)
if hasattr(self, 'nba_live') and self.nba_live:
self.display_manager.defer_update(self.nba_live.update, priority=3)
if hasattr(self, 'nba_recent') and self.nba_recent:
self.display_manager.defer_update(self.nba_recent.update, priority=3)
if hasattr(self, 'nba_upcoming') and self.nba_upcoming:
self.display_manager.defer_update(self.nba_upcoming.update, priority=3)
if hasattr(self, 'mlb_live') and self.mlb_live:
self.display_manager.defer_update(self.mlb_live.update, priority=3)
if hasattr(self, 'mlb_recent') and self.mlb_recent:
self.display_manager.defer_update(self.mlb_recent.update, priority=3)
if hasattr(self, 'mlb_upcoming') and self.mlb_upcoming:
self.display_manager.defer_update(self.mlb_upcoming.update, priority=3)
if hasattr(self, 'milb_live') and self.milb_live:
self.display_manager.defer_update(self.milb_live.update, priority=3)
if hasattr(self, 'milb_recent') and self.milb_recent:
self.display_manager.defer_update(self.milb_recent.update, priority=3)
if hasattr(self, 'milb_upcoming') and self.milb_upcoming:
self.display_manager.defer_update(self.milb_upcoming.update, priority=3)
if hasattr(self, 'soccer_live') and self.soccer_live:
self.display_manager.defer_update(self.soccer_live.update, priority=3)
if hasattr(self, 'soccer_recent') and self.soccer_recent:
self.display_manager.defer_update(self.soccer_recent.update, priority=3)
if hasattr(self, 'soccer_upcoming') and self.soccer_upcoming:
self.display_manager.defer_update(self.soccer_upcoming.update, priority=3)
if hasattr(self, 'ncaa_baseball_live') and self.ncaa_baseball_live:
self.display_manager.defer_update(self.ncaa_baseball_live.update, priority=3)
if hasattr(self, 'ncaa_baseball_recent') and self.ncaa_baseball_recent:
self.display_manager.defer_update(self.ncaa_baseball_recent.update, priority=3)
if hasattr(self, 'ncaa_baseball_upcoming') and self.ncaa_baseball_upcoming:
self.display_manager.defer_update(self.ncaa_baseball_upcoming.update, priority=3)
if hasattr(self, 'ncaam_basketball_live') and self.ncaam_basketball_live:
self.display_manager.defer_update(self.ncaam_basketball_live.update, priority=3)
if hasattr(self, 'ncaam_basketball_recent') and self.ncaam_basketball_recent:
self.display_manager.defer_update(self.ncaam_basketball_recent.update, priority=3)
if hasattr(self, 'ncaam_basketball_upcoming') and self.ncaam_basketball_upcoming:
self.display_manager.defer_update(self.ncaam_basketball_upcoming.update, priority=3)
if hasattr(self, 'ncaam_hockey_live') and self.ncaam_hockey_live:
self.display_manager.defer_update(self.ncaam_hockey_live.update, priority=3)
if hasattr(self, 'ncaam_hockey_recent') and self.ncaam_hockey_recent:
self.display_manager.defer_update(self.ncaam_hockey_recent.update, priority=3)
if hasattr(self, 'ncaam_hockey_upcoming') and self.ncaam_hockey_upcoming:
self.display_manager.defer_update(self.ncaam_hockey_upcoming.update, priority=3)
# Continue with non-scrolling-sensitive updates
if self.weather: self.weather.get_weather()
if self.calendar: self.calendar.update(time.time())
@@ -666,57 +609,6 @@ class DisplayController:
if self.text_display: self.text_display.update()
if self.of_the_day: self.of_the_day.update(time.time())
# Update all sports managers in background
# NHL managers
if self.nhl_live: self.nhl_live.update()
if self.nhl_recent: self.nhl_recent.update()
if self.nhl_upcoming: self.nhl_upcoming.update()
# NBA managers
if self.nba_live: self.nba_live.update()
if self.nba_recent: self.nba_recent.update()
if self.nba_upcoming: self.nba_upcoming.update()
# MLB managers
if self.mlb_live: self.mlb_live.update()
if self.mlb_recent: self.mlb_recent.update()
if self.mlb_upcoming: self.mlb_upcoming.update()
# MiLB managers
if self.milb_live: self.milb_live.update()
if self.milb_recent: self.milb_recent.update()
if self.milb_upcoming: self.milb_upcoming.update()
# Soccer managers
if self.soccer_live: self.soccer_live.update()
if self.soccer_recent: self.soccer_recent.update()
if self.soccer_upcoming: self.soccer_upcoming.update()
# NFL managers
if self.nfl_live: self.nfl_live.update()
if self.nfl_recent: self.nfl_recent.update()
if self.nfl_upcoming: self.nfl_upcoming.update()
# NCAAFB managers
if self.ncaa_fb_live: self.ncaa_fb_live.update()
if self.ncaa_fb_recent: self.ncaa_fb_recent.update()
if self.ncaa_fb_upcoming: self.ncaa_fb_upcoming.update()
# NCAA Baseball managers
if self.ncaa_baseball_live: self.ncaa_baseball_live.update()
if self.ncaa_baseball_recent: self.ncaa_baseball_recent.update()
if self.ncaa_baseball_upcoming: self.ncaa_baseball_upcoming.update()
# NCAA Basketball managers
if self.ncaam_basketball_live: self.ncaam_basketball_live.update()
if self.ncaam_basketball_recent: self.ncaam_basketball_recent.update()
if self.ncaam_basketball_upcoming: self.ncaam_basketball_upcoming.update()
# NCAA Hockey managers
if self.ncaam_hockey_live: self.ncaam_hockey_live.update()
if self.ncaam_hockey_recent: self.ncaam_hockey_recent.update()
if self.ncaam_hockey_upcoming: self.ncaam_hockey_upcoming.update()
# News manager fetches data when displayed, not during updates
# if self.news_manager: self.news_manager.fetch_news_data()
@@ -1039,15 +931,7 @@ class DisplayController:
self.is_display_active = True
return
# Get current time in configured timezone
timezone_str = self.config.get('timezone', 'UTC')
try:
tz = pytz.timezone(timezone_str)
except pytz.exceptions.UnknownTimeZoneError:
logger.warning(f"Unknown timezone: {timezone_str}, falling back to UTC")
tz = pytz.UTC
now_time = datetime.now(tz).time()
now_time = datetime.now().time()
# Handle overnight schedules
if self.start_time <= self.end_time:
@@ -1122,6 +1006,10 @@ class DisplayController:
return
try:
self.cache_manager.clear_cache()
self._update_modules()
time.sleep(5)
self.current_display_mode = self.available_modes[self.current_mode_index] if self.available_modes else 'none'
while True:
current_time = time.time()
@@ -1134,9 +1022,8 @@ class DisplayController:
# Update data for all modules first
self._update_modules()
# Process deferred updates less frequently when scrolling to improve performance
if not self.display_manager.is_currently_scrolling() or (current_time % 2.0 < 0.1):
self.display_manager.process_deferred_updates()
# Process any deferred updates that may have accumulated
self.display_manager.process_deferred_updates()
# Update live modes in rotation if needed
self._update_live_modes_in_rotation()
@@ -1258,10 +1145,6 @@ class DisplayController:
if hasattr(self, '_last_logged_duration'):
delattr(self, '_last_logged_duration')
elif current_time - self.last_switch >= self.get_current_duration() or self.force_change:
# Debug timing information
elapsed_time = current_time - self.last_switch
expected_duration = self.get_current_duration()
logger.debug(f"Mode switch triggered: {self.current_display_mode} - Elapsed: {elapsed_time:.1f}s, Expected: {expected_duration}s, Force: {self.force_change}")
self.force_change = False
if self.current_display_mode == 'calendar' and self.calendar:
self.calendar.advance_event()
@@ -1283,8 +1166,6 @@ class DisplayController:
if needs_switch:
self.force_clear = True
self.last_switch = current_time
# Debug: Log when we set the switch time for a new mode
logger.debug(f"Mode switch completed: {self.current_display_mode} - Switch time set to {current_time}, Duration: {self.get_current_duration()}s")
else:
self.force_clear = False
# Only set manager_to_display if it hasn't been set by live priority logic

View File

@@ -5,6 +5,7 @@ from typing import Dict, Any, List, Optional
import os
import time
from PIL import Image, ImageDraw, ImageFont
from pathlib import Path
try:
from .display_manager import DisplayManager
from .cache_manager import CacheManager
@@ -38,17 +39,19 @@ class LeaderboardManager:
self.is_enabled = self.leaderboard_config.get('enabled', False)
self.enabled_sports = self.leaderboard_config.get('enabled_sports', {})
self.update_interval = self.leaderboard_config.get('update_interval', 3600)
self.scroll_speed = self.leaderboard_config.get('scroll_speed', 1)
self.scroll_delay = self.leaderboard_config.get('scroll_delay', 0.01)
self.scroll_speed = self.leaderboard_config.get('scroll_speed', 2)
self.scroll_delay = self.leaderboard_config.get('scroll_delay', 0.05)
self.display_duration = self.leaderboard_config.get('display_duration', 30)
self.loop = self.leaderboard_config.get('loop', True)
self.request_timeout = self.leaderboard_config.get('request_timeout', 30)
self.time_over = 0
# Duration settings - user can choose between fixed or dynamic (exception-based)
self.dynamic_duration = self.leaderboard_config.get('dynamic_duration', True)
# Get duration from main display_durations section
self.display_duration = config.get('display', {}).get('display_durations', {}).get('leaderboard', 300)
self.max_display_time = self.leaderboard_config.get('max_display_time', 600) # 10 minutes maximum
# Dynamic duration settings
self.dynamic_duration_enabled = self.leaderboard_config.get('dynamic_duration', True)
self.min_duration = self.leaderboard_config.get('min_duration', 30)
self.max_duration = self.leaderboard_config.get('max_duration', 300)
self.duration_buffer = self.leaderboard_config.get('duration_buffer', 0.1)
self.dynamic_duration = 60 # Default duration in seconds
self.total_scroll_width = 0 # Track total width for dynamic duration calculation
# Initialize managers
self.cache_manager = CacheManager()
@@ -78,19 +81,6 @@ class LeaderboardManager:
self.leaderboard_image = None # This will hold the single, wide image
self.last_display_time = 0
# FPS tracking variables
self.frame_times = [] # Store last 30 frame times for averaging
self.last_frame_time = 0
self.fps_log_interval = 10.0 # Log FPS every 10 seconds
self.last_fps_log_time = 0
# Performance optimization caches
self._cached_draw = None
self._last_visible_image = None
self._last_scroll_position = -1
self._text_measurement_cache = {} # Cache for font measurements
self._logo_cache = {} # Cache for resized logos
# Font setup
self.fonts = self._load_fonts()
@@ -251,19 +241,6 @@ class LeaderboardManager:
}
return fonts
def _get_cached_text_bbox(self, text, font_name):
"""Get cached text bounding box measurements."""
cache_key = f"{text}_{font_name}"
if cache_key not in self._text_measurement_cache:
font = self.fonts[font_name]
bbox = font.getbbox(text)
self._text_measurement_cache[cache_key] = {
'width': bbox[2] - bbox[0],
'height': bbox[3] - bbox[1],
'bbox': bbox
}
return self._text_measurement_cache[cache_key]
def _draw_text_with_outline(self, draw, text, position, font, fill=(255, 255, 255), outline_color=(0, 0, 0)):
"""Draw text with a black outline for better readability on LED matrix."""
x, y = position
@@ -273,35 +250,31 @@ class LeaderboardManager:
# Draw text
draw.text((x, y), text, font=font, fill=fill)
def _get_cached_resized_logo(self, team_abbr: str, logo_dir: str, size: int, league: str = None, team_name: str = None) -> Optional[Image.Image]:
"""Get cached resized team logo."""
cache_key = f"{team_abbr}_{logo_dir}_{size}"
if cache_key not in self._logo_cache:
logo = self._get_team_logo(team_abbr, logo_dir, league, team_name)
if logo:
resized_logo = logo.resize((size, size), Image.Resampling.LANCZOS)
self._logo_cache[cache_key] = resized_logo
else:
self._logo_cache[cache_key] = None
return self._logo_cache[cache_key]
def _get_team_logo(self, team_abbr: str, logo_dir: str, league: str = None, team_name: str = None) -> Optional[Image.Image]:
def _get_team_logo(self, league: str, team_id: str, team_abbr: str, logo_dir: str) -> Optional[Image.Image]:
"""Get team logo from the configured directory, downloading if missing."""
if not team_abbr or not logo_dir:
logger.debug("Cannot get team logo with missing team_abbr or logo_dir")
return None
try:
logo_path = os.path.join(logo_dir, f"{team_abbr}.png")
logo_path = Path(logo_dir, f"{team_abbr}.png")
logger.debug(f"Attempting to load logo from path: {logo_path}")
if os.path.exists(logo_path):
logo = Image.open(logo_path)
logger.debug(f"Successfully loaded logo for {team_abbr} from {logo_path}")
return logo
else:
logger.warning(f"Logo not found at path: {logo_path}")
# Try to download the missing logo if we have league information
if league:
success = download_missing_logo(team_abbr, league, team_name)
logger.info(f"Attempting to download missing logo for {team_abbr} in league {league}")
# league: str, team_id: str, team_abbreviation: str, logo_path: Path, logo_url: str | None = None, create_placeholder: bool = True
success = download_missing_logo(league, team_id, team_abbr, logo_path, None)
if success:
# Try to load the downloaded logo
if os.path.exists(logo_path):
logo = Image.open(logo_path)
logger.info(f"Successfully downloaded and loaded logo for {team_abbr}")
return logo
return None
@@ -467,6 +440,7 @@ class LeaderboardManager:
for team_data in teams:
team_info = team_data.get('team', {})
team_name = team_info.get('name', 'Unknown')
team_id = team_info.get('id')
team_abbr = team_info.get('abbreviation', 'Unknown')
current_rank = team_data.get('current', 0)
record_summary = team_data.get('recordSummary', '0-0')
@@ -496,6 +470,7 @@ class LeaderboardManager:
standings.append({
'name': team_name,
'id': team_id,
'abbreviation': team_abbr,
'rank': current_rank,
'wins': wins,
@@ -571,6 +546,7 @@ class LeaderboardManager:
# Process each team in the ranking
for team_data in teams:
team_info = team_data.get('team', {})
team_id = team_info.get('id')
team_name = team_info.get('name', 'Unknown')
team_abbr = team_info.get('abbreviation', 'Unknown')
current_rank = team_data.get('current', 0)
@@ -601,6 +577,7 @@ class LeaderboardManager:
standings.append({
'name': team_name,
'id': team_id,
'abbreviation': team_abbr,
'rank': current_rank,
'wins': wins,
@@ -676,6 +653,7 @@ class LeaderboardManager:
team_name = team_data.get('displayName', 'Unknown')
team_abbr = team_data.get('abbreviation', 'Unknown')
team_id = team_data.get('id')
# Extract record from stats
wins = 0
@@ -715,6 +693,7 @@ class LeaderboardManager:
standings.append({
'name': team_name,
'id': team_id,
'abbreviation': team_abbr,
'wins': wins,
'losses': losses,
@@ -741,6 +720,7 @@ class LeaderboardManager:
team_name = team_data.get('displayName', 'Unknown')
team_abbr = team_data.get('abbreviation', 'Unknown')
team_id = team_data.get('id')
# Extract record from stats
wins = 0
@@ -780,6 +760,7 @@ class LeaderboardManager:
standings.append({
'name': team_name,
'id': team_id,
'abbreviation': team_abbr,
'wins': wins,
'losses': losses,
@@ -913,8 +894,7 @@ class LeaderboardManager:
# Calculate total width needed
total_width = 0
# Use display width for spacing between leagues (simulates blank screen)
spacing = self.display_manager.matrix.width
spacing = 40 # Spacing between leagues
# Calculate width for each league section
for league_data in self.leaderboard_data:
@@ -950,13 +930,13 @@ class LeaderboardManager:
# For other leagues, show position
number_text = f"{i+1}."
number_measurements = self._get_cached_text_bbox(number_text, 'xlarge')
number_width = number_measurements['width']
number_bbox = self.fonts['xlarge'].getbbox(number_text)
number_width = number_bbox[2] - number_bbox[0]
# Calculate width for team abbreviation (use large font like in drawing)
team_text = team['abbreviation']
text_measurements = self._get_cached_text_bbox(team_text, 'large')
text_width = text_measurements['width']
text_bbox = self.fonts['large'].getbbox(team_text)
text_width = text_bbox[2] - text_bbox[0]
# Total team width: bold number + spacing + logo + spacing + text + spacing
team_width = number_width + 4 + logo_size + 4 + text_width + 12 # Spacing between teams
@@ -971,7 +951,6 @@ class LeaderboardManager:
draw = ImageDraw.Draw(self.leaderboard_image)
current_x = 0
for league_idx, league_data in enumerate(self.leaderboard_data):
league_key = league_data['league']
league_config = league_data['league_config']
@@ -992,7 +971,6 @@ class LeaderboardManager:
league_logo = league_logo.resize((logo_width, logo_height), Image.Resampling.LANCZOS)
self.leaderboard_image.paste(league_logo, (logo_x, logo_y), league_logo if league_logo.mode == 'RGBA' else None)
# League name removed - only show league logo
else:
# No league logo available - skip league name display
@@ -1026,16 +1004,17 @@ class LeaderboardManager:
# For other leagues, show position
number_text = f"{i+1}."
number_measurements = self._get_cached_text_bbox(number_text, 'xlarge')
number_width = number_measurements['width']
number_height = number_measurements['height']
number_bbox = self.fonts['xlarge'].getbbox(number_text)
number_width = number_bbox[2] - number_bbox[0]
number_height = number_bbox[3] - number_bbox[1]
number_y = (height - number_height) // 2
self._draw_text_with_outline(draw, number_text, (team_x, number_y), self.fonts['xlarge'], fill=(255, 255, 0))
# Draw team logo (cached and resized)
team_logo = self._get_cached_resized_logo(team['abbreviation'], league_config['logo_dir'],
logo_size, league=league_key, team_name=team.get('name'))
# Draw team logo (95% of display height, centered vertically)
team_logo = self._get_team_logo(league_key, team["id"], team['abbreviation'], league_config['logo_dir'])
if team_logo:
# Resize team logo to dynamic size (95% of display height)
team_logo = team_logo.resize((logo_size, logo_size), Image.Resampling.LANCZOS)
# Paste team logo after the bold number (centered vertically)
logo_x = team_x + number_width + 4
@@ -1044,9 +1023,9 @@ class LeaderboardManager:
# Draw team abbreviation after the logo (centered vertically)
team_text = team['abbreviation']
text_measurements = self._get_cached_text_bbox(team_text, 'large')
text_width = text_measurements['width']
text_height = text_measurements['height']
text_bbox = self.fonts['large'].getbbox(team_text)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_x = logo_x + logo_size + 4
text_y = (height - text_height) // 2
self._draw_text_with_outline(draw, team_text, (text_x, text_y), self.fonts['large'], fill=(255, 255, 255))
@@ -1056,9 +1035,9 @@ class LeaderboardManager:
else:
# Fallback if no logo - draw team abbreviation after bold number (centered vertically)
team_text = team['abbreviation']
text_measurements = self._get_cached_text_bbox(team_text, 'large')
text_width = text_measurements['width']
text_height = text_measurements['height']
text_bbox = self.fonts['large'].getbbox(team_text)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_x = team_x + number_width + 4
text_y = (height - text_height) // 2
self._draw_text_with_outline(draw, team_text, (text_x, text_y), self.fonts['large'], fill=(255, 255, 255))
@@ -1072,12 +1051,12 @@ class LeaderboardManager:
# Move to next league section (match width calculation logic)
# Update current_x to where team drawing actually ended
logger.info(f"League {league_idx+1} ({league_key}) teams ended at x={team_x}px")
current_x = team_x + spacing # team_x is at end of teams, add display width gap (simulates blank screen)
logger.info(f"Next league will start at x={current_x}px (gap: {spacing}px)")
current_x = team_x + 20 + spacing # team_x is at end of teams, add internal spacing + inter-league spacing
logger.info(f"Next league will start at x={current_x}px (gap: {20 + spacing}px)")
# Set total scroll width for dynamic duration calculation
# Use actual content width (current_x at end) instead of pre-calculated total_width
actual_content_width = current_x - spacing # Remove the final spacing that won't be used
actual_content_width = current_x - (20 + spacing) # Remove the final spacing that won't be used
self.total_scroll_width = actual_content_width
logger.info(f"Content width - Calculated: {total_width}px, Actual: {actual_content_width}px")
@@ -1109,11 +1088,11 @@ class LeaderboardManager:
else:
number_text = f"{j+1}."
number_measurements = self._get_cached_text_bbox(number_text, 'xlarge')
number_width = number_measurements['width']
number_bbox = self.fonts['xlarge'].getbbox(number_text)
number_width = number_bbox[2] - number_bbox[0]
team_text = team['abbreviation']
text_measurements = self._get_cached_text_bbox(team_text, 'large')
text_width = text_measurements['width']
text_bbox = self.fonts['large'].getbbox(team_text)
text_width = text_bbox[2] - text_bbox[0]
team_width = number_width + 4 + logo_size + 4 + text_width + 12
teams_width += team_width
@@ -1132,22 +1111,128 @@ class LeaderboardManager:
else:
logger.info(f" Final league ends at: {league_end_x}px")
logger.info(f"Total image width: {total_width}px, Display width: {self.display_manager.matrix.width}px")
logger.info(f"Total image width: {total_width}px, Display width: {height}px")
# Calculate dynamic duration using proper scroll-based calculation
if self.dynamic_duration_enabled:
self.calculate_dynamic_duration()
logger.info(f"Created leaderboard image with width {total_width}")
except Exception as e:
logger.error(f"Error creating leaderboard image: {e}")
self.leaderboard_image = None
def get_duration(self) -> int:
"""Get the duration for display based on user preference"""
if self.dynamic_duration:
# Use long timeout and let content determine when done via StopIteration
return self.max_display_time
else:
# Use fixed duration from config
return self.display_duration
def calculate_dynamic_duration(self):
"""Calculate the exact time needed to display all leaderboard content"""
logger.info(f"Calculating dynamic duration - enabled: {self.dynamic_duration_enabled}, content width: {self.total_scroll_width}px")
# If dynamic duration is disabled, use fixed duration from config
if not self.dynamic_duration_enabled:
self.dynamic_duration = self.leaderboard_config.get('display_duration', 60)
logger.debug(f"Dynamic duration disabled, using fixed duration: {self.dynamic_duration}s")
return
if not self.total_scroll_width:
self.dynamic_duration = self.min_duration # Use configured minimum
logger.debug(f"total_scroll_width is 0, using minimum duration: {self.min_duration}s")
return
try:
# Get display width (assume full width of display)
display_width = getattr(self.display_manager, 'matrix', None)
if display_width:
display_width = display_width.width
else:
display_width = 128 # Default to 128 if not available
# Calculate total scroll distance needed
# For looping content, we need to scroll the entire content width
# For non-looping content, we need content width minus display width (since last part shows fully)
if self.loop:
total_scroll_distance = self.total_scroll_width
else:
# For single pass, we need to scroll until the last content is fully visible
total_scroll_distance = max(0, self.total_scroll_width - display_width)
# Calculate time based on scroll speed and delay
# scroll_speed = pixels per frame, scroll_delay = seconds per frame
# However, actual observed speed is slower than theoretical calculation
# Based on log analysis: 1950px in 36s = 54.2 px/s actual speed
# vs theoretical: 1px/0.01s = 100 px/s
# Use actual observed speed for more accurate timing
actual_scroll_speed = 54.2 # pixels per second (calculated from logs)
total_time = total_scroll_distance / actual_scroll_speed
# Add buffer time for smooth cycling (configurable %)
buffer_time = total_time * self.duration_buffer
# Calculate duration for single complete pass
if self.loop:
# For looping: set duration to exactly one loop cycle (no extra time to prevent multiple loops)
calculated_duration = int(total_time)
logger.debug(f"Looping enabled, duration set to exactly one loop cycle: {calculated_duration}s")
else:
# For single pass: precise calculation to show content exactly once
# Add buffer to prevent cutting off the last content
completion_buffer = total_time * 0.05 # 5% extra to ensure complete display
calculated_duration = int(total_time + buffer_time + completion_buffer)
logger.debug(f"Single pass mode, added {completion_buffer:.2f}s completion buffer for precise timing")
# Apply configured min/max limits
if calculated_duration < self.min_duration:
self.dynamic_duration = self.min_duration
logger.debug(f"Duration capped to minimum: {self.min_duration}s")
elif calculated_duration > self.max_duration:
self.dynamic_duration = self.max_duration
logger.debug(f"Duration capped to maximum: {self.max_duration}s")
else:
self.dynamic_duration = calculated_duration
# Additional safety check: if the calculated duration seems too short for the content,
# ensure we have enough time to display all content properly
if self.dynamic_duration < 45 and self.total_scroll_width > 200:
# If we have content but short duration, increase it
# Use a more generous calculation: at least 45s or 1s per 20px
self.dynamic_duration = max(45, int(self.total_scroll_width / 20))
logger.debug(f"Adjusted duration for content: {self.dynamic_duration}s (content width: {self.total_scroll_width}px)")
logger.info(f"Leaderboard dynamic duration calculation:")
logger.info(f" Display width: {display_width}px")
logger.info(f" Content width: {self.total_scroll_width}px")
logger.info(f" Total scroll distance: {total_scroll_distance}px")
logger.info(f" Configured scroll speed: {self.scroll_speed}px/frame")
logger.info(f" Configured scroll delay: {self.scroll_delay}s/frame")
logger.info(f" Actual observed scroll speed: {actual_scroll_speed}px/s (from log analysis)")
logger.info(f" Base time: {total_time:.2f}s")
logger.info(f" Buffer time: {buffer_time:.2f}s ({self.duration_buffer*100}%)")
logger.info(f" Looping enabled: {self.loop}")
logger.info(f" Calculated duration: {calculated_duration}s")
logger.info(f"Final calculated duration: {self.dynamic_duration}s")
# Verify the duration makes sense for the content
expected_scroll_time = self.total_scroll_width / actual_scroll_speed
logger.info(f" Verification - Time to scroll content: {expected_scroll_time:.1f}s")
except Exception as e:
logger.error(f"Error calculating dynamic duration: {e}")
self.dynamic_duration = self.min_duration # Use configured minimum as fallback
def get_dynamic_duration(self) -> int:
"""Get the calculated dynamic duration for display"""
# If we don't have a valid dynamic duration yet (total_scroll_width is 0),
# try to update the data first
if self.total_scroll_width == 0 and self.is_enabled:
logger.debug("get_dynamic_duration called but total_scroll_width is 0, attempting update...")
try:
# Force an update to get the data and calculate proper duration
# Bypass the update interval check for duration calculation
self.update()
logger.debug(f"Force update completed, total_scroll_width: {self.total_scroll_width}px")
except Exception as e:
logger.error(f"Error updating leaderboard for dynamic duration: {e}")
logger.debug(f"get_dynamic_duration called, returning: {self.dynamic_duration}s")
return self.dynamic_duration
def update(self) -> None:
"""Update leaderboard data."""
@@ -1199,71 +1284,67 @@ class LeaderboardManager:
def display(self, force_clear: bool = False) -> None:
"""Display the leaderboard."""
logger.debug("Entering leaderboard display method")
logger.debug(f"Leaderboard enabled: {self.is_enabled}")
logger.debug(f"Current scroll position: {self.scroll_position}")
logger.debug(f"Leaderboard image width: {self.leaderboard_image.width if self.leaderboard_image else 'None'}")
logger.debug(f"Using dynamic duration for leaderboard: {self.dynamic_duration}s")
if not self.is_enabled:
logger.debug("Leaderboard is disabled, exiting display method.")
return
# Reset display start time when force_clear is True or when starting fresh
if force_clear or not hasattr(self, '_display_start_time'):
self._display_start_time = time.time()
logger.debug(f"Reset/initialized display start time: {self._display_start_time}")
# Also reset scroll position for clean start
self.scroll_position = 0
# Initialize FPS tracking
self.last_frame_time = 0
self.frame_times = []
self.last_fps_log_time = time.time()
# Reset performance caches
self._cached_draw = None
self._last_visible_image = None
self._last_scroll_position = -1
# Clear caches but limit their size to prevent memory leaks
if len(self._text_measurement_cache) > 100:
self._text_measurement_cache.clear()
if len(self._logo_cache) > 50:
self._logo_cache.clear()
logger.info("Leaderboard FPS tracking initialized")
else:
# Check if the display start time is too old (more than 2x the dynamic duration)
current_time = time.time()
elapsed_time = current_time - self._display_start_time
if elapsed_time > (self.dynamic_duration * 2):
logger.debug(f"Display start time is too old ({elapsed_time:.1f}s), resetting")
self._display_start_time = current_time
self.scroll_position = 0
logger.debug(f"Number of leagues in data at start of display method: {len(self.leaderboard_data)}")
if not self.leaderboard_data:
logger.warning("Leaderboard has no data. Attempting to update...")
self.update()
if not self.leaderboard_data:
logger.warning("Still no data after update. Displaying fallback message.")
self._display_fallback_message()
return
if self.leaderboard_image is None:
logger.warning("Leaderboard image is not available. Attempting to create it.")
self._create_leaderboard_image()
if self.leaderboard_image is None:
logger.error("Failed to create leaderboard image.")
self._display_fallback_message()
return
try:
current_time = time.time()
# FPS tracking only (no artificial throttling)
if self.last_frame_time > 0:
frame_time = current_time - self.last_frame_time
# FPS tracking - use circular buffer to prevent memory growth
self.frame_times.append(frame_time)
if len(self.frame_times) > 30: # Keep buffer size reasonable
self.frame_times.pop(0)
# Log FPS status every 10 seconds
if current_time - self.last_fps_log_time >= self.fps_log_interval:
if self.frame_times:
avg_frame_time = sum(self.frame_times) / len(self.frame_times)
current_fps = 1.0 / frame_time if frame_time > 0 else 0
avg_fps = 1.0 / avg_frame_time if avg_frame_time > 0 else 0
logger.info(f"Leaderboard FPS: Current={current_fps:.1f}, Average={avg_fps:.1f}, Frame Time={frame_time*1000:.1f}ms")
self.last_fps_log_time = current_time
self.last_frame_time = current_time
# Check if we should be scrolling
should_scroll = current_time - self.last_scroll_time >= self.scroll_delay
# Signal scrolling state to display manager
self.display_manager.set_scrolling_state(True)
if should_scroll:
self.display_manager.set_scrolling_state(True)
else:
# If we're not scrolling, check if we should process deferred updates
self.display_manager.process_deferred_updates()
# Scroll the image every frame for smooth animation
self.scroll_position += self.scroll_speed
# Scroll the image
if should_scroll:
self.scroll_position += self.scroll_speed
self.last_scroll_time = current_time
# Get display dimensions once
# Calculate crop region
width = self.display_manager.matrix.width
height = self.display_manager.matrix.height
@@ -1271,51 +1352,67 @@ class LeaderboardManager:
if self.loop:
# Reset position when we've scrolled past the end for a continuous loop
if self.scroll_position >= self.leaderboard_image.width:
logger.info(f"Leaderboard loop reset: scroll_position {self.scroll_position} >= image width {self.leaderboard_image.width}")
self.scroll_position = 0
logger.info("Leaderboard starting new loop cycle")
else:
# Stop scrolling when we reach the end
if self.scroll_position >= self.leaderboard_image.width - width:
logger.info(f"Leaderboard reached end: scroll_position {self.scroll_position} >= {self.leaderboard_image.width - width}")
self.scroll_position = self.leaderboard_image.width - width
# Signal that scrolling has stopped
self.display_manager.set_scrolling_state(False)
logger.info("Leaderboard scrolling stopped - reached end of content")
if self.time_over == 0:
self.time_over = time.time()
elif time.time() - self.time_over >= 2:
self.time_over = 0
raise StopIteration
# Simple timeout check - prevent hanging beyond maximum display time
# Check if we're at a natural break point for mode switching
elapsed_time = current_time - self._display_start_time
if elapsed_time > self.max_display_time:
raise StopIteration("Maximum display time exceeded")
remaining_time = self.dynamic_duration - elapsed_time
# Optimize: Only create new visible image if scroll position changed significantly
# Use integer scroll position to reduce unnecessary crops
int_scroll_position = int(self.scroll_position)
if int_scroll_position != self._last_scroll_position:
# Ensure crop coordinates are within bounds
crop_left = max(0, int_scroll_position)
crop_right = min(self.leaderboard_image.width, int_scroll_position + width)
# Log scroll progress every 50 pixels to help debug (less verbose)
if self.scroll_position % 50 == 0 and self.scroll_position > 0:
logger.info(f"Leaderboard progress: elapsed={elapsed_time:.1f}s, remaining={remaining_time:.1f}s, scroll_pos={self.scroll_position}/{self.leaderboard_image.width}px")
if crop_right > crop_left: # Valid crop region
# Create the visible part of the image by cropping from the leaderboard_image
self._last_visible_image = self.leaderboard_image.crop((
crop_left,
0,
crop_right,
height
))
self._last_scroll_position = int_scroll_position
# If we have less than 2 seconds remaining, check if we can complete the content display
if remaining_time < 2.0 and self.scroll_position > 0:
# Calculate how much time we need to complete the current scroll position
# Use actual observed scroll speed (54.2 px/s) instead of theoretical calculation
actual_scroll_speed = 54.2 # pixels per second (calculated from logs)
# Cache the draw object to avoid creating it every frame
self._cached_draw = ImageDraw.Draw(self._last_visible_image)
if self.loop:
# For looping, we need to complete one full cycle
distance_to_complete = self.leaderboard_image.width - self.scroll_position
else:
# Invalid crop region, skip this frame
return
# For single pass, we need to reach the end (content width minus display width)
end_position = max(0, self.leaderboard_image.width - width)
distance_to_complete = end_position - self.scroll_position
time_to_complete = distance_to_complete / actual_scroll_speed
if time_to_complete <= remaining_time:
# We have enough time to complete the scroll, continue normally
logger.debug(f"Sufficient time remaining ({remaining_time:.1f}s) to complete scroll ({time_to_complete:.1f}s)")
else:
# Not enough time, reset to beginning for clean transition
logger.warning(f"Not enough time to complete content display - remaining: {remaining_time:.1f}s, needed: {time_to_complete:.1f}s")
logger.debug(f"Resetting scroll position for clean transition")
self.scroll_position = 0
# Create the visible part of the image by cropping from the leaderboard_image
visible_image = self.leaderboard_image.crop((
self.scroll_position,
0,
self.scroll_position + width,
height
))
# Display the visible portion
self.display_manager.image = self._last_visible_image
self.display_manager.draw = self._cached_draw
self.display_manager.image = visible_image
self.display_manager.draw = ImageDraw.Draw(self.display_manager.image)
self.display_manager.update_display()
except StopIteration as e:

View File

@@ -51,9 +51,10 @@ class LogoDownloader:
'nba': 'assets/sports/nba_logos',
'mlb': 'assets/sports/mlb_logos',
'nhl': 'assets/sports/nhl_logos',
# NCAA sports use same directory
'ncaa_fb': 'assets/sports/ncaa_logos',
'ncaa_fb_all': 'assets/sports/ncaa_logos', # FCS teams go in same directory
'fcs': 'assets/sports/ncaa_logos', # FCS teams go in same directory
'ncaa_fb_all': 'assets/sports/ncaa_logos',
'fcs': 'assets/sports/ncaa_logos',
'ncaam_basketball': 'assets/sports/ncaa_logos',
'ncaa_baseball': 'assets/sports/ncaa_logos',
'ncaam_hockey': 'assets/sports/ncaa_logos',
@@ -95,7 +96,8 @@ class LogoDownloader:
'Connection': 'keep-alive'
}
def normalize_abbreviation(self, abbreviation: str) -> str:
@staticmethod
def normalize_abbreviation(abbreviation: str) -> str:
"""Normalize team abbreviation for consistent filename usage."""
# Handle special characters that can cause filesystem issues
normalized = abbreviation.upper()
@@ -125,7 +127,7 @@ class LogoDownloader:
logger.error(f"Failed to create logo directory {logo_dir}: {e}")
return False
def download_logo(self, logo_url: str, filepath: Path, team_name: str) -> bool:
def download_logo(self, logo_url: str, filepath: Path, team_abbreviation: str) -> bool:
"""Download a single logo from URL and save to filepath."""
try:
response = self.session.get(logo_url, headers=self.headers, timeout=self.request_timeout)
@@ -134,7 +136,7 @@ class LogoDownloader:
# Verify it's actually an image
content_type = response.headers.get('content-type', '').lower()
if not any(img_type in content_type for img_type in ['image/png', 'image/jpeg', 'image/jpg', 'image/gif']):
logger.warning(f"Downloaded content for {team_name} is not an image: {content_type}")
logger.warning(f"Downloaded content for {team_abbreviation} is not an image: {content_type}")
return False
with open(filepath, 'wb') as f:
@@ -157,10 +159,10 @@ class LogoDownloader:
# Save the converted image
img.save(filepath, 'PNG')
logger.info(f"Successfully downloaded and converted logo for {team_name} -> {filepath.name}")
logger.info(f"Successfully downloaded and converted logo for {team_abbreviation} -> {filepath.name}")
return True
except Exception as e:
logger.error(f"Downloaded file for {team_name} is not a valid image or conversion failed: {e}")
logger.error(f"Downloaded file for {team_abbreviation} is not a valid image or conversion failed: {e}")
try:
os.remove(filepath) # Remove invalid file
except:
@@ -168,10 +170,10 @@ class LogoDownloader:
return False
except requests.exceptions.RequestException as e:
logger.error(f"Failed to download logo for {team_name}: {e}")
logger.error(f"Failed to download logo for {team_abbreviation}: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error downloading logo for {team_name}: {e}")
logger.error(f"Unexpected error downloading logo for {team_abbreviation}: {e}")
return False
def fetch_teams_data(self, league: str) -> Optional[Dict]:
@@ -197,6 +199,29 @@ class LogoDownloader:
logger.error(f"Error parsing JSON response for {league}: {e}")
return None
def fetch_single_team(self, league: str, team_id: str) -> Optional[Dict]:
"""Fetch team data from ESPN API for a specific league."""
api_url = self.API_ENDPOINTS.get(league)
if not api_url:
logger.error(f"No API endpoint configured for league: {league}")
return None
try:
logger.info(f"Fetching team data for team {team_id} in {league} from ESPN API...")
response = self.session.get(f"{api_url}/{team_id}", headers=self.headers, timeout=self.request_timeout)
response.raise_for_status()
data = response.json()
logger.info(f"Successfully fetched team data for {team_id} in {league}")
return data
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching team data for {team_id} in {league}: {e}")
return None
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON response for{team_id} in {league}: {e}")
return None
def extract_teams_from_data(self, data: Dict, league: str) -> List[Dict[str, str]]:
"""Extract team information from ESPN API response."""
teams = []
@@ -450,66 +475,24 @@ class LogoDownloader:
logger.info(f"Comprehensive NCAA football logo download complete: {downloaded_count} downloaded, {failed_count} failed")
return downloaded_count, failed_count
def download_missing_logo_for_team(self, team_abbreviation: str, league: str, team_name: str = None) -> bool:
def download_missing_logo_for_team(self, league: str, team_id: str, team_abbreviation: str, logo_path: Path) -> bool:
"""Download a specific team's logo if it's missing."""
logo_dir = self.get_logo_directory(league)
if not self.ensure_logo_directory(logo_dir):
return False
filename = f"{self.normalize_abbreviation(team_abbreviation)}.png"
filepath = Path(logo_dir) / filename
# Return True if logo already exists
if filepath.exists():
logger.debug(f"Logo already exists for {team_abbreviation}")
return True
# Fetch team data to find the logo URL
data = self.fetch_teams_data(league)
data = self.fetch_single_team(league, team_id)
if not data:
return False
teams = self.extract_teams_from_data(data, league)
# Find the specific team with improved matching
target_team = None
normalized_search = self.normalize_abbreviation(team_abbreviation)
# First try exact match
for team in teams:
if team['abbreviation'].upper() == team_abbreviation.upper():
target_team = team
break
# If not found, try normalized match
if not target_team:
for team in teams:
normalized_team_abbr = self.normalize_abbreviation(team['abbreviation'])
if normalized_team_abbr == normalized_search:
target_team = team
break
# If still not found, try partial matching for common variations
if not target_team:
search_variations = self._get_team_name_variations(team_abbreviation)
for team in teams:
team_variations = self._get_team_name_variations(team['abbreviation'])
if any(var in team_variations for var in search_variations):
target_team = team
logger.info(f"Found team {team_abbreviation} as {team['abbreviation']} ({team['display_name']})")
break
if not target_team:
logger.warning(f"Team {team_abbreviation} not found in {league} data")
try:
logo_url = data["team"]["logos"][0]["href"]
except KeyError:
return False
# Download the logo
success = self.download_logo(target_team['logo_url'], filepath, target_team['display_name'])
success = self.download_logo(logo_url, logo_path, team_abbreviation)
if success:
time.sleep(0.1) # Small delay
return success
def download_all_missing_logos(self, leagues: List[str] = None, force_download: bool = False) -> Dict[str, Tuple[int, int]]:
def download_all_missing_logos(self, leagues: List[str] | None = None, force_download: bool = False) -> Dict[str, Tuple[int, int]]:
"""Download missing logos for all specified leagues."""
if leagues is None:
leagues = list(self.API_ENDPOINTS.keys())
@@ -531,7 +514,7 @@ class LogoDownloader:
logger.info(f"Overall logo download results: {total_downloaded} downloaded, {total_failed} failed")
return results
def create_placeholder_logo(self, team_abbreviation: str, logo_dir: str, team_name: str = None) -> bool:
def create_placeholder_logo(self, team_abbreviation: str, logo_dir: str) -> bool:
"""Create a placeholder logo when real logo cannot be downloaded."""
try:
# Ensure the logo directory exists
@@ -642,7 +625,7 @@ def get_soccer_league_key(league_code: str) -> str:
# Convenience function for easy integration
def download_missing_logo(team_abbreviation: str, league: str, team_name: str = None, create_placeholder: bool = True) -> bool:
def download_missing_logo(league: str, team_id: str, team_abbreviation: str, logo_path: Path, logo_url: str | None = None, create_placeholder: bool = True) -> bool:
"""
Convenience function to download a missing team logo.
@@ -659,6 +642,7 @@ def download_missing_logo(team_abbreviation: str, league: str, team_name: str =
# Check if logo already exists
logo_dir = downloader.get_logo_directory(league)
downloader.ensure_logo_directory(logo_dir)
filename = f"{downloader.normalize_abbreviation(team_abbreviation)}.png"
filepath = Path(logo_dir) / filename
@@ -667,18 +651,24 @@ def download_missing_logo(team_abbreviation: str, league: str, team_name: str =
return True
# Try to download the real logo first
logger.info(f"Attempting to download logo for {team_abbreviation} ({team_name or 'Unknown'}) from {league}")
success = downloader.download_missing_logo_for_team(team_abbreviation, league, team_name)
logger.info(f"Attempting to download logo for {team_abbreviation} from {league}")
if logo_url:
success = downloader.download_logo(logo_url, filepath, team_abbreviation)
if success:
time.sleep(0.1) # Small delay
return success
success = downloader.download_missing_logo_for_team(league, team_id, team_abbreviation, logo_path)
if not success and create_placeholder:
logger.info(f"Creating placeholder logo for {team_abbreviation} ({team_name or 'Unknown'})")
logger.info(f"Creating placeholder logo for {team_abbreviation}")
# Create placeholder as fallback
success = downloader.create_placeholder_logo(team_abbreviation, logo_dir, team_name)
success = downloader.create_placeholder_logo(team_abbreviation, logo_dir)
if success:
logger.info(f"Successfully handled logo for {team_abbreviation} ({team_name or 'Unknown'})")
logger.info(f"Successfully handled logo for {team_abbreviation}")
else:
logger.warning(f"Failed to download or create logo for {team_abbreviation} ({team_name or 'Unknown'})")
logger.warning(f"Failed to download or create logo for {team_abbreviation}")
return success

File diff suppressed because it is too large Load Diff

View File

@@ -13,21 +13,13 @@ from src.logo_downloader import download_missing_logo
import pytz
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from src.base_classes.sports import SportsRecent, SportsUpcoming
from src.base_classes.hockey import Hockey, HockeyLive
from pathlib import Path
# Constants
ESPN_NCAAMH_SCOREBOARD_URL = "https://site.api.espn.com/apis/site/v2/sports/hockey/mens-college-hockey/scoreboard" # Changed URL for NCAA FB
ESPN_NCAAMH_SCOREBOARD_URL = "https://site.api.espn.com/apis/site/v2/sports/hockey/mens-college-hockey/scoreboard"
# Configure logging to match main configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s.%(msecs)03d - %(levelname)s:%(name)s:%(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
class BaseNCAAMHockeyManager: # Renamed class
class BaseNCAAMHockeyManager(Hockey): # Renamed class
"""Base class for NCAA Mens Hockey managers with common functionality.""" # Updated docstring
# Class variables for warning tracking
_no_data_warning_logged = False
@@ -37,70 +29,17 @@ class BaseNCAAMHockeyManager: # Renamed class
_last_shared_update = 0
_processed_games_cache = {} # Cache for processed game data
_processed_games_timestamp = 0
logger = logging.getLogger('NCAAMH') # Changed logger name
def __init__(self, config: Dict[str, Any], display_manager: DisplayManager, cache_manager: CacheManager):
self.display_manager = display_manager
self.config = config
self.cache_manager = cache_manager
self.config_manager = self.cache_manager.config_manager
self.odds_manager = OddsManager(self.cache_manager, self.config_manager)
self.ncaam_hockey_config = config.get("ncaam_hockey_scoreboard", {}) # Changed config key
self.is_enabled = self.ncaam_hockey_config.get("enabled", False)
self.show_odds = self.ncaam_hockey_config.get("show_odds", False)
self.test_mode = self.ncaam_hockey_config.get("test_mode", False)
self.logo_dir = self.ncaam_hockey_config.get("logo_dir", "assets/sports/ncaa_logos") # Changed logo dir
self.update_interval = self.ncaam_hockey_config.get("update_interval_seconds", 60)
self.show_records = self.ncaam_hockey_config.get('show_records', False)
self.show_ranking = self.ncaam_hockey_config.get('show_ranking', False)
self.season_cache_duration = self.ncaam_hockey_config.get("season_cache_duration_seconds", 86400) # 24 hours default
# Number of games to show (instead of time-based windows)
self.recent_games_to_show = self.ncaam_hockey_config.get("recent_games_to_show", 5) # Show last 5 games
self.upcoming_games_to_show = self.ncaam_hockey_config.get("upcoming_games_to_show", 10) # Show next 10 games
# Set up session with retry logic
self.session = requests.Session()
retry_strategy = Retry(
total=5, # increased number of retries
backoff_factor=1, # increased backoff factor
status_forcelist=[429, 500, 502, 503, 504], # added 429 to retry list
allowed_methods=["GET", "HEAD", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
# Set up headers
self.headers = {
'User-Agent': 'LEDMatrix/1.0 (https://github.com/yourusername/LEDMatrix; contact@example.com)',
'Accept': 'application/json',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
self.last_update = 0
self.current_game = None
self.fonts = self._load_fonts()
self.favorite_teams = self.ncaam_hockey_config.get("favorite_teams", [])
self.logger = logging.getLogger('NCAAMH') # Changed logger name
super().__init__(config=config, display_manager=display_manager, cache_manager=cache_manager, logger=self.logger, sport_key="ncaam_hockey")
# Check display modes to determine what data to fetch
display_modes = self.ncaam_hockey_config.get("display_modes", {})
display_modes = self.mode_config.get("display_modes", {})
self.recent_enabled = display_modes.get("ncaam_hockey_recent", False)
self.upcoming_enabled = display_modes.get("ncaam_hockey_upcoming", False)
self.live_enabled = display_modes.get("ncaam_hockey_live", False)
self.logger.setLevel(logging.INFO)
self.display_width = self.display_manager.matrix.width
self.display_height = self.display_manager.matrix.height
self._logo_cache = {}
# Initialize team rankings cache
self._team_rankings_cache = {}
self._rankings_cache_timestamp = 0
self._rankings_cache_duration = 3600 # Cache rankings for 1 hour
self.logger.info(f"Initialized NCAAMHockey manager with display dimensions: {self.display_width}x{self.display_height}")
self.logger.info(f"Logo directory: {self.logo_dir}")
self.logger.info(f"Display modes - Recent: {self.recent_enabled}, Upcoming: {self.upcoming_enabled}, Live: {self.live_enabled}")
@@ -163,44 +102,7 @@ class BaseNCAAMHockeyManager: # Renamed class
return False
def _fetch_odds(self, game: Dict) -> None:
"""Fetch odds for a specific game if conditions are met."""
# Check if odds should be shown for this sport
if not self.show_odds:
return
# Check if we should only fetch for favorite teams
is_favorites_only = self.ncaam_hockey_config.get("show_favorite_teams_only", False)
if is_favorites_only:
home_abbr = game.get('home_abbr')
away_abbr = game.get('away_abbr')
if not (home_abbr in self.favorite_teams or away_abbr in self.favorite_teams):
self.logger.debug(f"Skipping odds fetch for non-favorite game in favorites-only mode: {away_abbr}@{home_abbr}")
return
self.logger.debug(f"Proceeding with odds fetch for game: {game.get('id', 'N/A')}")
# Fetch odds using OddsManager (ESPN API)
try:
# Determine update interval based on game state
is_live = game.get('status', '').lower() == 'in'
update_interval = self.ncaam_hockey_config.get("live_odds_update_interval", 60) if is_live \
else self.ncaam_hockey_config.get("odds_update_interval", 3600)
odds_data = self.odds_manager.get_odds(
sport="hockey",
league="mens-college-hockey",
event_id=game['id'],
update_interval_seconds=update_interval
)
if odds_data:
game['odds'] = odds_data
self.logger.debug(f"Successfully fetched and attached odds for game {game['id']}")
else:
self.logger.debug(f"No odds data returned for game {game['id']}")
except Exception as e:
self.logger.error(f"Error fetching odds for game {game.get('id', 'N/A')}: {e}")
super()._fetch_odds(game, "mens-college-hockey")
def _fetch_ncaa_fb_api_data(self, use_cache: bool = True) -> Optional[Dict]:
"""
@@ -217,7 +119,7 @@ class BaseNCAAMHockeyManager: # Renamed class
for year in years_to_check:
cache_key = f"ncaamh_schedule_{year}"
if use_cache:
cached_data = self.cache_manager.get(cache_key, max_age=self.season_cache_duration)
cached_data = self.cache_manager.get(cache_key)
if cached_data:
self.logger.info(f"[NCAAMH] Using cached schedule for {year}")
all_events.extend(cached_data)
@@ -243,731 +145,53 @@ class BaseNCAAMHockeyManager: # Renamed class
return {'events': all_events}
def _fetch_data(self, date_str: str = None) -> Optional[Dict]:
"""
Fetch data using background service cache first, fallback to direct API call.
This eliminates redundant caching and ensures Recent/Upcoming managers
use the same data source as the background service.
"""
# For Live managers, always fetch fresh data
def _fetch_data(self) -> Optional[Dict]:
"""Fetch data using shared data mechanism or direct fetch for live."""
if isinstance(self, NCAAMHockeyLiveManager):
return self._fetch_ncaa_fb_api_data(use_cache=False)
else:
return self._fetch_ncaa_fb_api_data(use_cache=True)
# For Recent/Upcoming managers, try to use background service cache first
from datetime import datetime
import pytz
cache_key = f"ncaam_hockey_{datetime.now(pytz.utc).strftime('%Y%m%d')}"
# Check if background service has fresh data
if self.cache_manager.is_background_data_available(cache_key, 'ncaam_hockey'):
cached_data = self.cache_manager.get_background_cached_data(cache_key, 'ncaam_hockey')
if cached_data:
self.logger.info(f"[NCAAMHockey] Using background service cache for {cache_key}")
return cached_data
# Fallback to direct API call if background data not available
self.logger.info(f"[NCAAMHockey] Background data not available, fetching directly for {cache_key}")
return self._fetch_ncaa_fb_api_data(use_cache=True)
def _load_fonts(self):
"""Load fonts used by the scoreboard."""
fonts = {}
try:
fonts['score'] = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 12)
fonts['time'] = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
fonts['team'] = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
fonts['status'] = ImageFont.truetype("assets/fonts/4x6-font.ttf", 6)
logging.info("[NCAAMH] Successfully loaded Press Start 2P font for all text elements")
except IOError:
logging.warning("[NCAAMH] Press Start 2P font not found, trying 4x6 font.")
try:
# Try to load the 4x6 font as a fallback
fonts['score'] = ImageFont.truetype("assets/fonts/4x6-font.ttf", 12)
fonts['time'] = ImageFont.truetype("assets/fonts/4x6-font.ttf", 8)
fonts['team'] = ImageFont.truetype("assets/fonts/4x6-font.ttf", 8)
fonts['status'] = ImageFont.truetype("assets/fonts/4x6-font.ttf", 9)
logging.info("[NCAAMH] Successfully loaded 4x6 font for all text elements")
except IOError:
logging.warning("[NCAAMH] 4x6 font not found, using default PIL font.")
# Use default PIL font as a last resort
fonts['score'] = ImageFont.load_default()
fonts['time'] = ImageFont.load_default()
fonts['team'] = ImageFont.load_default()
fonts['status'] = ImageFont.load_default()
return fonts
def _draw_text_with_outline(self, draw, text, position, font, fill=(255, 255, 255), outline_color=(0, 0, 0)):
"""
Draw text with a black outline for better readability.
Args:
draw: ImageDraw object
text: Text to draw
position: (x, y) position to draw the text
font: Font to use
fill: Text color (default: white)
outline_color: Outline color (default: black)
"""
x, y = position
# Draw the outline by drawing the text in black at 8 positions around the text
for dx, dy in [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]:
draw.text((x + dx, y + dy), text, font=font, fill=outline_color)
# Draw the text in the specified color
draw.text((x, y), text, font=font, fill=fill)
def _load_and_resize_logo(self, team_abbrev: str, team_name: str = None) -> Optional[Image.Image]:
"""Load and resize a team logo, with caching and automatic download if missing."""
if team_abbrev in self._logo_cache:
return self._logo_cache[team_abbrev]
logo_path = os.path.join(self.logo_dir, f"{team_abbrev}.png")
self.logger.debug(f"Logo path: {logo_path}")
try:
# Try to download missing logo first
if not os.path.exists(logo_path):
self.logger.info(f"Logo not found for {team_abbrev} at {logo_path}. Attempting to download.")
# Try to download the logo from ESPN API
success = download_missing_logo(team_abbrev, 'ncaam_hockey', team_name)
if not success:
# Create placeholder if download fails
self.logger.warning(f"Failed to download logo for {team_abbrev}. Creating placeholder.")
os.makedirs(os.path.dirname(logo_path), exist_ok=True)
logo = Image.new('RGBA', (32, 32), (200, 200, 200, 255)) # Gray placeholder
draw = ImageDraw.Draw(logo)
draw.text((2, 10), team_abbrev, fill=(0, 0, 0, 255))
logo.save(logo_path)
self.logger.info(f"Created placeholder logo at {logo_path}")
logo = Image.open(logo_path)
if logo.mode != 'RGBA':
logo = logo.convert('RGBA')
max_width = int(self.display_width * 1.5)
max_height = int(self.display_height * 1.5)
logo.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
self._logo_cache[team_abbrev] = logo
return logo
except Exception as e:
self.logger.error(f"Error loading logo for {team_abbrev}: {e}", exc_info=True)
return None
def _extract_game_details(self, game_event: Dict) -> Optional[Dict]:
"""Extract relevant game details from ESPN API response."""
if not game_event:
return None
try:
competition = game_event["competitions"][0]
status = competition["status"]
competitors = competition["competitors"]
game_date_str = game_event["date"]
# Parse game date/time
try:
start_time_utc = datetime.fromisoformat(game_date_str.replace("Z", "+00:00"))
self.logger.debug(f"[NCAAMH] Parsed game time: {start_time_utc}")
except ValueError:
logging.warning(f"[NCAAMH] Could not parse game date: {game_date_str}")
start_time_utc = None
home_team = next(c for c in competitors if c.get("homeAway") == "home")
away_team = next(c for c in competitors if c.get("homeAway") == "away")
home_record = home_team.get('records', [{}])[0].get('summary', '') if home_team.get('records') else ''
away_record = away_team.get('records', [{}])[0].get('summary', '') if away_team.get('records') else ''
# Don't show "0-0" records - set to blank instead
if home_record == "0-0":
home_record = ''
if away_record == "0-0":
away_record = ''
# Format game time and date for display
game_time = ""
game_date = ""
if start_time_utc:
# Convert to local time
local_time = start_time_utc.astimezone(self._get_timezone())
game_time = local_time.strftime("%-I:%M%p")
# Check date format from config
use_short_date_format = self.config.get('display', {}).get('use_short_date_format', False)
if use_short_date_format:
game_date = local_time.strftime("%-m/%-d")
else:
game_date = self.display_manager.format_date_with_ordinal(local_time)
details = {
"start_time_utc": start_time_utc,
"status_text": status["type"]["shortDetail"],
"period": status.get("period", 0),
"clock": status.get("displayClock", "0:00"),
"is_live": status["type"]["state"] in ("in", "halftime"),
"is_final": status["type"]["state"] == "post",
"is_upcoming": status["type"]["state"] == "pre",
"home_abbr": home_team["team"]["abbreviation"],
"home_score": home_team.get("score", "0"),
"home_record": home_record,
"home_logo_path": os.path.join(self.logo_dir, f"{home_team['team']['abbreviation']}.png"),
"away_abbr": away_team["team"]["abbreviation"],
"away_score": away_team.get("score", "0"),
"away_record": away_record,
"away_logo_path": os.path.join(self.logo_dir, f"{away_team['team']['abbreviation']}.png"),
"game_time": game_time,
"game_date": game_date,
"id": game_event.get("id")
}
# Log game details for debugging
self.logger.debug(f"[NCAAMH] Extracted game details: {details['away_abbr']} vs {details['home_abbr']}")
# Use .get() to avoid KeyError if optional keys are missing
self.logger.debug(
f"[NCAAMH] Game status: is_final={details.get('is_final')}, "
f"is_upcoming={details.get('is_upcoming')}, is_live={details.get('is_live')}"
)
# Validate logo files
for team in ["home", "away"]:
logo_path = details[f"{team}_logo_path"]
if not os.path.isfile(logo_path):
# logging.warning(f"[NCAAMH] {team.title()} logo not found: {logo_path}")
details[f"{team}_logo_path"] = None
else:
try:
with Image.open(logo_path) as img:
logging.debug(f"[NCAAMH] {team.title()} logo is valid: {img.format}, size: {img.size}")
except Exception as e:
logging.error(f"[NCAAMH] {team.title()} logo file exists but is not valid: {e}")
details[f"{team}_logo_path"] = None
return details
except Exception as e:
logging.error(f"[NCAAMH] Error extracting game details: {e}")
return None
def _draw_scorebug_layout(self, game: Dict, force_clear: bool = False) -> None:
"""Draw the scorebug layout for the current game."""
try:
# Create a new black image for the main display
main_img = Image.new('RGBA', (self.display_width, self.display_height), (0, 0, 0, 255))
# Load logos once
home_logo = self._load_and_resize_logo(game["home_abbr"])
away_logo = self._load_and_resize_logo(game["away_abbr"])
if not home_logo or not away_logo:
self.logger.error("Failed to load one or both team logos")
return
# Create a single overlay for both logos
overlay = Image.new('RGBA', (self.display_width, self.display_height), (0, 0, 0, 0))
# Calculate vertical center line for alignment
center_y = self.display_height // 2
# Draw home team logo (far right, extending beyond screen)
home_x = self.display_width - home_logo.width + 2
home_y = center_y - (home_logo.height // 2)
# Paste the home logo onto the overlay
overlay.paste(home_logo, (home_x, home_y), home_logo)
# Draw away team logo (far left, extending beyond screen)
away_x = -2
away_y = center_y - (away_logo.height // 2)
# Paste the away logo onto the overlay
overlay.paste(away_logo, (away_x, away_y), away_logo)
# Composite the overlay with the main image
main_img = Image.alpha_composite(main_img, overlay)
# Convert to RGB for final display
main_img = main_img.convert('RGB')
draw = ImageDraw.Draw(main_img)
# Check if this is an upcoming game
is_upcoming = game.get("is_upcoming", False)
if is_upcoming:
# For upcoming games, show date and time stacked in the center
game_date = game.get("game_date", "")
game_time = game.get("game_time", "")
# Show "Next Game" at the top
status_text = "Next Game"
status_width = draw.textlength(status_text, font=self.fonts['status'])
status_x = (self.display_width - status_width) // 2
status_y = 2
self._draw_text_with_outline(draw, status_text, (status_x, status_y), self.fonts['status'])
# Calculate position for the date text (centered horizontally, below "Next Game")
date_width = draw.textlength(game_date, font=self.fonts['time'])
date_x = (self.display_width - date_width) // 2
date_y = center_y - 5 # Position in center
self._draw_text_with_outline(draw, game_date, (date_x, date_y), self.fonts['time'])
# Calculate position for the time text (centered horizontally, in center)
time_width = draw.textlength(game_time, font=self.fonts['time'])
time_x = (self.display_width - time_width) // 2
time_y = date_y + 10 # Position below date
self._draw_text_with_outline(draw, game_time, (time_x, time_y), self.fonts['time'])
else:
# For live/final games, show scores and period/time
home_score = str(game.get("home_score", "0"))
away_score = str(game.get("away_score", "0"))
score_text = f"{away_score}-{home_score}"
# Calculate position for the score text (centered at the bottom)
score_width = draw.textlength(score_text, font=self.fonts['score'])
score_x = (self.display_width - score_width) // 2
score_y = self.display_height - 15
self._draw_text_with_outline(draw, score_text, (score_x, score_y), self.fonts['score'])
# Draw period and time or Final
if game.get("is_final", False):
status_text = "Final"
else:
period = game.get("period", 0)
clock = game.get("clock", "0:00")
# Format period text
if period > 3:
period_text = "OT"
else:
period_text = f"{period}{'st' if period == 1 else 'nd' if period == 2 else 'rd'}"
status_text = f"{period_text} {clock}"
# Calculate position for the status text (centered at the top)
status_width = draw.textlength(status_text, font=self.fonts['time'])
status_x = (self.display_width - status_width) // 2
status_y = 5
self._draw_text_with_outline(draw, status_text, (status_x, status_y), self.fonts['time'])
# Display odds if available
if 'odds' in game:
odds = game['odds']
spread = odds.get('spread', {}).get('point', None)
if spread is not None:
# Format spread text
spread_text = f"{spread:+.1f}" if spread > 0 else f"{spread:.1f}"
# Choose color and position based on which team has the spread
if odds.get('spread', {}).get('team') == game['home_abbr']:
text_color = (255, 100, 100) # Reddish
spread_x = self.display_width - draw.textlength(spread_text, font=self.fonts['status']) - 2
else:
text_color = (100, 255, 100) # Greenish
spread_x = 2
spread_y = 0
self._draw_text_with_outline(draw, spread_text, (spread_x, spread_y), self.fonts['status'], fill=text_color)
# Draw records if enabled
if self.show_records:
try:
record_font = ImageFont.truetype("assets/fonts/4x6-font.ttf", 6)
except IOError:
record_font = ImageFont.load_default()
away_record = game.get('away_record', '')
home_record = game.get('home_record', '')
record_bbox = draw.textbbox((0,0), "0-0", font=record_font)
record_height = record_bbox[3] - record_bbox[1]
record_y = self.display_height - record_height
if away_record:
away_record_x = 2
self._draw_text_with_outline(draw, away_record, (away_record_x, record_y), record_font)
if home_record:
home_record_bbox = draw.textbbox((0,0), home_record, font=record_font)
home_record_width = home_record_bbox[2] - home_record_bbox[0]
home_record_x = self.display_width - home_record_width - 2
self._draw_text_with_outline(draw, home_record, (home_record_x, record_y), record_font)
# Display the image
self.display_manager.image.paste(main_img, (0, 0))
self.display_manager.update_display()
except Exception as e:
self.logger.error(f"Error displaying game: {e}", exc_info=True)
def display(self, force_clear: bool = False) -> None:
"""Common display method for all NCAAMH managers"""
if not self.current_game:
current_time = time.time()
if not hasattr(self, '_last_warning_time'):
self._last_warning_time = 0
if current_time - self._last_warning_time > 300: # 5 minutes cooldown
self.logger.warning("[NCAAMH] No game data available to display")
self._last_warning_time = current_time
return
self._draw_scorebug_layout(self.current_game, force_clear)
class NCAAMHockeyLiveManager(BaseNCAAMHockeyManager): # Renamed class
class NCAAMHockeyLiveManager(BaseNCAAMHockeyManager, HockeyLive): # Renamed class
"""Manager for live NCAA Mens Hockey games."""
def __init__(self, config: Dict[str, Any], display_manager: DisplayManager, cache_manager: CacheManager):
super().__init__(config, display_manager, cache_manager)
self.update_interval = self.ncaam_hockey_config.get("live_update_interval", 15) # 15 seconds for live games
self.no_data_interval = 300 # 5 minutes when no live games
self.last_update = 0
self.logger.info("Initialized NCAA Mens Hockey Live Manager")
self.live_games = [] # List to store all live games
self.current_game_index = 0 # Index to track which game to show
self.last_game_switch = 0 # Track when we last switched games
self.game_display_duration = self.ncaam_hockey_config.get("live_game_duration", 20) # Display each live game for 20 seconds
self.last_display_update = 0 # Track when we last updated the display
self.last_log_time = 0
self.log_interval = 300 # Only log status every 5 minutes
self.logger = logging.getLogger('NCAAMHockeyLiveManager') # Changed logger name
# Initialize with test game only if test mode is enabled
if self.test_mode:
self.current_game = {
"id": "401596361",
"home_abbr": "RIT",
"away_abbr": "PU",
"away_abbr": "CLAR ",
"home_score": "3",
"away_score": "2",
"period": 2,
"period_text": "1st",
"home_id": "178",
"away_id": "2137",
"clock": "12:34",
"home_logo_path": os.path.join(self.logo_dir, "RIT.png"),
"away_logo_path": os.path.join(self.logo_dir, "PU.png"),
"home_logo_path": Path(self.logo_dir, "RIT.png"),
"away_logo_path": Path(self.logo_dir, "CLAR .png"),
"game_time": "7:30 PM",
"game_date": "Apr 17"
}
self.live_games = [self.current_game]
logging.info("[NCAAMH] Initialized NCAAMHockeyLiveManager with test game: RIT vs PU")
self.logger.info("Initialized NCAAMHockeyLiveManager with test game: RIT vs CLAR ")
else:
logging.info("[NCAAMH] Initialized NCAAMHockeyLiveManager in live mode")
self.logger.info("Initialized NCAAMHockeyLiveManager in live mode")
def update(self):
"""Update live game data."""
if not self.is_enabled: return
current_time = time.time()
interval = self.no_data_interval if not self.live_games else self.update_interval
if current_time - self.last_update >= interval:
self.last_update = current_time
if self.test_mode:
# For testing, we'll just update the clock to show it's working
if self.current_game:
minutes = int(self.current_game["clock"].split(":")[0])
seconds = int(self.current_game["clock"].split(":")[1])
seconds -= 1
if seconds < 0:
seconds = 59
minutes -= 1
if minutes < 0:
minutes = 19
if self.current_game["period"] < 3:
self.current_game["period"] += 1
else:
self.current_game["period"] = 1
self.current_game["clock"] = f"{minutes:02d}:{seconds:02d}"
# Always update display in test mode
self.display(force_clear=True)
else:
# Fetch live game data from ESPN API
data = self._fetch_data()
if data and "events" in data:
# Find all live games involving favorite teams
new_live_games = []
for event in data["events"]:
details = self._extract_game_details(event)
if details and details["is_live"]:
self._fetch_odds(details)
new_live_games.append(details)
# Filter for favorite teams only if the config is set
if self.ncaam_hockey_config.get("show_favorite_teams_only", False):
new_live_games = [game for game in new_live_games
if game['home_abbr'] in self.favorite_teams or
game['away_abbr'] in self.favorite_teams]
# Only log if there's a change in games or enough time has passed
should_log = (
current_time - self.last_log_time >= self.log_interval or
len(new_live_games) != len(self.live_games) or
not self.live_games # Log if we had no games before
)
if should_log:
if new_live_games:
filter_text = "favorite teams" if self.ncaam_hockey_config.get("show_favorite_teams_only", False) else "all teams"
self.logger.info(f"[NCAAMH] Found {len(new_live_games)} live games involving {filter_text}")
for game in new_live_games:
self.logger.info(f"[NCAAMH] Live game: {game['away_abbr']} vs {game['home_abbr']} - Period {game['period']}, {game['clock']}")
else:
filter_text = "favorite teams" if self.ncaam_hockey_config.get("show_favorite_teams_only", False) else "criteria"
self.logger.info(f"[NCAAMH] No live games found matching {filter_text}")
self.last_log_time = current_time
if new_live_games:
# Update the current game with the latest data
for new_game in new_live_games:
if self.current_game and (
(new_game["home_abbr"] == self.current_game["home_abbr"] and
new_game["away_abbr"] == self.current_game["away_abbr"]) or
(new_game["home_abbr"] == self.current_game["away_abbr"] and
new_game["away_abbr"] == self.current_game["home_abbr"])
):
self.current_game = new_game
break
# Only update the games list if we have new games
if not self.live_games or set(game["away_abbr"] + game["home_abbr"] for game in new_live_games) != set(game["away_abbr"] + game["home_abbr"] for game in self.live_games):
self.live_games = new_live_games
# If we don't have a current game or it's not in the new list, start from the beginning
if not self.current_game or self.current_game not in self.live_games:
self.current_game_index = 0
self.current_game = self.live_games[0]
self.last_game_switch = current_time
# Update display if data changed, limit rate
if current_time - self.last_display_update >= 1.0:
# self.display(force_clear=True) # REMOVED: DisplayController handles this
self.last_display_update = current_time
else:
# No live games found
self.live_games = []
self.current_game = None
# Check if it's time to switch games
if len(self.live_games) > 1 and (current_time - self.last_game_switch) >= self.game_display_duration:
self.current_game_index = (self.current_game_index + 1) % len(self.live_games)
self.current_game = self.live_games[self.current_game_index]
self.last_game_switch = current_time
# self.display(force_clear=True) # REMOVED: DisplayController handles this
self.last_display_update = current_time # Track time for potential display update
def display(self, force_clear=False):
"""Display live game information."""
if not self.current_game:
return
super().display(force_clear) # Call parent class's display method
class NCAAMHockeyRecentManager(BaseNCAAMHockeyManager):
class NCAAMHockeyRecentManager(BaseNCAAMHockeyManager, SportsRecent):
"""Manager for recently completed NCAAMH games."""
def __init__(self, config: Dict[str, Any], display_manager: DisplayManager, cache_manager: CacheManager):
super().__init__(config, display_manager, cache_manager)
self.recent_games = []
self.current_game_index = 0
self.last_update = 0
self.update_interval = self.ncaam_hockey_config.get("recent_update_interval", 3600) # Use config, default 1 hour
self.recent_games_to_show = self.ncaam_hockey_config.get("recent_games_to_show", 5) # Number of most recent games to display
self.last_game_switch = 0
self.game_display_duration = 15 # Display each game for 15 seconds
self.logger = logging.getLogger('NCAAMHockeyRecentManager') # Changed logger name
self.logger.info(f"Initialized NCAAMHRecentManager with {len(self.favorite_teams)} favorite teams")
def update(self):
"""Update recent games data."""
current_time = time.time()
if current_time - self.last_update < self.update_interval:
return
self.last_update = current_time
try:
# Fetch data from ESPN API
data = self._fetch_data()
if not data or 'events' not in data:
self.logger.warning("[NCAAMH] No events found in ESPN API response")
return
events = data['events']
self.logger.info(f"[NCAAMH] Successfully fetched {len(events)} events from ESPN API")
# Process games
processed_games = []
for event in events:
game = self._extract_game_details(event)
if game and game['is_final']:
# Fetch odds if enabled
self._fetch_odds(game)
processed_games.append(game)
# Filter for favorite teams only if the config is set
if self.ncaam_hockey_config.get("show_favorite_teams_only", False):
team_games = [game for game in processed_games
if game['home_abbr'] in self.favorite_teams or
game['away_abbr'] in self.favorite_teams]
else:
team_games = processed_games
# Sort games by start time, most recent first, then limit to recent_games_to_show
team_games.sort(key=lambda x: x.get('start_time_utc') or datetime.min.replace(tzinfo=timezone.utc), reverse=True)
team_games = team_games[:self.recent_games_to_show]
self.logger.info(f"[NCAAMH] Found {len(team_games)} recent games for favorite teams (limited to {self.recent_games_to_show})")
new_game_ids = {g['id'] for g in team_games}
current_game_ids = {g['id'] for g in getattr(self, 'games_list', [])}
if new_game_ids != current_game_ids:
self.games_list = team_games
self.current_game_index = 0
self.current_game = self.games_list[0] if self.games_list else None
self.last_game_switch = current_time
elif self.games_list:
self.current_game = self.games_list[self.current_game_index]
if not self.games_list:
self.current_game = None
except Exception as e:
self.logger.error(f"[NCAAMH] Error updating recent games: {e}", exc_info=True)
def display(self, force_clear=False):
"""Display recent games."""
if not self.games_list:
self.logger.info("[NCAAMH] No recent games to display")
return # Skip display update entirely
try:
current_time = time.time()
# Check if it's time to switch games
if len(self.games_list) > 1 and current_time - self.last_game_switch >= self.game_display_duration:
# Move to next game
self.current_game_index = (self.current_game_index + 1) % len(self.games_list)
self.current_game = self.games_list[self.current_game_index]
self.last_game_switch = current_time
force_clear = True # Force clear when switching games
# Draw the scorebug layout
self._draw_scorebug_layout(self.current_game, force_clear)
# Update display
self.display_manager.update_display()
except Exception as e:
self.logger.error(f"[NCAAMH] Error displaying recent game: {e}", exc_info=True)
class NCAAMHockeyUpcomingManager(BaseNCAAMHockeyManager):
class NCAAMHockeyUpcomingManager(BaseNCAAMHockeyManager, SportsUpcoming):
"""Manager for upcoming NCAA Mens Hockey games."""
def __init__(self, config: Dict[str, Any], display_manager: DisplayManager, cache_manager: CacheManager):
super().__init__(config, display_manager, cache_manager)
self.upcoming_games = []
self.current_game_index = 0
self.last_update = 0
self.update_interval = self.ncaam_hockey_config.get("upcoming_update_interval", 3600) # Use config, default 1 hour
self.upcoming_games_to_show = self.ncaam_hockey_config.get("upcoming_games_to_show", 5) # Number of upcoming games to display
self.last_log_time = 0
self.log_interval = 300 # Only log status every 5 minutes
self.last_warning_time = 0
self.warning_cooldown = 300 # Only show warning every 5 minutes
self.last_game_switch = 0 # Track when we last switched games
self.game_display_duration = 15 # Display each game for 15 seconds
self.logger = logging.getLogger('NCAAMHockeyUpcomingManager') # Changed logger name
self.logger.info(f"Initialized NCAAMHUpcomingManager with {len(self.favorite_teams)} favorite teams")
def update(self):
"""Update upcoming games data."""
current_time = time.time()
if current_time - self.last_update < self.update_interval:
return
self.last_update = current_time
try:
# Fetch data from ESPN API
data = self._fetch_data()
if not data or 'events' not in data:
self.logger.warning("[NCAAMH] No events found in ESPN API response")
return
events = data['events']
self.logger.info(f"[NCAAMH] Successfully fetched {len(events)} events from ESPN API")
# Process games
new_upcoming_games = []
for event in events:
game = self._extract_game_details(event)
if game and game['is_upcoming']:
# Only fetch odds for games that will be displayed
if self.ncaam_hockey_config.get("show_favorite_teams_only", False):
if not self.favorite_teams or (game['home_abbr'] not in self.favorite_teams and game['away_abbr'] not in self.favorite_teams):
continue
self._fetch_odds(game)
new_upcoming_games.append(game)
# Filter for favorite teams only if the config is set
if self.ncaam_hockey_config.get("show_favorite_teams_only", False):
team_games = [game for game in new_upcoming_games
if game['home_abbr'] in self.favorite_teams or
game['away_abbr'] in self.favorite_teams]
else:
team_games = new_upcoming_games
# Sort games by start time, soonest first, then limit to configured count
team_games.sort(key=lambda x: x.get('start_time_utc') or datetime.max.replace(tzinfo=timezone.utc))
team_games = team_games[:self.upcoming_games_to_show]
# Only log if there's a change in games or enough time has passed
should_log = (
current_time - self.last_log_time >= self.log_interval or
len(team_games) != len(self.upcoming_games) or
not self.upcoming_games # Log if we had no games before
)
if should_log:
if team_games:
self.logger.info(f"[NCAAMH] Found {len(team_games)} upcoming games for favorite teams (limited to {self.upcoming_games_to_show})")
for game in team_games:
self.logger.info(f"[NCAAMH] Upcoming game: {game['away_abbr']} vs {game['home_abbr']} - {game['game_date']} {game['game_time']}")
else:
self.logger.info("[NCAAMH] No upcoming games found for favorite teams")
self.logger.debug(f"[NCAAMH] Favorite teams: {self.favorite_teams}")
self.last_log_time = current_time
self.upcoming_games = team_games
if self.upcoming_games:
if not self.current_game or self.current_game['id'] not in {g['id'] for g in self.upcoming_games}:
self.current_game_index = 0
self.current_game = self.upcoming_games[0]
self.last_game_switch = current_time
else:
self.current_game = None
except Exception as e:
self.logger.error(f"[NCAAMH] Error updating upcoming games: {e}", exc_info=True)
def display(self, force_clear=False):
"""Display upcoming games."""
if not self.upcoming_games:
current_time = time.time()
if current_time - self.last_warning_time > self.warning_cooldown:
self.logger.info("[NCAAMH] No upcoming games to display")
self.last_warning_time = current_time
return # Skip display update entirely
try:
current_time = time.time()
# Check if it's time to switch games
if len(self.upcoming_games) > 1 and current_time - self.last_game_switch >= self.game_display_duration:
# Move to next game
self.current_game_index = (self.current_game_index + 1) % len(self.upcoming_games)
self.current_game = self.upcoming_games[self.current_game_index]
self.last_game_switch = current_time
force_clear = True # Force clear when switching games
# Draw the scorebug layout
self._draw_scorebug_layout(self.current_game, force_clear)
# Update display
self.display_manager.update_display()
except Exception as e:
self.logger.error(f"[NCAAMH] Error displaying upcoming game: {e}", exc_info=True)

File diff suppressed because it is too large Load Diff

View File

@@ -7,9 +7,9 @@ from datetime import datetime, timedelta, timezone
import os
from PIL import Image, ImageDraw, ImageFont
import pytz
from pathlib import Path
from src.display_manager import DisplayManager
from src.cache_manager import CacheManager
from src.config_manager import ConfigManager
from src.odds_manager import OddsManager
from src.logo_downloader import download_missing_logo
from src.background_data_service import get_background_service
@@ -315,28 +315,33 @@ class OddsTickerManager:
logger.error(f"Error fetching team rankings: {e}")
return {}
def _get_team_logo(self, team_abbr: str, logo_dir: str, league: str = None, team_name: str = None) -> Optional[Image.Image]:
def convert_image(self, logo_path: Path) -> Optional[Image.Image]:
if logo_path.exists():
logo = Image.open(logo_path)
# Convert palette images with transparency to RGBA to avoid PIL warnings
if logo.mode == 'P' and 'transparency' in logo.info:
logo = logo.convert('RGBA')
logger.debug(f"Successfully loaded logo {logo_path}")
return logo
return None
def _get_team_logo(self, league: str, team_id: str, team_abbr: str, logo_dir: str) -> Optional[Image.Image]:
"""Get team logo from the configured directory, downloading if missing."""
if not team_abbr or not logo_dir:
logger.debug("Cannot get team logo with missing team_abbr or logo_dir")
return None
try:
logo_path = os.path.join(logo_dir, f"{team_abbr}.png")
logo_path = Path(logo_dir, f"{team_abbr}.png")
logger.debug(f"Attempting to load logo from path: {logo_path}")
if os.path.exists(logo_path):
logo = Image.open(logo_path)
# Convert palette images with transparency to RGBA to avoid PIL warnings
if logo.mode == 'P' and 'transparency' in logo.info:
logo = logo.convert('RGBA')
logger.debug(f"Successfully loaded logo for {team_abbr} from {logo_path}")
return logo
if (image := self.convert_image(logo_path)):
return image
else:
logger.warning(f"Logo not found at path: {logo_path}")
# Try to download the missing logo if we have league information
if league:
logger.info(f"Attempting to download missing logo for {team_abbr} in league {league}")
success = download_missing_logo(team_abbr, league, team_name)
success = download_missing_logo(league, team_id, team_abbr, logo_path, None)
if success:
# Try to load the downloaded logo
if os.path.exists(logo_path):
@@ -511,6 +516,8 @@ class OddsTickerManager:
competitors = event['competitions'][0]['competitors']
home_team = next(c for c in competitors if c['homeAway'] == 'home')
away_team = next(c for c in competitors if c['homeAway'] == 'away')
home_id = home_team['team']['id']
away_id = away_team['team']['id']
home_abbr = home_team['team']['abbreviation']
away_abbr = away_team['team']['abbreviation']
home_name = home_team['team'].get('name', home_abbr)
@@ -632,6 +639,8 @@ class OddsTickerManager:
game = {
'id': game_id,
'home_id': home_id,
'away_id': away_id,
'home_team': home_abbr,
'away_team': away_abbr,
'home_team_name': home_name,
@@ -1015,10 +1024,8 @@ class OddsTickerManager:
datetime_font = self.fonts['medium'] # Use large font for date/time
# Get team logos (with automatic download if missing)
home_logo = self._get_team_logo(game['home_team'], game['logo_dir'],
league=game.get('league'), team_name=game.get('home_team_name'))
away_logo = self._get_team_logo(game['away_team'], game['logo_dir'],
league=game.get('league'), team_name=game.get('away_team_name'))
home_logo = self._get_team_logo(game["league"], game['home_id'], game['home_team'], game['logo_dir'])
away_logo = self._get_team_logo(game["league"], game['away_id'], game['away_team'], game['logo_dir'])
broadcast_logo = None
# Enhanced broadcast logo debugging
@@ -1045,7 +1052,7 @@ class OddsTickerManager:
logger.info(f"Game {game.get('id')}: Final mapped logo name: '{logo_name}' from broadcast names: {broadcast_names}")
if logo_name:
broadcast_logo = self._get_team_logo(logo_name, 'assets/broadcast_logos')
broadcast_logo = self.convert_image(Path("assets/broadcast_logos",f"{logo_name}.png"))
if broadcast_logo:
logger.info(f"Game {game.get('id')}: Successfully loaded broadcast logo for '{logo_name}' - Size: {broadcast_logo.size}")
else: