6 Commits

Author SHA1 Message Date
Chuck
aae95a1015 refactor(api): resolve sudo/systemctl/reboot/poweroff paths at startup
Use shutil.which() with safe fallbacks for the four privileged binaries
instead of relying on bare names being resolved by the subprocess shell
search. Resolves paths once at module load rather than per-call.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 14:49:34 -04:00
Chuck
246ea54635 fix: address five review findings (Pillow CVEs, daemon exception narrowing, timeout handling, plugin store)
- march-madness/requirements.txt: Pillow>=12.2.0 (patches CVE-2026-42308
  and CVE-2026-42310; previous floor of 10.3.0 was insufficient)

- wifi_monitor_daemon: narrow final except Exception to
  (subprocess.SubprocessError, OSError) so programming errors in the NM
  restart block are no longer silently swallowed

- api_v3/execute_system_action: add explicit subprocess.TimeoutExpired
  handler before the generic Exception catch; returns action-specific
  message with 'status','message','returncode','stdout','stderr' fields
  so the UI receives a precise, actionable payload instead of the generic
  'Failed to execute system action' string

- plugins_manager.js: move searchPluginStore into .finally() so the
  plugin store renders regardless of whether loadInstalledPlugins succeeds
  or fails; .catch() still logs the error

- first_time_install.sh: add safe_plugin_rm.sh NOPASSWD rule to the
  /tmp/ledmatrix_web_sudoers block; configure_web_sudo.sh had this rule
  but the standalone installer never granted it, leaving plugin removal
  broken after first-time install

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 13:56:24 -04:00
Chuck
a0f957be9e fix: address five review findings (NM retry loop, start_display message, code quality)
- wifi_monitor_daemon: reset _consecutive_internet_failures = 0 in both
  NM-restart exception handlers; previously both left the counter at threshold,
  causing an immediate retry on the next iteration instead of waiting another
  full backoff period

- api_v3: fix start_display failure message — when mode is set and systemctl
  returns non-zero, message now includes the failure reason and a hint rather
  than always reporting success phrasing

- wifi_manager: move _redirect_backend from class variable to instance variable
  in __init__ alongside _ap_enabled_at; class-level default shadowed correctly
  in practice (single instance) but was misleading

- wifi_manager: narrow broad except Exception in _check_internet_connectivity
  to (subprocess.SubprocessError, OSError) for ping and OSError for HTTP
  (urllib.error.URLError is an OSError subclass in Python 3)

- wifi_manager: remove redundant local 'import re as _re' in _validate_ap_config;
  re is already imported at module level (line 37)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 13:26:51 -04:00
Chuck
76cd010aab fix: address five valid review findings; skip two
Fixed:
- march-madness/requirements.txt: Pillow>=10.3.0 (patches CVE-2024-28219;
  10.3.0 is the actual fix version — reviewer cited 12.2.0 but that risks
  breaking API changes without test coverage)
- wifi_monitor_daemon.py: add missing `import subprocess`; subprocess.run
  and CalledProcessError would NameError at runtime on the NM restart path
- wifi_manager.py: validate ap_idle_timeout_minutes before arithmetic —
  coerce to int, clamp 1–1440, fall back to 15 on bad config values
- wifi_manager.py: call _remove_nm_dnsmasq_captive_conf() on all three
  rollback paths in _enable_ap_mode_nmcli_hotspot() and in the top-level
  except block so stale dnsmasq drop-ins are never left behind
- api_v3.py: fix wrong_password prefix strip — removeprefix("wrong_password:")
  then lstrip() handles both "wrong_password: msg" and "wrong_password:msg"
- plugins_manager.js: add .catch() to loadInstalledPlugins().then() to
  surface failures instead of silently dropping unhandled rejections

Skipped:
- WiFiManager AP state persistence: architectural overhaul; _is_ap_mode_active()
  already derives from live system state, not in-memory variables
- Absolute subprocess paths in api_v3.py: paths vary by distro (/usr/bin vs
  /bin); web service has a normal PATH; sudoers already use resolved paths

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 13:26:51 -04:00
Chuck
587daa780e revert: restore AP-mode grace period to 90s (3 checks)
The counter reset after NM restart already fully prevents the SSH-lockout
cascade: _disconnected_checks can never accumulate across NM restarts
because it is reset to 0 before the next daemon iteration runs.

The 3→6 increase provided no additional fix for the described problem and
caused a UX regression: fresh Pi devices with no WiFi configured would
wait 3 minutes instead of 90 seconds for the LEDMatrix-Setup hotspot to
appear.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 13:26:31 -04:00
Chuck
c19df29a21 fix: service control buttons and AP-mode SSH lockout post-install
Two user-reported issues after fresh install:

1. All service buttons (Start/Stop/Restart Display, Restart Web Service)
   failed silently — only Reboot worked.

   Root cause: sudoers rules use `ledmatrix.service` (with suffix) but
   api_v3.py called `sudo systemctl start ledmatrix` (no suffix). sudo
   does exact string matching, so every service action was rejected with
   returncode=1. Also missing from sudoers: ledmatrix-web, journalctl,
   and is-active entries.

   Fix:
   - Add `.service` suffix to all 8 sudo systemctl call sites in
     api_v3.py (_ensure_display_service_running, _stop_display_service,
     and all execute_system_action branches).
   - Add timeout=15 to all subprocess.run calls in execute_system_action
     (previously could hang indefinitely).
   - Add missing sudoers rules to first_time_install.sh and
     configure_web_sudo.sh: ledmatrix-web.service start/stop/restart,
     is-active for both name forms, and journalctl -u/-t ledmatrix rules.

2. SSH and web UI became inaccessible after ~1 hour even though the
   display kept running.

   Root cause: wifi_monitor_daemon restarts NetworkManager after 5
   consecutive internet failures (~2.5 min). Each NM restart drops WiFi
   briefly. During that window check_and_manage_ap_mode() increments
   _disconnected_checks but the daemon never reset it after the restart.
   After 3 such NM-restart cycles, _disconnected_checks reached 3 and
   AP mode activated — changing the Pi from WiFi client to hotspot
   (192.168.4.1) and killing SSH on the old IP.

   Fix:
   - Reset wifi_manager._disconnected_checks = 0 in the daemon
     immediately after a successful NM restart so the brief drop it
     causes doesn't count toward AP-mode activation.
   - Increase _disconnected_checks_required from 3 to 6 (90s → 3min)
     as an additional buffer against transient network flaps.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 13:26:31 -04:00
148 changed files with 4327 additions and 5764 deletions

1
.gitmodules vendored
View File

@@ -1,4 +1,3 @@
[submodule "rpi-rgb-led-matrix-master"]
path = rpi-rgb-led-matrix-master
url = https://github.com/hzeller/rpi-rgb-led-matrix.git
branch = master

View File

@@ -1,10 +1,5 @@
# LEDMatrix
[![License](https://img.shields.io/badge/license-GPL--3.0-green)](LICENSE)
[![Discord](https://img.shields.io/badge/Discord-community-5865F2?logo=discord&logoColor=white)](https://discord.gg/RdrC37rEag)
[![GitHub Stars](https://img.shields.io/github/stars/ChuckBuilds/ledmatrix?style=flat&color=yellow)](https://github.com/ChuckBuilds/ledmatrix)
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/77fc9b446a5948e5b0aed7a7aaeb1bab)](https://app.codacy.com/gh/ChuckBuilds/LEDMatrix/dashboard?utm_source=gh&utm_medium=referral&utm_content=&utm_campaign=Badge_grade)
## Welcome to LEDMatrix!
Welcome to the LEDMatrix Project! This open-source project enables you to run an information-rich display on a Raspberry Pi connected to an LED RGB Matrix panel. Whether you want to see your calendar, weather forecasts, sports scores, stock prices, or any other information at a glance, LEDMatrix brings it all together.
@@ -132,15 +127,10 @@ The system supports live, recent, and upcoming game information for multiple spo
| This project can be finnicky! RGB LED Matrix displays are not built the same or to a high-quality standard. We have seen many displays arrive dead or partially working in our discord. Please purchase from a reputable vendor. |
### Raspberry Pi
- Raspberry Pi Zero's don't have enough processing power for this project.
- **Raspberry Pi 3B, 4, or 5**
- Raspberry Pi Zero's don't have enough processing power for this project and the Pi 5 is unsupported due to new GPIO output.
- **Raspberry Pi 3B or 4 (NOT RPi 5!)**
[Amazon Affiliate Link Raspberry Pi 4 4GB RAM](https://amzn.to/4dJixuX)
[Amazon Affiliate Link Raspberry Pi 4 8GB RAM](https://amzn.to/4qbqY7F)
- **Pi 5 users**: the installer automatically detects Pi 5 and builds the `rpi-rgb-led-matrix` library with RP1 support. If you previously installed on a Pi 4 and migrated the SD card, or if you see `mmap` errors in the logs, force a fresh library build:
```bash
sudo RPI_RGB_FORCE_REBUILD=1 ./first_time_install.sh
```
- Pi 5 config: leave `rp1_rio` at `0` (PIO mode, default) and set `gpio_slowdown` to `1` or `2`.
### RGB Matrix Bonnet / HAT
@@ -592,7 +582,7 @@ These settings control runtime behavior and GPIO timing:
- **Critical setting**: Must match your Raspberry Pi model for stability
- **Raspberry Pi 3**: Use 3
- **Raspberry Pi 4**: Use 4
- **Raspberry Pi 5**: Use 12 in PIO mode (`rp1_rio: 0`, the default); start with `1` and increase if you see flickering
- **Raspberry Pi 5**: Use 5 (or higher if needed)
- **Raspberry Pi Zero/1**: Use 1-2
- Incorrect values can cause display corruption, flickering, or system instability
- If you experience issues, try adjusting this value up or down by 1

View File

@@ -1,43 +1,43 @@
{
"web_display_autostart": true,
"schedule": {
"enabled": false,
"enabled": true,
"mode": "per-day",
"start_time": "07:00",
"end_time": "23:00",
"days": {
"monday": {
"enabled": false,
"enabled": true,
"start_time": "07:00",
"end_time": "23:00"
},
"tuesday": {
"enabled": false,
"enabled": true,
"start_time": "07:00",
"end_time": "23:00"
},
"wednesday": {
"enabled": false,
"enabled": true,
"start_time": "07:00",
"end_time": "23:00"
},
"thursday": {
"enabled": false,
"enabled": true,
"start_time": "07:00",
"end_time": "23:00"
},
"friday": {
"enabled": false,
"enabled": true,
"start_time": "07:00",
"end_time": "23:00"
},
"saturday": {
"enabled": false,
"enabled": true,
"start_time": "07:00",
"end_time": "23:00"
},
"sunday": {
"enabled": false,
"enabled": true,
"start_time": "07:00",
"end_time": "23:00"
}
@@ -51,46 +51,46 @@
"end_time": "07:00",
"days": {
"monday": {
"enabled": false,
"enabled": true,
"start_time": "20:00",
"end_time": "07:00"
},
"tuesday": {
"enabled": false,
"enabled": true,
"start_time": "20:00",
"end_time": "07:00"
},
"wednesday": {
"enabled": false,
"enabled": true,
"start_time": "20:00",
"end_time": "07:00"
},
"thursday": {
"enabled": false,
"enabled": true,
"start_time": "20:00",
"end_time": "07:00"
},
"friday": {
"enabled": false,
"enabled": true,
"start_time": "20:00",
"end_time": "07:00"
},
"saturday": {
"enabled": false,
"enabled": true,
"start_time": "20:00",
"end_time": "07:00"
},
"sunday": {
"enabled": false,
"enabled": true,
"start_time": "20:00",
"end_time": "07:00"
}
}
},
"timezone": "America/New_York",
"timezone": "America/Chicago",
"location": {
"city": "Tampa",
"state": "Florida",
"city": "Dallas",
"state": "Texas",
"country": "US"
},
"display": {
@@ -112,8 +112,7 @@
"limit_refresh_rate_hz": 100
},
"runtime": {
"gpio_slowdown": 3,
"rp1_rio": 0
"gpio_slowdown": 3
},
"display_durations": {},
"use_short_date_format": true,
@@ -127,11 +126,6 @@
"buffer_ahead": 2
}
},
"sync": {
"role": "standalone",
"port": 5765,
"follower_position": "left"
},
"plugin_system": {
"plugins_directory": "plugin-repos",
"auto_discover": true,

View File

@@ -34,16 +34,16 @@ This document outlines the transformation of the LEDMatrix project into a modula
## Table of Contents
1. [Current Architecture Analysis](#1-current-architecture-analysis)
2. [Plugin System Design](#2-plugin-system-design)
3. [Plugin Store & Discovery](#3-plugin-store--discovery)
4. [Web UI Transformation](#4-web-ui-transformation)
5. [Migration Strategy](#5-migration-strategy)
6. [Plugin Developer Guidelines](#6-plugin-developer-guidelines)
7. [Technical Implementation Details](#7-technical-implementation-details)
8. [Best Practices & Standards](#8-best-practices--standards)
9. [Security Considerations](#9-security-considerations)
10. [Implementation Roadmap](#10-implementation-roadmap)
1. [Current Architecture Analysis](#current-architecture-analysis)
2. [Plugin System Design](#plugin-system-design)
3. [Plugin Store & Discovery](#plugin-store--discovery)
4. [Web UI Transformation](#web-ui-transformation)
5. [Migration Strategy](#migration-strategy)
6. [Plugin Developer Guidelines](#plugin-developer-guidelines)
7. [Technical Implementation Details](#technical-implementation-details)
8. [Best Practices & Standards](#best-practices--standards)
9. [Security Considerations](#security-considerations)
10. [Implementation Roadmap](#implementation-roadmap)
---

View File

@@ -36,17 +36,9 @@ if [ -r /proc/device-tree/model ]; then
DEVICE_MODEL=$(tr -d '\0' </proc/device-tree/model)
echo "Detected device: $DEVICE_MODEL"
else
DEVICE_MODEL=""
echo "⚠ Could not detect Raspberry Pi model (continuing anyway)"
fi
# Detect Pi 5 for hardware-specific install decisions (RP1 library verification)
IS_PI5=0
if echo "${DEVICE_MODEL:-}" | grep -qi "Raspberry Pi 5"; then
IS_PI5=1
echo "Raspberry Pi 5 detected — will verify RP1 library support."
fi
# Check OS version - must be Raspberry Pi OS Lite (Trixie)
echo ""
echo "Checking operating system requirements..."
@@ -267,6 +259,8 @@ else
fi
echo ""
CLEAR='
'
CURRENT_STEP="Install system dependencies"
echo "Step 1: Installing system dependencies..."
echo "----------------------------------------"
@@ -279,7 +273,7 @@ apt_update
# Install required system packages
echo "Installing Python packages and dependencies..."
apt_install python3-pip python3-venv python-dev-is-python3 python3-pil python3-pil.imagetk build-essential python3-setuptools python3-wheel cmake ninja-build
apt_install python3-pip python3-venv python3-dev python3-pil python3-pil.imagetk build-essential python3-setuptools python3-wheel cython3 scons cmake ninja-build
# Install additional system dependencies that might be needed
echo "Installing additional system dependencies..."
@@ -677,6 +671,8 @@ if [ -f "$PROJECT_ROOT_DIR/requirements.txt" ]; then
echo "[$PACKAGE_NUM/$TOTAL_PACKAGES] Installing: $line"
# Check if package is already installed (basic check - may not catch all cases)
PACKAGE_NAME=$(echo "$line" | sed -E 's/[<>=!].*$//' | sed 's/^[[:space:]]*//;s/[[:space:]]*$//')
# Try installing with verbose output and timeout (if available)
# Use --no-cache-dir to avoid cache issues, --verbose for diagnostics
INSTALL_OUTPUT=$(mktemp)
@@ -791,28 +787,9 @@ CURRENT_STEP="Build and install rpi-rgb-led-matrix"
echo "Step 6: Building and installing rpi-rgb-led-matrix..."
echo "-----------------------------------------------------"
# On Pi 5, also check that the installed library has rp1_rio support.
# A library built before Pi 5 support was added imports fine but maps to the
# Pi 3 peripheral bus address (0x3f000000) instead of the RP1 chip at runtime.
_HAS_RP1=0
if python3 -c 'from rgbmatrix import RGBMatrixOptions; assert hasattr(RGBMatrixOptions(), "rp1_rio")' >/dev/null 2>&1; then
_HAS_RP1=1
fi
_SKIP_BUILD=0
# If already installed and not forcing rebuild, skip expensive build
if python3 -c 'from rgbmatrix import RGBMatrix, RGBMatrixOptions' >/dev/null 2>&1 && [ "${RPI_RGB_FORCE_REBUILD:-0}" != "1" ]; then
if [ "$IS_PI5" = "1" ] && [ "$_HAS_RP1" = "0" ]; then
echo "⚠ Pi 5 detected: installed rgbmatrix lacks rp1_rio support (older build)."
echo " Forcing rebuild to get Pi 5 RP1 support..."
else
_SKIP_BUILD=1
fi
fi
if [ "$_SKIP_BUILD" = "1" ]; then
_skip_suffix=""
if [ "$IS_PI5" = "1" ]; then _skip_suffix=" with Pi 5 RP1 support"; fi
echo "rgbmatrix already installed${_skip_suffix}; skipping build (set RPI_RGB_FORCE_REBUILD=1 to force rebuild)."
echo "rgbmatrix Python package already available; skipping build (set RPI_RGB_FORCE_REBUILD=1 to force rebuild)."
else
# Ensure rpi-rgb-led-matrix submodule is initialized
if [ ! -d "$PROJECT_ROOT_DIR/rpi-rgb-led-matrix-master" ]; then
@@ -848,13 +825,20 @@ else
fi
pushd "$PROJECT_ROOT_DIR/rpi-rgb-led-matrix-master" >/dev/null
echo "Installing rpi-rgb-led-matrix Python package (scikit-build-core + cmake)..."
echo " Build deps required: python-dev-is-python3 cmake"
echo " This compiles C++ — may take 2-5 minutes on Pi 4/5..."
echo "Building rpi-rgb-led-matrix Python bindings..."
# Build the library first, then Python bindings
# The build-python target depends on the library being built
if ! make build-python; then
echo "✗ Failed to build rpi-rgb-led-matrix Python bindings"
echo " Make sure you have the required build tools installed:"
echo " sudo apt install -y build-essential python3-dev cython3 scons"
popd >/dev/null
exit 1
fi
cd bindings/python
echo "Installing rpi-rgb-led-matrix Python package via pip..."
if ! python3 -m pip install --break-system-packages .; then
echo "✗ Failed to install rpi-rgb-led-matrix Python package"
echo " Ensure build tools are installed:"
echo " sudo apt install -y python-dev-is-python3 cmake build-essential"
popd >/dev/null
exit 1
fi
@@ -879,17 +863,6 @@ except Exception as e:
PY
then
echo "✓ rpi-rgb-led-matrix installed and verified"
# Pi 5: confirm the freshly-built library has rp1_rio support
if [ "$IS_PI5" = "1" ]; then
if python3 -c 'from rgbmatrix import RGBMatrixOptions; assert hasattr(RGBMatrixOptions(), "rp1_rio")' >/dev/null 2>&1; then
echo "✓ Pi 5 RP1 (rp1_rio) support confirmed"
else
echo "⚠ rp1_rio not found after rebuild — the submodule may be an older version."
echo " Try updating the submodule and rebuilding:"
echo " git submodule update --remote rpi-rgb-led-matrix-master"
echo " sudo RPI_RGB_FORCE_REBUILD=1 ./first_time_install.sh"
fi
fi
else
echo "✗ rpi-rgb-led-matrix import test failed"
exit 1
@@ -1506,7 +1479,7 @@ echo "WiFi Connection Status:"
if command -v nmcli >/dev/null 2>&1; then
WIFI_STATUS=$(nmcli -t -f DEVICE,TYPE,STATE device status 2>/dev/null | grep -i wifi || echo "")
if [ -n "$WIFI_STATUS" ]; then
echo "$WIFI_STATUS" | while IFS=':' read -r _ _ state; do
echo "$WIFI_STATUS" | while IFS=':' read -r device type state; do
if [ "$state" = "connected" ]; then
SSID=$(nmcli -t -f active,ssid device wifi 2>/dev/null | grep "^yes:" | cut -d: -f2 | head -1)
if [ -n "$SSID" ]; then

View File

@@ -0,0 +1,138 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "March Madness Plugin Configuration",
"type": "object",
"properties": {
"enabled": {
"type": "boolean",
"default": false,
"description": "Enable the March Madness tournament display"
},
"leagues": {
"type": "object",
"title": "Tournament Leagues",
"description": "Which NCAA tournaments to display",
"properties": {
"ncaam": {
"type": "boolean",
"default": true,
"description": "Show NCAA Men's Tournament games"
},
"ncaaw": {
"type": "boolean",
"default": true,
"description": "Show NCAA Women's Tournament games"
}
},
"additionalProperties": false
},
"favorite_teams": {
"type": "array",
"title": "Favorite Teams",
"description": "Team abbreviations to highlight (e.g., DUKE, UNC). Leave empty to show all teams equally.",
"items": {
"type": "string"
},
"uniqueItems": true,
"default": []
},
"display_options": {
"type": "object",
"title": "Display Options",
"x-collapsed": true,
"properties": {
"show_seeds": {
"type": "boolean",
"default": true,
"description": "Show tournament seeds (1-16) next to team names"
},
"show_round_logos": {
"type": "boolean",
"default": true,
"description": "Show round logo separators between game groups"
},
"highlight_upsets": {
"type": "boolean",
"default": true,
"description": "Highlight upset winners (higher seed beating lower seed) in gold"
},
"show_bracket_progress": {
"type": "boolean",
"default": true,
"description": "Show which teams are still alive in each region"
},
"scroll_speed": {
"type": "number",
"default": 1.0,
"minimum": 0.5,
"maximum": 5.0,
"description": "Scroll speed (pixels per frame)"
},
"scroll_delay": {
"type": "number",
"default": 0.02,
"minimum": 0.001,
"maximum": 0.1,
"description": "Delay between scroll frames (seconds)"
},
"target_fps": {
"type": "integer",
"default": 120,
"minimum": 30,
"maximum": 200,
"description": "Target frames per second"
},
"loop": {
"type": "boolean",
"default": true,
"description": "Loop the scroll continuously"
},
"dynamic_duration": {
"type": "boolean",
"default": true,
"description": "Automatically adjust display duration based on content width"
},
"min_duration": {
"type": "integer",
"default": 30,
"minimum": 10,
"maximum": 300,
"description": "Minimum display duration in seconds"
},
"max_duration": {
"type": "integer",
"default": 300,
"minimum": 30,
"maximum": 600,
"description": "Maximum display duration in seconds"
}
},
"additionalProperties": false
},
"data_settings": {
"type": "object",
"title": "Data Settings",
"x-collapsed": true,
"properties": {
"update_interval": {
"type": "integer",
"default": 300,
"minimum": 60,
"maximum": 3600,
"description": "How often to refresh tournament data (seconds). Automatically shortens to 60s when live games are detected."
},
"request_timeout": {
"type": "integer",
"default": 30,
"minimum": 5,
"maximum": 60,
"description": "API request timeout in seconds"
}
},
"additionalProperties": false
}
},
"required": ["enabled"],
"additionalProperties": false,
"x-propertyOrder": ["enabled", "leagues", "favorite_teams", "display_options", "data_settings"]
}

View File

@@ -0,0 +1,910 @@
"""March Madness Plugin — NCAA Tournament bracket tracker for LED Matrix.
Displays a horizontally-scrolling ticker of NCAA Tournament games grouped by
round, with seeds, round logos, live scores, and upset highlighting.
"""
import re
import threading
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import numpy as np
import pytz
import requests
from PIL import Image, ImageDraw, ImageFont
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from src.plugin_system.base_plugin import BasePlugin
try:
from src.common.scroll_helper import ScrollHelper
except ImportError:
ScrollHelper = None
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
SCOREBOARD_URLS = {
"ncaam": "https://site.api.espn.com/apis/site/v2/sports/basketball/mens-college-basketball/scoreboard",
"ncaaw": "https://site.api.espn.com/apis/site/v2/sports/basketball/womens-college-basketball/scoreboard",
}
ROUND_ORDER = {"NCG": 0, "F4": 1, "E8": 2, "S16": 3, "R32": 4, "R64": 5, "": 6}
ROUND_DISPLAY_NAMES = {
"NCG": "Championship",
"F4": "Final Four",
"E8": "Elite Eight",
"S16": "Sweet Sixteen",
"R32": "Round of 32",
"R64": "Round of 64",
}
ROUND_LOGO_FILES = {
"NCG": "CHAMPIONSHIP.png",
"F4": "FINAL_4.png",
"E8": "ELITE_8.png",
"S16": "SWEET_16.png",
"R32": "ROUND_32.png",
"R64": "ROUND_64.png",
}
REGION_ORDER = {"E": 0, "W": 1, "S": 2, "MW": 3, "": 4}
# Colors
COLOR_WHITE = (255, 255, 255)
COLOR_GOLD = (255, 215, 0)
COLOR_GRAY = (160, 160, 160)
COLOR_DIM = (100, 100, 100)
COLOR_RED = (255, 60, 60)
COLOR_GREEN = (60, 200, 60)
COLOR_BLACK = (0, 0, 0)
COLOR_DARK_BG = (20, 20, 20)
# ---------------------------------------------------------------------------
# Plugin Class
# ---------------------------------------------------------------------------
class MarchMadnessPlugin(BasePlugin):
"""NCAA March Madness tournament bracket tracker."""
def __init__(
self,
plugin_id: str,
config: Dict[str, Any],
display_manager: Any,
cache_manager: Any,
plugin_manager: Any,
):
super().__init__(plugin_id, config, display_manager, cache_manager, plugin_manager)
# Config
leagues_config = config.get("leagues", {})
self.show_ncaam: bool = leagues_config.get("ncaam", True)
self.show_ncaaw: bool = leagues_config.get("ncaaw", True)
self.favorite_teams: List[str] = [t.upper() for t in config.get("favorite_teams", [])]
display_options = config.get("display_options", {})
self.show_seeds: bool = display_options.get("show_seeds", True)
self.show_round_logos: bool = display_options.get("show_round_logos", True)
self.highlight_upsets: bool = display_options.get("highlight_upsets", True)
self.show_bracket_progress: bool = display_options.get("show_bracket_progress", True)
self.scroll_speed: float = display_options.get("scroll_speed", 1.0)
self.scroll_delay: float = display_options.get("scroll_delay", 0.02)
self.target_fps: int = display_options.get("target_fps", 120)
self.loop: bool = display_options.get("loop", True)
self.dynamic_duration_enabled: bool = display_options.get("dynamic_duration", True)
self.min_duration: int = display_options.get("min_duration", 30)
self.max_duration: int = display_options.get("max_duration", 300)
if self.min_duration > self.max_duration:
self.logger.warning(
f"min_duration ({self.min_duration}) > max_duration ({self.max_duration}); swapping values"
)
self.min_duration, self.max_duration = self.max_duration, self.min_duration
data_settings = config.get("data_settings", {})
self.update_interval: int = data_settings.get("update_interval", 300)
self.request_timeout: int = data_settings.get("request_timeout", 30)
# Scrolling flag for display controller
self.enable_scrolling = True
# State
self.games_data: List[Dict] = []
self.ticker_image: Optional[Image.Image] = None
self.last_update: float = 0
self.dynamic_duration: float = 60
self.total_scroll_width: int = 0
self._display_start_time: Optional[float] = None
self._end_reached_logged: bool = False
self._update_lock = threading.Lock()
self._has_live_games: bool = False
self._cached_dynamic_duration: Optional[float] = None
self._duration_cache_time: float = 0
# Display dimensions
self.display_width: int = self.display_manager.matrix.width
self.display_height: int = self.display_manager.matrix.height
# HTTP session with retry
self.session = requests.Session()
retry = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
self.session.mount("https://", HTTPAdapter(max_retries=retry))
self.headers = {"User-Agent": "LEDMatrix/2.0"}
# ScrollHelper
if ScrollHelper:
self.scroll_helper = ScrollHelper(self.display_width, self.display_height, logger=self.logger)
if hasattr(self.scroll_helper, "set_frame_based_scrolling"):
self.scroll_helper.set_frame_based_scrolling(True)
self.scroll_helper.set_scroll_speed(self.scroll_speed)
self.scroll_helper.set_scroll_delay(self.scroll_delay)
if hasattr(self.scroll_helper, "set_target_fps"):
self.scroll_helper.set_target_fps(self.target_fps)
self.scroll_helper.set_dynamic_duration_settings(
enabled=self.dynamic_duration_enabled,
min_duration=self.min_duration,
max_duration=self.max_duration,
buffer=0.1,
)
else:
self.scroll_helper = None
self.logger.warning("ScrollHelper not available")
# Fonts
self.fonts = self._load_fonts()
# Logos
self._round_logos: Dict[str, Image.Image] = {}
self._team_logo_cache: Dict[str, Optional[Image.Image]] = {}
self._march_madness_logo: Optional[Image.Image] = None
self._load_round_logos()
self.logger.info(
f"MarchMadnessPlugin initialized — NCAAM: {self.show_ncaam}, "
f"NCAAW: {self.show_ncaaw}, favorites: {self.favorite_teams}"
)
# ------------------------------------------------------------------
# Fonts
# ------------------------------------------------------------------
def _load_fonts(self) -> Dict[str, ImageFont.FreeTypeFont]:
fonts = {}
try:
fonts["score"] = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 10)
except IOError:
fonts["score"] = ImageFont.load_default()
try:
fonts["time"] = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
except IOError:
fonts["time"] = ImageFont.load_default()
try:
fonts["detail"] = ImageFont.truetype("assets/fonts/4x6-font.ttf", 6)
except IOError:
fonts["detail"] = ImageFont.load_default()
return fonts
# ------------------------------------------------------------------
# Logo loading
# ------------------------------------------------------------------
def _load_round_logos(self) -> None:
logo_dir = Path("assets/sports/ncaa_logos")
for round_key, filename in ROUND_LOGO_FILES.items():
path = logo_dir / filename
try:
img = Image.open(path).convert("RGBA")
# Resize to fit display height
target_h = self.display_height - 4
ratio = target_h / img.height
target_w = int(img.width * ratio)
self._round_logos[round_key] = img.resize((target_w, target_h), Image.Resampling.LANCZOS)
except (OSError, ValueError) as e:
self.logger.warning(f"Could not load round logo {filename}: {e}")
except Exception:
self.logger.exception(f"Unexpected error loading round logo {filename}")
# March Madness logo
mm_path = logo_dir / "MARCH_MADNESS.png"
try:
img = Image.open(mm_path).convert("RGBA")
target_h = self.display_height - 4
ratio = target_h / img.height
target_w = int(img.width * ratio)
self._march_madness_logo = img.resize((target_w, target_h), Image.Resampling.LANCZOS)
except (OSError, ValueError) as e:
self.logger.warning(f"Could not load March Madness logo: {e}")
except Exception:
self.logger.exception("Unexpected error loading March Madness logo")
def _get_team_logo(self, abbr: str) -> Optional[Image.Image]:
if abbr in self._team_logo_cache:
return self._team_logo_cache[abbr]
logo_dir = Path("assets/sports/ncaa_logos")
path = logo_dir / f"{abbr}.png"
try:
img = Image.open(path).convert("RGBA")
target_h = self.display_height - 6
ratio = target_h / img.height
target_w = int(img.width * ratio)
img = img.resize((target_w, target_h), Image.Resampling.LANCZOS)
self._team_logo_cache[abbr] = img
return img
except (FileNotFoundError, OSError, ValueError):
self._team_logo_cache[abbr] = None
return None
except Exception:
self.logger.exception(f"Unexpected error loading team logo for {abbr}")
self._team_logo_cache[abbr] = None
return None
# ------------------------------------------------------------------
# Data fetching
# ------------------------------------------------------------------
def _is_tournament_window(self) -> bool:
today = datetime.now(pytz.utc)
return (3, 10) <= (today.month, today.day) <= (4, 10)
def _fetch_tournament_data(self) -> List[Dict]:
"""Fetch tournament games from ESPN scoreboard API."""
all_games: List[Dict] = []
leagues = []
if self.show_ncaam:
leagues.append("ncaam")
if self.show_ncaaw:
leagues.append("ncaaw")
for league_key in leagues:
url = SCOREBOARD_URLS.get(league_key)
if not url:
continue
cache_key = f"march_madness_{league_key}_scoreboard"
cache_max_age = 60 if self._has_live_games else self.update_interval
cached = self.cache_manager.get(cache_key, max_age=cache_max_age)
if cached:
all_games.extend(cached)
continue
try:
# NCAA basketball scoreboard without dates param returns current games
params = {"limit": 1000, "groups": 100}
resp = self.session.get(url, params=params, headers=self.headers, timeout=self.request_timeout)
resp.raise_for_status()
data = resp.json()
events = data.get("events", [])
league_games = []
for event in events:
game = self._parse_event(event, league_key)
if game:
league_games.append(game)
self.cache_manager.set(cache_key, league_games)
self.logger.info(f"Fetched {len(league_games)} {league_key} tournament games")
all_games.extend(league_games)
except Exception:
self.logger.exception(f"Error fetching {league_key} tournament data")
return all_games
def _parse_event(self, event: Dict, league_key: str) -> Optional[Dict]:
"""Parse an ESPN event into a game dict."""
competitions = event.get("competitions", [])
if not competitions:
return None
comp = competitions[0]
# Confirm tournament game
comp_type = comp.get("type", {})
is_tournament = comp_type.get("abbreviation") == "TRNMNT"
notes = comp.get("notes", [])
headline = ""
if notes:
headline = notes[0].get("headline", "")
if not is_tournament and "Championship" in headline:
is_tournament = True
if not is_tournament:
return None
# Status
status = comp.get("status", {}).get("type", {})
state = status.get("state", "pre")
status_detail = status.get("shortDetail", "")
# Teams
competitors = comp.get("competitors", [])
home_team = next((c for c in competitors if c.get("homeAway") == "home"), None)
away_team = next((c for c in competitors if c.get("homeAway") == "away"), None)
if not home_team or not away_team:
return None
home_abbr = home_team.get("team", {}).get("abbreviation", "???")
away_abbr = away_team.get("team", {}).get("abbreviation", "???")
home_score = home_team.get("score", "0")
away_score = away_team.get("score", "0")
# Seeds
home_seed = home_team.get("curatedRank", {}).get("current", 0)
away_seed = away_team.get("curatedRank", {}).get("current", 0)
if home_seed >= 99:
home_seed = 0
if away_seed >= 99:
away_seed = 0
# Round and region
tournament_round = self._parse_round(headline)
tournament_region = self._parse_region(headline)
# Date/time
date_str = event.get("date", "")
start_time_utc = None
game_date = ""
game_time = ""
try:
if date_str.endswith("Z"):
date_str = date_str.replace("Z", "+00:00")
dt = datetime.fromisoformat(date_str)
if dt.tzinfo is None:
start_time_utc = dt.replace(tzinfo=pytz.UTC)
else:
start_time_utc = dt.astimezone(pytz.UTC)
local = start_time_utc.astimezone(pytz.timezone("US/Eastern"))
game_date = local.strftime("%-m/%-d")
game_time = local.strftime("%-I:%M%p").replace("AM", "am").replace("PM", "pm")
except (ValueError, AttributeError):
pass
# Period / clock for live games
period = 0
clock = ""
period_text = ""
is_halftime = False
if state == "in":
status_obj = comp.get("status", {})
period = status_obj.get("period", 0)
clock = status_obj.get("displayClock", "")
detail_lower = status_detail.lower()
uses_quarters = league_key == "ncaaw" or "quarter" in detail_lower or detail_lower.startswith("q")
if period <= (4 if uses_quarters else 2):
period_text = f"Q{period}" if uses_quarters else f"H{period}"
else:
ot_num = period - (4 if uses_quarters else 2)
period_text = f"OT{ot_num}" if ot_num > 1 else "OT"
if "halftime" in detail_lower:
is_halftime = True
elif state == "post":
period_text = status.get("shortDetail", "Final")
if "Final" not in period_text:
period_text = "Final"
# Determine winner and upset
is_final = state == "post"
is_upset = False
winner_side = ""
if is_final:
try:
h = int(float(home_score))
a = int(float(away_score))
if h > a:
winner_side = "home"
if home_seed > away_seed > 0:
is_upset = True
elif a > h:
winner_side = "away"
if away_seed > home_seed > 0:
is_upset = True
except (ValueError, TypeError):
pass
return {
"id": event.get("id", ""),
"league": league_key,
"home_abbr": home_abbr,
"away_abbr": away_abbr,
"home_score": str(home_score),
"away_score": str(away_score),
"home_seed": home_seed,
"away_seed": away_seed,
"tournament_round": tournament_round,
"tournament_region": tournament_region,
"state": state,
"is_final": is_final,
"is_live": state == "in",
"is_upcoming": state == "pre",
"is_halftime": is_halftime,
"period": period,
"period_text": period_text,
"clock": clock,
"status_detail": status_detail,
"game_date": game_date,
"game_time": game_time,
"start_time_utc": start_time_utc,
"is_upset": is_upset,
"winner_side": winner_side,
"headline": headline,
}
@staticmethod
def _parse_round(headline: str) -> str:
hl = headline.lower()
if "national championship" in hl:
return "NCG"
if "final four" in hl:
return "F4"
if "elite 8" in hl or "elite eight" in hl:
return "E8"
if "sweet 16" in hl or "sweet sixteen" in hl:
return "S16"
if "2nd round" in hl or "second round" in hl:
return "R32"
if "1st round" in hl or "first round" in hl:
return "R64"
return ""
@staticmethod
def _parse_region(headline: str) -> str:
if "East Region" in headline:
return "E"
if "West Region" in headline:
return "W"
if "South Region" in headline:
return "S"
if "Midwest Region" in headline:
return "MW"
m = re.search(r"Regional (\d+)", headline)
if m:
return f"R{m.group(1)}"
return ""
# ------------------------------------------------------------------
# Game processing
# ------------------------------------------------------------------
def _process_games(self, games: List[Dict]) -> Dict[str, List[Dict]]:
"""Group games by round, sorted by round significance then region/seed."""
grouped: Dict[str, List[Dict]] = {}
for game in games:
rnd = game.get("tournament_round", "")
grouped.setdefault(rnd, []).append(game)
# Sort each round's games by region then seed matchup
for rnd, round_games in grouped.items():
round_games.sort(
key=lambda g: (
REGION_ORDER.get(g.get("tournament_region", ""), 4),
min(g.get("away_seed", 99), g.get("home_seed", 99)),
)
)
return grouped
# ------------------------------------------------------------------
# Rendering
# ------------------------------------------------------------------
def _draw_text_with_outline(
self,
draw: ImageDraw.Draw,
text: str,
xy: tuple,
font: ImageFont.FreeTypeFont,
fill: tuple = COLOR_WHITE,
outline: tuple = COLOR_BLACK,
) -> None:
x, y = xy
for dx in (-1, 0, 1):
for dy in (-1, 0, 1):
if dx or dy:
draw.text((x + dx, y + dy), text, font=font, fill=outline)
draw.text((x, y), text, font=font, fill=fill)
def _create_round_separator(self, round_key: str) -> Image.Image:
"""Create a separator tile for a tournament round."""
height = self.display_height
name = ROUND_DISPLAY_NAMES.get(round_key, round_key)
font = self.fonts["time"]
# Measure text
tmp = Image.new("RGB", (1, 1))
tmp_draw = ImageDraw.Draw(tmp)
text_width = int(tmp_draw.textlength(name, font=font))
# Logo on each side
logo = self._round_logos.get(round_key, self._march_madness_logo)
logo_w = logo.width if logo else 0
padding = 6
total_w = padding + logo_w + padding + text_width + padding + logo_w + padding
total_w = max(total_w, 80)
img = Image.new("RGB", (total_w, height), COLOR_DARK_BG)
draw = ImageDraw.Draw(img)
# Draw logos
x = padding
if logo:
logo_y = (height - logo.height) // 2
img.paste(logo, (x, logo_y), logo)
x += logo_w + padding
# Draw round name
text_y = (height - 8) // 2 # 8px font
self._draw_text_with_outline(draw, name, (x, text_y), font, fill=COLOR_GOLD)
x += text_width + padding
if logo:
logo_y = (height - logo.height) // 2
img.paste(logo, (x, logo_y), logo)
return img
def _create_game_tile(self, game: Dict) -> Image.Image:
"""Create a single game tile for the scrolling ticker."""
height = self.display_height
font_score = self.fonts["score"]
font_time = self.fonts["time"]
font_detail = self.fonts["detail"]
# Load team logos
away_logo = self._get_team_logo(game["away_abbr"])
home_logo = self._get_team_logo(game["home_abbr"])
logo_w = 0
if away_logo:
logo_w = max(logo_w, away_logo.width)
if home_logo:
logo_w = max(logo_w, home_logo.width)
if logo_w == 0:
logo_w = 24
# Build text elements
away_seed_str = f"({game['away_seed']})" if self.show_seeds and game.get("away_seed", 0) > 0 else ""
home_seed_str = f"({game['home_seed']})" if self.show_seeds and game.get("home_seed", 0) > 0 else ""
away_text = f"{away_seed_str}{game['away_abbr']}"
home_text = f"{game['home_abbr']}{home_seed_str}"
# Measure text widths
tmp = Image.new("RGB", (1, 1))
tmp_draw = ImageDraw.Draw(tmp)
away_text_w = int(tmp_draw.textlength(away_text, font=font_detail))
home_text_w = int(tmp_draw.textlength(home_text, font=font_detail))
# Center content: status line
if game["is_live"]:
if game["is_halftime"]:
status_text = "Halftime"
else:
status_text = f"{game['period_text']} {game['clock']}".strip()
elif game["is_final"]:
status_text = game.get("period_text", "Final")
else:
status_text = f"{game['game_date']} {game['game_time']}".strip()
status_w = int(tmp_draw.textlength(status_text, font=font_time))
# Score line (for live/final)
score_text = ""
if game["is_live"] or game["is_final"]:
score_text = f"{game['away_score']}-{game['home_score']}"
score_w = int(tmp_draw.textlength(score_text, font=font_score)) if score_text else 0
# Calculate tile width
h_pad = 4
center_w = max(status_w, score_w, 40)
tile_w = h_pad + logo_w + h_pad + away_text_w + h_pad + center_w + h_pad + home_text_w + h_pad + logo_w + h_pad
img = Image.new("RGB", (tile_w, height), COLOR_BLACK)
draw = ImageDraw.Draw(img)
# Paste away logo
x = h_pad
if away_logo:
logo_y = (height - away_logo.height) // 2
img.paste(away_logo, (x, logo_y), away_logo)
x += logo_w + h_pad
# Away team text (seed + abbr)
is_fav_away = game["away_abbr"] in self.favorite_teams if self.favorite_teams else False
away_color = COLOR_GOLD if is_fav_away else COLOR_WHITE
if game["is_final"] and game["winner_side"] == "away" and self.highlight_upsets and game["is_upset"]:
away_color = COLOR_GOLD
team_text_y = (height - 6) // 2 - 5 # Upper half
self._draw_text_with_outline(draw, away_text, (x, team_text_y), font_detail, fill=away_color)
x += away_text_w + h_pad
# Center block
center_x = x
center_mid = center_x + center_w // 2
# Status text (top center of center block)
status_x = center_mid - status_w // 2
status_y = 2
status_color = COLOR_GREEN if game["is_live"] else COLOR_GRAY
self._draw_text_with_outline(draw, status_text, (status_x, status_y), font_time, fill=status_color)
# Score (bottom center of center block, for live/final)
if score_text:
score_x = center_mid - score_w // 2
score_y = height - 13
# Upset highlighting
if game["is_final"] and game["is_upset"] and self.highlight_upsets:
score_color = COLOR_GOLD
elif game["is_live"]:
score_color = COLOR_WHITE
else:
score_color = COLOR_WHITE
self._draw_text_with_outline(draw, score_text, (score_x, score_y), font_score, fill=score_color)
# Date for final games (below score)
if game["is_final"] and game.get("game_date"):
date_w = int(draw.textlength(game["game_date"], font=font_detail))
date_x = center_mid - date_w // 2
date_y = height - 6
self._draw_text_with_outline(draw, game["game_date"], (date_x, date_y), font_detail, fill=COLOR_DIM)
x = center_x + center_w + h_pad
# Home team text
is_fav_home = game["home_abbr"] in self.favorite_teams if self.favorite_teams else False
home_color = COLOR_GOLD if is_fav_home else COLOR_WHITE
if game["is_final"] and game["winner_side"] == "home" and self.highlight_upsets and game["is_upset"]:
home_color = COLOR_GOLD
self._draw_text_with_outline(draw, home_text, (x, team_text_y), font_detail, fill=home_color)
x += home_text_w + h_pad
# Paste home logo
if home_logo:
logo_y = (height - home_logo.height) // 2
img.paste(home_logo, (x, logo_y), home_logo)
return img
def _create_ticker_image(self) -> None:
"""Build the full scrolling ticker image from game tiles."""
if not self.games_data:
self.ticker_image = None
if self.scroll_helper:
self.scroll_helper.clear_cache()
return
grouped = self._process_games(self.games_data)
content_items: List[Image.Image] = []
# Order rounds by significance (most important first)
sorted_rounds = sorted(grouped.keys(), key=lambda r: ROUND_ORDER.get(r, 6))
for rnd in sorted_rounds:
games = grouped[rnd]
if not games:
continue
# Add round separator
if self.show_round_logos and rnd:
separator = self._create_round_separator(rnd)
content_items.append(separator)
# Add game tiles
for game in games:
tile = self._create_game_tile(game)
content_items.append(tile)
if not content_items:
self.ticker_image = None
if self.scroll_helper:
self.scroll_helper.clear_cache()
return
if not self.scroll_helper:
self.ticker_image = None
return
gap_width = 16
# Use ScrollHelper to create the scrolling image
self.ticker_image = self.scroll_helper.create_scrolling_image(
content_items=content_items,
item_gap=gap_width,
element_gap=0,
)
self.total_scroll_width = self.scroll_helper.total_scroll_width
self.dynamic_duration = self.scroll_helper.get_dynamic_duration()
self.logger.info(
f"Ticker image created: {self.ticker_image.width}px wide, "
f"{len(self.games_data)} games, dynamic_duration={self.dynamic_duration:.0f}s"
)
# ------------------------------------------------------------------
# Plugin lifecycle
# ------------------------------------------------------------------
def update(self) -> None:
"""Fetch and process tournament data."""
if not self.enabled:
return
current_time = time.time()
# Use shorter interval if live games detected
interval = 60 if self._has_live_games else self.update_interval
if current_time - self.last_update < interval:
return
with self._update_lock:
self.last_update = current_time
if not self._is_tournament_window():
self.logger.debug("Outside tournament window, skipping fetch")
self.games_data = []
self.ticker_image = None
if self.scroll_helper:
self.scroll_helper.clear_cache()
return
try:
games = self._fetch_tournament_data()
self._has_live_games = any(g["is_live"] for g in games)
self.games_data = games
self._create_ticker_image()
self.logger.info(
f"Updated: {len(games)} games, "
f"live={self._has_live_games}"
)
except Exception as e:
self.logger.error(f"Update error: {e}", exc_info=True)
def display(self, force_clear: bool = False) -> None:
"""Render one scroll frame."""
if not self.enabled:
return
if force_clear or self._display_start_time is None:
self._display_start_time = time.time()
if self.scroll_helper:
self.scroll_helper.reset_scroll()
self._end_reached_logged = False
if not self.games_data or self.ticker_image is None:
self._display_fallback()
return
if not self.scroll_helper:
self._display_fallback()
return
try:
if self.loop or not self.scroll_helper.is_scroll_complete():
self.scroll_helper.update_scroll_position()
elif not self._end_reached_logged:
self.logger.info("Scroll complete")
self._end_reached_logged = True
visible = self.scroll_helper.get_visible_portion()
if visible is None:
self._display_fallback()
return
self.dynamic_duration = self.scroll_helper.get_dynamic_duration()
matrix_w = self.display_manager.matrix.width
matrix_h = self.display_manager.matrix.height
if not hasattr(self.display_manager, "image") or self.display_manager.image is None:
self.display_manager.image = Image.new("RGB", (matrix_w, matrix_h), COLOR_BLACK)
self.display_manager.image.paste(visible, (0, 0))
self.display_manager.update_display()
self.scroll_helper.log_frame_rate()
except Exception as e:
self.logger.error(f"Display error: {e}", exc_info=True)
self._display_fallback()
def _display_fallback(self) -> None:
w = self.display_manager.matrix.width
h = self.display_manager.matrix.height
img = Image.new("RGB", (w, h), COLOR_BLACK)
draw = ImageDraw.Draw(img)
if self._is_tournament_window():
text = "No games"
else:
text = "Off-season"
text_w = int(draw.textlength(text, font=self.fonts["time"]))
text_x = (w - text_w) // 2
text_y = (h - 8) // 2
draw.text((text_x, text_y), text, font=self.fonts["time"], fill=COLOR_GRAY)
# Show March Madness logo if available
if self._march_madness_logo:
logo_y = (h - self._march_madness_logo.height) // 2
img.paste(self._march_madness_logo, (2, logo_y), self._march_madness_logo)
self.display_manager.image = img
self.display_manager.update_display()
# ------------------------------------------------------------------
# Duration / cycle management
# ------------------------------------------------------------------
def get_display_duration(self) -> float:
current_time = time.time()
if self._cached_dynamic_duration is not None:
cache_age = current_time - self._duration_cache_time
if cache_age < 5.0:
return self._cached_dynamic_duration
self._cached_dynamic_duration = self.dynamic_duration
self._duration_cache_time = current_time
return self.dynamic_duration
def supports_dynamic_duration(self) -> bool:
if not self.enabled:
return False
return self.dynamic_duration_enabled
def is_cycle_complete(self) -> bool:
if not self.supports_dynamic_duration():
return True
if self._display_start_time is not None and self.dynamic_duration > 0:
elapsed = time.time() - self._display_start_time
if elapsed >= self.dynamic_duration:
return True
if not self.loop and self.scroll_helper and self.scroll_helper.is_scroll_complete():
return True
return False
def reset_cycle_state(self) -> None:
super().reset_cycle_state()
self._display_start_time = None
self._end_reached_logged = False
if self.scroll_helper:
self.scroll_helper.reset_scroll()
# ------------------------------------------------------------------
# Vegas mode
# ------------------------------------------------------------------
def get_vegas_content(self):
if not self.games_data:
return None
tiles = []
for game in self.games_data:
tiles.append(self._create_game_tile(game))
return tiles if tiles else None
def get_vegas_content_type(self) -> str:
return "multi"
# ------------------------------------------------------------------
# Info / cleanup
# ------------------------------------------------------------------
def get_info(self) -> Dict:
info = super().get_info()
info["total_games"] = len(self.games_data)
info["has_live_games"] = self._has_live_games
info["dynamic_duration"] = self.dynamic_duration
info["tournament_window"] = self._is_tournament_window()
return info
def cleanup(self) -> None:
self.games_data = []
self.ticker_image = None
if self.scroll_helper:
self.scroll_helper.clear_cache()
self._team_logo_cache.clear()
if self.session:
self.session.close()
self.session = None
super().cleanup()

View File

@@ -0,0 +1,37 @@
{
"id": "march-madness",
"name": "March Madness",
"version": "1.0.0",
"description": "NCAA March Madness tournament bracket tracker with round branding, seeded matchups, live scores, and upset highlighting",
"author": "ChuckBuilds",
"category": "sports",
"tags": [
"ncaa",
"basketball",
"march-madness",
"tournament",
"bracket",
"scrolling"
],
"repo": "https://github.com/ChuckBuilds/ledmatrix-plugins",
"branch": "main",
"plugin_path": "plugins/march-madness",
"versions": [
{
"version": "1.0.0",
"ledmatrix_min": "2.0.0",
"released": "2026-02-16"
}
],
"stars": 0,
"downloads": 0,
"last_updated": "2026-02-16",
"verified": true,
"screenshot": "",
"display_modes": [
"march_madness"
],
"dependencies": {},
"entry_point": "manager.py",
"class_name": "MarchMadnessPlugin"
}

View File

@@ -0,0 +1,4 @@
requests>=2.28.0
Pillow>=12.2.0
pytz>=2022.1
numpy>=1.24.0

View File

@@ -1,3 +1,3 @@
Pillow>=12.2.0
Pillow>=10.4.0
PyYAML>=6.0.2
requests>=2.33.0
requests>=2.32.0

View File

@@ -3,7 +3,7 @@
# Tested on Raspbian OS 12 (Bookworm) and 13 (Trixie)
# Image processing
Pillow>=12.2.0,<13.0.0
Pillow>=10.4.0,<12.0.0
numpy>=1.24.0 # For fast array operations in ScrollHelper (compatible with 2.x)
# Timezone handling
@@ -12,7 +12,7 @@ timezonefinder>=6.5.0,<7.0.0 # Updated for better performance and accuracy
geopy>=2.4.1,<3.0.0
# HTTP requests
requests>=2.33.0,<3.0.0
requests>=2.32.0,<3.0.0
# Google API integration
google-auth-oauthlib>=1.2.0,<2.0.0
@@ -23,10 +23,10 @@ google-api-python-client>=2.147.0,<3.0.0
freetype-py>=2.5.1,<3.0.0
# Spotify integration
spotipy>=2.25.2,<3.0.0
spotipy>=2.24.0,<3.0.0
# Flask web framework
Flask>=3.1.3,<4.0.0
Flask>=3.0.0,<4.0.0
# Text processing
unidecode>=1.3.8,<2.0.0
@@ -35,7 +35,7 @@ unidecode>=1.3.8,<2.0.0
icalevents>=0.1.27,<1.0.0
# WebSocket support
python-socketio>=5.14.0,<6.0.0
python-socketio>=5.11.0,<6.0.0
python-engineio>=4.9.0,<5.0.0
websockets>=12.0,<14.0
websocket-client>=1.8.0,<2.0.0
@@ -44,7 +44,7 @@ websocket-client>=1.8.0,<2.0.0
jsonschema>=4.20.0,<5.0.0
# Testing dependencies
pytest>=9.0.3,<10.0.0
pytest>=7.4.0,<8.0.0
pytest-cov>=4.1.0,<5.0.0
pytest-mock>=3.11.0,<4.0.0
mypy>=1.5.0,<2.0.0

1
run.py
View File

@@ -51,6 +51,7 @@ if debug_mode:
# Try to import the plugin system directly to get better error info
print("DEBUG: Attempting to import src.plugin_system...", flush=True)
from src.plugin_system import PluginManager
print("DEBUG: Plugin system import successful", flush=True)
except ImportError as e:
print(f"DEBUG: Plugin system import failed: {e}", flush=True)

View File

@@ -9,7 +9,7 @@ and preventing validation errors.
import json
import sys
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
def get_default_for_field(prop: Dict[str, Any]) -> Any:

View File

@@ -9,8 +9,9 @@ Analyze all plugin config schemas to identify issues:
"""
import json
import os
from pathlib import Path
from typing import Dict, List, Any
from typing import Dict, List, Set, Any
import jsonschema
from jsonschema import Draft7Validator

View File

@@ -3,6 +3,8 @@
Check what imports are actually in the app.py file on the Pi
"""
import sys
import os
from pathlib import Path
# Read the app.py file and check the import lines

View File

@@ -203,7 +203,7 @@ link_github_plugin() {
log_info "Repository already exists at $target_dir"
if [[ -d "$target_dir/.git" ]]; then
log_info "Updating repository..."
(cd "$target_dir" && git pull --rebase) || true
(cd "$target_dir" && git pull --rebase || true)
fi
else
# Clone the repository

View File

@@ -1,95 +0,0 @@
#!/usr/bin/env python3
"""
Pillow compatibility smoke test.
Exercises the Pillow APIs used throughout LEDMatrix to verify a new
Pillow version doesn't break image rendering, font handling, or resize ops.
Run after upgrading Pillow:
python3 scripts/dev/test_pillow_compat.py
"""
import sys
def check(label, fn):
try:
result = fn()
print(f"{label}" + (f"{result}" if result is not None else ""))
return True
except Exception as e:
print(f"{label}{type(e).__name__}: {e}", file=sys.stderr)
return False
def main():
from PIL import Image, ImageDraw, ImageFont
import PIL
print(f"Pillow {PIL.__version__} on Python {sys.version.split()[0]}\n")
failures = 0
print("Image creation:")
failures += not check("Image.new RGB",
lambda: Image.new('RGB', (128, 32), (0, 0, 0)).size)
failures += not check("Image.new RGBA",
lambda: Image.new('RGBA', (64, 64), (255, 0, 0, 128)).size)
failures += not check("Image.new 1-bit",
lambda: Image.new('1', (16, 16)).size)
print("\nDraw operations:")
img = Image.new('RGB', (128, 32), (0, 0, 0))
draw = ImageDraw.Draw(img)
font = ImageFont.load_default()
failures += not check("draw.rectangle",
lambda: draw.rectangle([0, 0, 127, 31], outline=(255, 0, 0)))
failures += not check("draw.text",
lambda: draw.text((2, 2), "Hello", fill=(255, 255, 255), font=font))
failures += not check("draw.line",
lambda: draw.line([0, 0, 127, 31], fill=(0, 255, 0)))
print("\nFont metrics (used in text_helper, scroll_helper):")
failures += not check("draw.textlength",
lambda: f"{draw.textlength('Test', font=font):.1f}px")
failures += not check("draw.textbbox",
lambda: draw.textbbox((0, 0), "Test", font=font))
print("\nResampling (used in logo_helper, image_utils, sports base):")
logo = Image.new('RGBA', (200, 200), (255, 128, 0, 200))
failures += not check("Image.Resampling.LANCZOS exists",
lambda: str(Image.Resampling.LANCZOS))
failures += not check("thumbnail with LANCZOS",
lambda: (logo.thumbnail((64, 32), Image.Resampling.LANCZOS), logo.size)[1])
big = Image.new('RGB', (300, 300), (0, 128, 255))
failures += not check("resize with LANCZOS",
lambda: big.resize((128, 32), Image.Resampling.LANCZOS).size)
print("\nComposite / paste (used in display rendering):")
base = Image.new('RGB', (128, 32), (0, 0, 0))
overlay = Image.new('RGBA', (32, 32), (255, 0, 0, 128))
failures += not check("paste RGBA onto RGB",
lambda: (base.paste(overlay.convert('RGB'), (0, 0)), base.size)[1])
failures += not check("Image.alpha_composite",
lambda: Image.alpha_composite(
Image.new('RGBA', (32, 32)), overlay).size)
print("\nImage I/O:")
import io
buf = io.BytesIO()
img.save(buf, format='PNG')
buf.seek(0)
failures += not check("save/load PNG roundtrip",
lambda: Image.open(buf).size)
print()
if failures == 0:
print(f"All checks passed. Pillow {PIL.__version__} is compatible.")
return 0
else:
print(f"{failures} check(s) failed — review output above.", file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())

View File

@@ -15,6 +15,7 @@ Usage: python tools/validate_python.py <python_file>
import ast
import sys
import os
from pathlib import Path
def validate_file(filepath: str) -> bool:
"""Validate a Python file for common issues."""

View File

@@ -13,6 +13,7 @@ echo ""
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Get the actual user

View File

@@ -41,7 +41,7 @@ if [ -f "$PROJECT_DIR/config/config.json" ]; then
echo -e "${GREEN}✓ Config file found${NC}"
# Check web_display_autostart setting
AUTOSTART=$(grep -o '"web_display_autostart"[[:space:]]*:[[:space:]]*[a-z]*' "$PROJECT_DIR/config/config.json" | grep -o '[a-z]*$')
AUTOSTART=$(cat "$PROJECT_DIR/config/config.json" | grep -o '"web_display_autostart"[[:space:]]*:[[:space:]]*[a-z]*' | grep -o '[a-z]*$')
if [ "$AUTOSTART" == "true" ]; then
echo -e "${GREEN}✓ web_display_autostart: true${NC}"

View File

@@ -16,8 +16,11 @@ YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Check if running as root or with sudo
if [ "$EUID" -ne 0 ]; then
if [ "$EUID" -ne 0 ]; then
echo -e "${YELLOW}Warning: Some checks require sudo. Running what we can...${NC}"
SUDO=""
else
SUDO=""
fi
PROJECT_DIR="${HOME}/LEDMatrix"

View File

@@ -118,7 +118,7 @@ total_count=${#ARCHITECTURES[@]}
for arch in "${!ARCHITECTURES[@]}"; do
if download_binary "$arch" "${ARCHITECTURES[$arch]}"; then
success_count=$((success_count + 1))
((success_count++))
fi
done

View File

@@ -7,6 +7,12 @@ echo "Fixing LEDMatrix assets directory permissions..."
# Get the real user (not root when running with sudo)
REAL_USER=${SUDO_USER:-$USER}
# Resolve the home directory of the real user robustly
if command -v getent >/dev/null 2>&1; then
REAL_HOME=$(getent passwd "$REAL_USER" | cut -d: -f6)
else
REAL_HOME=$(eval echo ~"$REAL_USER")
fi
REAL_GROUP=$(id -gn "$REAL_USER")
# Get the project directory

View File

@@ -14,6 +14,9 @@ else
ACTUAL_USER=$(whoami)
fi
# Get the home directory of the actual user
USER_HOME=$(eval echo ~$ACTUAL_USER)
# Determine the Project Root Directory (parent of scripts/install/)
PROJECT_ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)
@@ -31,8 +34,7 @@ echo "Generating service file with dynamic paths..."
WEB_SERVICE_FILE_CONTENT=$(cat <<EOF
[Unit]
Description=LED Matrix Web Interface Service
After=network-online.target
Wants=network-online.target
After=network.target
[Service]
Type=simple

View File

@@ -81,7 +81,7 @@ def main() -> int:
help='Plugin config as JSON string')
parser.add_argument('--mock-data', '-m', default=None,
help='Path to JSON file with mock cache data')
parser.add_argument('--output', '-o', default='/tmp/plugin_render.png', # nosec B108 - dev script default; user can override
parser.add_argument('--output', '-o', default='/tmp/plugin_render.png',
help='Output PNG path (default: /tmp/plugin_render.png)')
parser.add_argument('--width', type=int, default=128, help='Display width (default: 128)')
parser.add_argument('--height', type=int, default=32, help='Display height (default: 32)')

View File

@@ -7,7 +7,9 @@ Supports both unittest and pytest.
"""
import sys
import os
import argparse
import subprocess
from pathlib import Path
from typing import Optional
@@ -196,14 +198,17 @@ def main():
if runner == 'auto':
# Try pytest first, fall back to unittest
try:
import pytest
runner = 'pytest'
except ImportError:
runner = 'unittest'
# Run tests
if runner == 'pytest':
import importlib.util
return run_pytest_tests(test_files, args.verbose, args.coverage)
else:
import importlib.util
return run_unittest_tests(test_files, args.verbose)

View File

@@ -6,7 +6,9 @@ This script allows manual clearing of specific cache keys or all cache data.
import os
import sys
import json
import argparse
from pathlib import Path
# Add the src directory to the path so we can import our modules
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))

View File

@@ -111,7 +111,7 @@ def main():
# Ensure PYTHONPATH is set correctly if web_interface.py has relative imports to src
# The WorkingDirectory in systemd service should handle this for web_interface.py
print(f"Launching web interface v3: {sys.executable} {WEB_INTERFACE_SCRIPT}")
os.execvp(sys.executable, [sys.executable, WEB_INTERFACE_SCRIPT]) # nosec B606 - both args are fixed constants
os.execvp(sys.executable, [sys.executable, WEB_INTERFACE_SCRIPT])
except Exception as e:
print(f"Failed to exec web interface: {e}")
sys.exit(1) # Failed to start

View File

@@ -7,7 +7,10 @@ where Recent/Upcoming managers consume data from the background service cache.
"""
import time
import logging
from typing import Dict, Optional, Any, Callable
from datetime import datetime
import pytz
class BackgroundCacheMixin:

View File

@@ -14,15 +14,19 @@ Key Features:
- Memory-efficient data storage
"""
import os
import time
import logging
import threading
import requests
from typing import Dict, Any, Optional, Callable
from typing import Dict, Any, Optional, List, Callable, Union
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import json
import queue
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, Future
import weakref
from src.cache_manager import CacheManager
# Configure logging
logger = logging.getLogger(__name__)
@@ -223,7 +227,7 @@ class BackgroundDataService:
self.stats['cache_misses'] += 1
# Submit to executor
self.executor.submit(self._fetch_data_worker, request)
future = self.executor.submit(self._fetch_data_worker, request)
logger.info(f"Submitted background fetch request {request_id} for {sport} {year}")
return request_id
@@ -549,12 +553,13 @@ class BackgroundDataService:
if to_remove:
logger.info(f"Cleared {len(to_remove)} old completed requests")
def shutdown(self, wait: bool = True):
def shutdown(self, wait: bool = True, timeout: int = 30):
"""
Shutdown the background data service.
Args:
wait: Whether to wait for active requests to complete
timeout: Maximum time to wait for shutdown
"""
logger.info("Shutting down BackgroundDataService...")
@@ -565,14 +570,24 @@ class BackgroundDataService:
for request_id in list(self.active_requests.keys()):
self.cancel_request(request_id)
self.executor.shutdown(wait=wait)
# Shutdown executor with compatibility for older Python versions
try:
# Try with timeout parameter (Python 3.9+)
self.executor.shutdown(wait=wait, timeout=timeout)
except TypeError:
# Fallback for older Python versions that don't support timeout
if wait and timeout:
# For older versions, we can't specify timeout, so just wait
self.executor.shutdown(wait=True)
else:
self.executor.shutdown(wait=wait)
logger.info("BackgroundDataService shutdown complete")
def __del__(self):
"""Cleanup when service is destroyed."""
if not self._shutdown:
self.shutdown(wait=False)
self.shutdown(wait=False, timeout=None)
# Global service instance
_background_service: Optional[BackgroundDataService] = None

View File

@@ -7,7 +7,7 @@ fields and data structures.
"""
from abc import ABC, abstractmethod
from typing import Dict, Optional
from typing import Dict, Any, Optional, List
import logging
from datetime import datetime
import pytz
@@ -21,10 +21,12 @@ class APIDataExtractor(ABC):
@abstractmethod
def extract_game_details(self, game_event: Dict) -> Optional[Dict]:
"""Extract common game details from raw API data."""
pass
@abstractmethod
def get_sport_specific_fields(self, game_event: Dict) -> Dict:
"""Extract sport-specific fields (downs, innings, periods, etc.)."""
pass
def _extract_common_details(self, game_event: Dict) -> tuple[Dict | None, Dict | None, Dict | None, Dict | None, Dict | None]:
"""Extract common game details that work across all sports."""

View File

@@ -329,6 +329,7 @@ class Baseball(SportsCore):
return
series_summary = game.get("series_summary", "")
font = self.fonts.get('detail', ImageFont.load_default())
bbox = draw_overlay.textbbox((0, 0), series_summary, font=self.fonts['time'])
height = bbox[3] - bbox[1]
shots_y = (self.display_height - height) // 2

View File

@@ -1,4 +1,6 @@
import logging
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from PIL import Image, ImageDraw, ImageFont

View File

@@ -6,10 +6,11 @@ to support different APIs and data providers.
"""
from abc import ABC, abstractmethod
from typing import Dict, List
from typing import Dict, Any, Optional, List
import requests
import logging
from datetime import datetime
from datetime import datetime, timedelta
import time
class DataSource(ABC):
"""Abstract base class for data sources."""
@@ -34,14 +35,17 @@ class DataSource(ABC):
@abstractmethod
def fetch_live_games(self, sport: str, league: str) -> List[Dict]:
"""Fetch live games for a sport/league."""
pass
@abstractmethod
def fetch_schedule(self, sport: str, league: str, date_range: tuple) -> List[Dict]:
"""Fetch schedule for a sport/league within date range."""
pass
@abstractmethod
def fetch_standings(self, sport: str, league: str) -> Dict:
"""Fetch standings for a sport/league."""
pass
def get_headers(self) -> Dict[str, str]:
"""Get headers for API requests."""
@@ -213,7 +217,7 @@ class MLBAPIDataSource(DataSource):
response.raise_for_status()
data = response.json()
self.logger.debug("Fetched standings from MLB API")
self.logger.debug(f"Fetched standings from MLB API")
return data
except Exception as e:
@@ -292,7 +296,7 @@ class SoccerAPIDataSource(DataSource):
response.raise_for_status()
data = response.json()
self.logger.debug("Fetched standings from soccer API")
self.logger.debug(f"Fetched standings from soccer API")
return data
except Exception as e:

View File

@@ -1,8 +1,10 @@
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from src.display_manager import DisplayManager
from src.cache_manager import CacheManager
from datetime import datetime, timezone, timedelta
import logging
from PIL import Image, ImageDraw, ImageFont
import time
from src.base_classes.data_sources import ESPNDataSource
from src.base_classes.sports import SportsCore, SportsLive

View File

@@ -1,4 +1,6 @@
import logging
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from PIL import Image, ImageDraw, ImageFont
@@ -77,6 +79,8 @@ class Hockey(SportsCore):
away_shots = round(home_team_saves / home_team_saves_per)
if away_team_saves_per > 0:
home_shots = round(away_team_saves / away_team_saves_per)
status_short = status["type"].get("shortDetail", "")
if situation and status["type"]["state"] == "in":
# Detect scoring events from status detail
# status_detail = status["type"].get("detail", "")

View File

@@ -5,7 +5,7 @@ import time
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional
import pytz
import requests
@@ -172,8 +172,8 @@ class SportsCore(ABC):
try:
fallbacks.append(Path.home() / ".ledmatrix" / "logos" / self.sport_key)
except RuntimeError as e:
self.logger.debug("Could not resolve home directory (expected for service users): %s", e)
except Exception:
pass
fallbacks.append(Path(tempfile.gettempdir()) / "ledmatrix_logos" / self.sport_key)
@@ -416,6 +416,7 @@ class SportsCore(ABC):
league=self.league,
event_id=game['id'],
update_interval_seconds=update_interval,
is_live=is_live
)
if odds_data:

View File

@@ -11,10 +11,13 @@ Follows LEDMatrix configuration management patterns:
- Maintainable: Changes to odds logic affect all plugins
"""
import time
import logging
import requests
import json
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Optional, List
import pytz
class BaseOddsManager:

View File

@@ -193,21 +193,19 @@ class CacheStrategy:
Data type string for strategy lookup
"""
key_lower = key.lower()
# Odds data — checked before the generic 'live' block below because
# live-odds cache keys (e.g. odds_espn_basketball_nba_<id>_live) contain
# both 'odds' AND 'live'. Without this ordering the 'live' check below
# would match first and return 'sports_live' (30 s TTL) instead of the
# correct 'odds_live' (120 s TTL).
# Odds data — checked FIRST because odds keys may also contain 'live'/'current'
# (e.g. odds_espn_nba_game_123_live). The odds TTL (120s for live, 1800s for
# upcoming) must win over the generic sports_live TTL (30s) to avoid hitting
# the ESPN odds API every 30 seconds per game.
if 'odds' in key_lower:
# For live games, use shorter cache; for upcoming games, use longer cache
if any(x in key_lower for x in ['live', 'current']):
return 'odds_live' # Live odds change more frequently
return 'odds' # Regular odds for upcoming games
return 'odds_live' # Live odds change more frequently (120s TTL)
return 'odds' # Regular odds for upcoming games (1800s TTL)
# Live sports data
# Live sports data (only reached if key does NOT contain 'odds')
if any(x in key_lower for x in ['live', 'current', 'scoreboard']):
if 'soccer' in key_lower:
return 'sports_live' # Soccer live data is very time-sensitive
return 'sports_live'
# Weather data

View File

@@ -13,6 +13,7 @@ import threading
from typing import Dict, Any, Optional, Protocol
from datetime import datetime
from src.exceptions import CacheError
class CacheStrategyProtocol(Protocol):
@@ -183,7 +184,7 @@ class DiskCache:
os.replace(tmp_path, cache_path)
# Set proper permissions: 660 (rw-rw----) for group-readable cache files
try:
os.chmod(cache_path, 0o660) # nosec B103 - intentional; web UI and service share a group
os.chmod(cache_path, 0o660)
except OSError:
pass # Non-critical if chmod fails
finally:
@@ -201,7 +202,7 @@ class DiskCache:
os.fsync(cache_file.fileno())
# Set proper permissions: 660 (rw-rw----) for group-readable cache files
try:
os.chmod(cache_path, 0o660) # nosec B103 - intentional; web UI and service share a group
os.chmod(cache_path, 0o660)
except OSError:
pass # Non-critical if chmod fails
self.logger.debug("Wrote cache for %s directly (non-atomic)", key)
@@ -209,7 +210,7 @@ class DiskCache:
# If direct write also fails, try fallback location
self.logger.warning("Direct write failed for key '%s' to %s: %s", key, cache_path, write_error)
raise # Re-raise to trigger fallback logic
except (IOError, OSError, PermissionError):
except (IOError, OSError, PermissionError) as e:
# Attempt one-time fallback write to user's home cache directory
try:
# Try user's home cache directory as fallback
@@ -227,7 +228,7 @@ class DiskCache:
json.dump(data, tmp_file, indent=4, cls=DateTimeEncoder)
# Set proper permissions: 660 (rw-rw----) for group-readable cache files
try:
os.chmod(fallback_path, 0o660) # nosec B103 - intentional; web UI and service share a group
os.chmod(fallback_path, 0o660)
except OSError:
pass # Non-critical if chmod fails
self.logger.debug("Cache wrote to fallback location: %s", fallback_path)

View File

@@ -7,6 +7,7 @@ from typing import Any, Dict, List, Optional
import logging
import threading
import tempfile
from pathlib import Path
from src.exceptions import CacheError
from src.cache.memory_cache import MemoryCache
from src.cache.disk_cache import DiskCache
@@ -110,7 +111,7 @@ class CacheManager:
if os.access(system_cache_dir, os.W_OK):
self.logger.info(f"Using system cache directory: {system_cache_dir}")
return system_cache_dir
except (OSError, IOError, PermissionError):
except (OSError, IOError, PermissionError) as perm_error:
# Permission errors are expected when running as non-root
self.logger.debug(f"Could not create system cache directory (permission denied): {system_cache_dir}")
except (OSError, IOError, PermissionError) as e:

View File

@@ -5,10 +5,13 @@ Handles HTTP requests, caching, and ESPN API integration for LED matrix plugins.
Extracted from LEDMatrix core to provide reusable functionality for plugins.
"""
import json
import logging
import time
from datetime import datetime
from typing import Any, Dict, Optional
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlencode
import requests
from requests.adapters import HTTPAdapter

View File

@@ -5,9 +5,11 @@ This example shows how to refactor the basketball plugin to use the
ledmatrix-common package for cleaner, more maintainable code.
"""
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional
from PIL import Image, ImageDraw
# Import common helpers
from src.common import (

View File

@@ -42,7 +42,7 @@ def test_utilities(display_width: int, display_height: int):
print(f"Testing LEDMatrix Common utilities with {display_width}x{display_height} display")
try:
from ledmatrix_common import LogoHelper, TextHelper, DisplayHelper, GameHelper, ConfigHelper
from ledmatrix_common import LogoHelper, TextHelper, APIHelper, DisplayHelper, GameHelper, ConfigHelper
# Test LogoHelper
print("Testing LogoHelper...")
@@ -63,12 +63,12 @@ def test_utilities(display_width: int, display_height: int):
# Test GameHelper
print("Testing GameHelper...")
GameHelper()
game_helper = GameHelper()
print("GameHelper initialized")
# Test ConfigHelper
print("Testing ConfigHelper...")
ConfigHelper()
config_helper = ConfigHelper()
print("ConfigHelper initialized")
print("All tests passed!")

View File

@@ -6,7 +6,7 @@ Extracted from LEDMatrix core to provide reusable functionality for plugins.
"""
import logging
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Union
from PIL import Image, ImageDraw, ImageFont
@@ -166,7 +166,8 @@ class DisplayHelper:
img = self.create_base_image(background_color)
draw = ImageDraw.Draw(img)
# Start text off-screen to the right
# Calculate text position (start off-screen to the right)
text_width = draw.textlength(text, font=font)
x_position = self.display_width
# Draw text
@@ -215,7 +216,8 @@ class DisplayHelper:
PIL Image with error message
"""
img = self.create_base_image((50, 0, 0)) # Dark red background
draw = ImageDraw.Draw(img)
# Use default font
font = ImageFont.load_default()
@@ -235,6 +237,8 @@ class DisplayHelper:
PIL Image with no data message
"""
img = self.create_base_image((0, 0, 0))
draw = ImageDraw.Draw(img)
font = ImageFont.load_default()
self._draw_centered_text(message, font, (0, 0, 0), (150, 150, 150))

View File

@@ -6,8 +6,10 @@ Extracted from LEDMatrix core to provide reusable functionality for plugins.
"""
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Union
from urllib.parse import urlparse
import requests
from PIL import Image

View File

@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)
# System directories that should never have their permissions modified
# These directories have special system-level permissions that must be preserved
PROTECTED_SYSTEM_DIRECTORIES = { # nosec B108 - these are checked to PREVENT permission changes, not to use as temp paths
PROTECTED_SYSTEM_DIRECTORIES = {
'/tmp',
'/var/tmp',
'/dev',

View File

@@ -1,651 +0,0 @@
"""
Multi-Display Sync Manager
Synchronizes scrolling content across two LED matrix display units over UDP.
Runs at the core framework level — works with any plugin automatically.
Roles:
standalone No sync (default behavior)
leader Drives scroll, sends rendered follower frames via UDP
follower Receives frames from leader; falls back to own plugins when
the leader goes offline
Compatibility rule: rows and cols must match between leader and follower.
chain_length may differ — each display can have a different number of panels.
Port default: 5765 (UDP). Open this port on both Pis if ufw is active:
sudo ufw allow 5765/udp
"""
import io
import json
import os
import socket
import struct
import tempfile
import threading
import time
import logging
from enum import Enum
from typing import Callable, Optional
import numpy as np
from PIL import Image
# Raw-frame wire format: 8-byte magic + 4-byte header + raw RGB pixels
# Much faster than PNG: no encode/decode, negligible CPU, same UDP packet size
_RAW_MAGIC = b'SYNC_RAW'
_RAW_HEADER = struct.Struct('<HH') # width, height (uint16 LE)
SYNC_PORT = 5765
HELLO_INTERVAL = 5.0 # follower broadcasts hello every 5 s
HEARTBEAT_INTERVAL = 2.0 # follower sends heartbeat every 2 s
PEER_TIMEOUT = 6.0 # leader: no heartbeat → follower gone
LEADER_TIMEOUT = 6.0 # follower: no frame → leader gone
STATUS_FILE = os.path.join(tempfile.gettempdir(), "led_matrix_sync_status.json")
class SyncRole(Enum):
STANDALONE = "standalone"
LEADER = "leader"
FOLLOWER = "follower"
class LeaderState(Enum):
NO_PEER = "no_peer"
CONNECTED = "connected"
INCOMPATIBLE = "incompatible"
class FollowerState(Enum):
STANDALONE = "standalone"
FOLLOWER = "follower"
class DisplaySyncManager:
"""
Core sync manager. Instantiated by DisplayController based on config['sync'].
Leader sends compressed PNG frames to the follower after each render cycle.
Follower renders received frames; returns to own plugin stack when leader
goes offline.
"""
def __init__(
self,
role_str: str,
cfg: dict,
hw_config: dict,
logger: logging.Logger,
) -> None:
"""
Args:
role_str: "standalone" | "leader" | "follower"
cfg: config['sync'] dict
hw_config: config['display']['hardware'] dict (this Pi's own config)
logger: framework logger
"""
try:
self.role = SyncRole(role_str)
except ValueError:
logger.warning("Invalid sync role '%s', defaulting to standalone", role_str)
self.role = SyncRole.STANDALONE
self.logger = logger
self.port = int(cfg.get("port", SYNC_PORT))
self._hw_config = hw_config
# Leader state
self._leader_state = LeaderState.NO_PEER
self._peer_ip: Optional[str] = None
self._peer_compatible: bool = False
self._peer_chain: int = 0
self._last_heartbeat_time: float = 0.0
self._leader_width: int = 0 # set by display_controller after init
# Follower state
self._follower_state = FollowerState.STANDALONE
self._latest_frame: Optional[Image.Image] = None # pixel-frame fallback
self._latest_scroll_x: Optional[float] = None # Vegas scroll position
self._last_leader_frame_time: float = 0.0
self._frame_lock = threading.Lock()
self._leader_ip: Optional[str] = None
self._on_new_cycle: Optional[Callable[[], None]] = None # called when leader starts new cycle
self._on_scroll_image: Optional[Callable[[Image.Image], None]] = None # called with Image when received
self._pending_scroll_image: Optional[Image.Image] = None # image received before callback set
self._scroll_image_lock = threading.Lock() # guards _on_scroll_image / _pending_scroll_image
self._img_server_sock = None # TCP server for scroll image transfer
# Leader state additions
self._on_follower_connected: Optional[Callable[[], None]] = None # called when follower connects
self._error_message: Optional[str] = None
self._running = False
self._recv_sock: Optional[socket.socket] = None
self._send_sock: Optional[socket.socket] = None
if self.role == SyncRole.STANDALONE:
return
if self.role == SyncRole.LEADER:
self._start_leader()
elif self.role == SyncRole.FOLLOWER:
self._start_follower()
# ------------------------------------------------------------------ #
# Leader setup #
# ------------------------------------------------------------------ #
def _start_leader(self) -> None:
# Receive socket: listens for hello + heartbeat from follower
self._recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # nosec B104
self._recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._recv_sock.bind(("", self.port)) # nosec B104 — intentional: must receive UDP broadcast on all interfaces
self._recv_sock.settimeout(1.0)
# Send socket: unicast frames + hello_ack to follower
self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._running = True
threading.Thread(
target=self._leader_recv_loop, daemon=True, name="sync-leader-recv"
).start()
threading.Thread(
target=self._leader_watchdog, daemon=True, name="sync-leader-watchdog"
).start()
self.logger.info("Sync: leader started on UDP port %d", self.port)
self.write_status_file()
def _leader_recv_loop(self) -> None:
while self._running:
try:
data, addr = self._recv_sock.recvfrom(1024)
sender_ip = addr[0]
try:
msg = json.loads(data.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
continue
t = msg.get("t")
if t == "hello":
self._handle_hello(msg, sender_ip)
elif t == "hb":
if self._peer_ip == sender_ip:
self._last_heartbeat_time = time.time()
except socket.timeout:
continue
except Exception as exc:
self.logger.debug("Sync leader recv error: %s", exc)
def _handle_hello(self, msg: dict, sender_ip: str) -> None:
hw = self._hw_config
local_rows = hw.get("rows", 32)
local_cols = hw.get("cols", 64)
peer_rows = int(msg.get("rows", 0))
peer_cols = int(msg.get("cols", 0))
peer_chain = int(msg.get("chain", 1))
compatible = peer_rows == local_rows and peer_cols == local_cols
self._peer_ip = sender_ip
self._peer_compatible = compatible
self._peer_chain = peer_chain
self._last_heartbeat_time = time.time()
prev_state = self._leader_state
if compatible:
if prev_state != LeaderState.CONNECTED:
self.logger.info(
"Sync: follower connected at %s (chain=%d)", sender_ip, peer_chain
)
self._leader_state = LeaderState.CONNECTED
self._error_message = None
# Send scroll image immediately on new connection so follower has identical content
if prev_state != LeaderState.CONNECTED and self._on_follower_connected:
threading.Thread(
target=self._on_follower_connected,
daemon=True, name="sync-leader-img-push"
).start()
else:
self._leader_state = LeaderState.INCOMPATIBLE
self._error_message = (
f"Incompatible panels: follower is {peer_cols}x{peer_rows}, "
f"leader is {local_cols}x{local_rows}. "
f"rows and cols must match between displays."
)
if prev_state != LeaderState.INCOMPATIBLE:
self.logger.error("Sync: %s", self._error_message)
if self._leader_state != prev_state:
self.write_status_file()
ack = json.dumps({
"t": "hello_ack",
"compatible": compatible,
"leader_width": self._leader_width,
"error": self._error_message,
}).encode("utf-8")
try:
self._send_sock.sendto(ack, (sender_ip, self.port))
except Exception as exc:
self.logger.debug("Sync: hello_ack send failed: %s", exc)
def _leader_watchdog(self) -> None:
while self._running:
time.sleep(1.0)
if self._leader_state == LeaderState.CONNECTED:
if time.time() - self._last_heartbeat_time > PEER_TIMEOUT:
self.logger.info(
"Sync: follower heartbeat timeout — peer disconnected"
)
self._leader_state = LeaderState.NO_PEER
self._peer_ip = None
self._peer_compatible = False
self.write_status_file()
def _image_server_loop(self) -> None:
"""Follower: TCP server that receives the leader's scroll image at each new cycle."""
while self._running:
try:
conn, addr = self._img_server_sock.accept()
conn.settimeout(10.0)
try:
# 4-byte big-endian length prefix
hdr = b""
while len(hdr) < 4:
chunk = conn.recv(4 - len(hdr))
if not chunk:
break
hdr += chunk
if len(hdr) < 4:
continue
length = int.from_bytes(hdr, "big")
_MAX_IMAGE_BYTES = 10 * 1024 * 1024 # 10 MB — well above any real scroll image
if length <= 0 or length > _MAX_IMAGE_BYTES:
self.logger.warning(
"Sync: rejected TCP image with invalid length %d (max %d) from %s",
length, _MAX_IMAGE_BYTES, addr,
)
conn.close()
continue
data = bytearray()
while len(data) < length:
chunk = conn.recv(min(65536, length - len(data)))
if not chunk:
break
data.extend(chunk)
img = Image.open(io.BytesIO(data))
_MAX_W, _MAX_H = 100_000, 256 # generous for any real scroll image
if img.width > _MAX_W or img.height > _MAX_H:
self.logger.warning(
"Sync: rejected oversized scroll image %dx%d (max %dx%d) from %s",
img.width, img.height, _MAX_W, _MAX_H, addr,
)
continue
try:
img.load()
except (Image.DecompressionBombError, ValueError) as exc:
self.logger.warning("Sync: rejected decompression bomb from %s: %s", addr, exc)
continue
self.logger.info(
"Sync: received scroll image %dx%d (%d bytes compressed)",
img.width, img.height, length,
)
with self._scroll_image_lock:
if self._on_scroll_image:
cb = self._on_scroll_image
else:
# Callback not registered yet (startup race) — cache it
self._pending_scroll_image = img
cb = None
if cb:
cb(img)
finally:
conn.close()
except socket.timeout:
continue
except Exception as exc:
self.logger.debug("Sync: image server error: %s", exc)
def send_scroll_image(self, image: Image.Image) -> None:
"""Leader: send the full scroll image to the follower via TCP.
PNG compression typically reduces a 5000×32 image to ~2050KB,
transferring in <20ms on local WiFi. Called at new_cycle and on
first connection so both Pis always have identical cached_arrays.
"""
if self.role != SyncRole.LEADER:
return
if self._leader_state != LeaderState.CONNECTED or not self._peer_ip:
return
try:
buf = io.BytesIO()
image.save(buf, format="PNG", optimize=True)
data = buf.getvalue()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(5.0)
sock.connect((self._peer_ip, self.port + 1))
sock.sendall(len(data).to_bytes(4, "big") + data)
self.logger.info(
"Sync: sent scroll image %dx%d (%d bytes compressed)",
image.width, image.height, len(data),
)
except Exception as exc:
self.logger.debug("Sync: image send error: %s", exc)
def set_on_follower_connected(self, callback: Callable[[], None]) -> None:
"""Leader: callback fired (in a thread) when a compatible follower first connects.
Use this to push the current scroll image immediately.
If a follower is already connected when this is called, fires right away
(handles the race where follower connects during leader startup).
"""
self._on_follower_connected = callback
if self._leader_state == LeaderState.CONNECTED:
threading.Thread(
target=callback, daemon=True, name="sync-leader-img-push-late"
).start()
def set_on_scroll_image(self, callback: Callable[[Image.Image], None]) -> None:
"""Follower: callback fired with the received Image when leader sends scroll image.
If an image was received before this callback was registered (startup race),
fires immediately with that cached image.
"""
with self._scroll_image_lock:
self._on_scroll_image = callback
pending = self._pending_scroll_image
self._pending_scroll_image = None
if pending is not None:
callback(pending)
def send_scroll_x(self, scroll_x: float) -> None:
"""Leader (Vegas mode): broadcast scroll position instead of a pixel frame.
The follower renders from its own local pipeline at scroll_x - display_width.
~20 bytes vs ~18KB for raw frames — eliminates all content-change artifacts.
"""
if self.role != SyncRole.LEADER:
return
if self._leader_state != LeaderState.CONNECTED or not self._peer_ip:
return
try:
msg = json.dumps({"t": "sx", "x": round(scroll_x, 2)}).encode("utf-8")
self._send_sock.sendto(msg, (self._peer_ip, self.port))
except Exception as exc:
self.logger.debug("Sync: scroll_x send error: %s", exc)
def send_new_cycle(self) -> None:
"""Leader: signal that a new scroll cycle has started so follower rebuilds its image."""
if self.role != SyncRole.LEADER:
return
if self._leader_state != LeaderState.CONNECTED or not self._peer_ip:
return
try:
self._send_sock.sendto(b'{"t":"nc"}', (self._peer_ip, self.port))
except Exception as exc:
self.logger.debug("Sync: new_cycle send error: %s", exc)
def send_frame(self, image: Image.Image) -> None:
"""Leader: send a rendered frame to the follower as raw RGB bytes.
Raw format is orders of magnitude faster than PNG on Pi hardware —
no encode on sender, no decode on receiver.
Packet: 8-byte magic + 4-byte (width, height) header + raw RGB bytes.
"""
if self.role != SyncRole.LEADER:
return
if self._leader_state != LeaderState.CONNECTED or not self._peer_ip:
return
try:
arr = np.asarray(image.convert("RGB"), dtype=np.uint8)
header = _RAW_MAGIC + _RAW_HEADER.pack(image.width, image.height)
data = header + arr.tobytes()
if len(data) <= 65000:
self._send_sock.sendto(data, (self._peer_ip, self.port))
elif not getattr(self, '_oversized_frame_warned', False):
self._oversized_frame_warned = True
self.logger.warning(
"Sync: frame too large for UDP (%d bytes, max 65000) — "
"image %dx%d will not be sent; use TCP image sync instead",
len(data), image.width, image.height,
)
except Exception as exc:
self.logger.debug("Sync: frame send error: %s", exc)
def set_leader_width(self, width: int) -> None:
"""Called by DisplayController once display_manager.width is known."""
self._leader_width = width
# ------------------------------------------------------------------ #
# Follower setup #
# ------------------------------------------------------------------ #
def _start_follower(self) -> None:
# Receive socket: listens for frames + hello_ack from leader
self._recv_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._recv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._recv_sock.bind(("", self.port)) # nosec B104 — intentional: must receive UDP broadcast on all interfaces
self._recv_sock.settimeout(0.1)
# Send socket: broadcasts hello + heartbeat
self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._send_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self._running = True
threading.Thread(
target=self._follower_recv_loop, daemon=True, name="sync-follower-recv"
).start()
threading.Thread(
target=self._follower_announce_loop, daemon=True, name="sync-follower-announce"
).start()
threading.Thread(
target=self._follower_watchdog, daemon=True, name="sync-follower-watchdog"
).start()
# TCP server: receives scroll images from leader (port + 1)
self._img_server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # nosec B104
self._img_server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._img_server_sock.bind(("", self.port + 1)) # nosec B104 — intentional: TCP server must accept connections on all interfaces
self._img_server_sock.listen(1)
self._img_server_sock.settimeout(1.0)
threading.Thread(
target=self._image_server_loop, daemon=True, name="sync-image-server"
).start()
self.logger.info(
"Sync: follower started on UDP port %d, image server on TCP %d",
self.port, self.port + 1,
)
self.write_status_file()
def _follower_recv_loop(self) -> None:
while self._running:
try:
data, addr = self._recv_sock.recvfrom(65535)
sender_ip = addr[0]
if data[:8] == _RAW_MAGIC or len(data) > 512:
# Frame data: prefer magic-tagged raw RGB; fall back to legacy PNG
try:
if data[:8] == _RAW_MAGIC:
w, h = _RAW_HEADER.unpack(data[8:12])
raw = data[12:]
img = Image.frombuffer(
"RGB", (w, h), raw, "raw", "RGB", 0, 1
)
else:
# Fallback: try legacy PNG
img = Image.open(io.BytesIO(data))
img.load()
with self._frame_lock:
self._latest_frame = img
self._last_leader_frame_time = time.time()
self._leader_ip = sender_ip
if self._follower_state == FollowerState.STANDALONE:
self._follower_state = FollowerState.FOLLOWER
self.logger.info(
"Sync: leader active at %s — switching to follower mode",
sender_ip,
)
self.write_status_file()
except Exception as exc:
self.logger.debug("Sync: frame decode error: %s", exc)
else:
# Control message
try:
msg = json.loads(data.decode("utf-8"))
t = msg.get("t")
if t == "hello_ack":
self._leader_ip = sender_ip
self._peer_compatible = msg.get("compatible", False)
self._error_message = msg.get("error")
if not self._peer_compatible and self._error_message:
self.logger.error(
"Sync: leader rejected handshake — %s",
self._error_message,
)
self.write_status_file()
elif t == "sx":
# Vegas scroll-position sync — tiny message, renders locally
self._latest_scroll_x = float(msg["x"])
self._last_leader_frame_time = time.time()
self._leader_ip = sender_ip
if self._follower_state == FollowerState.STANDALONE:
self._follower_state = FollowerState.FOLLOWER
self.logger.info(
"Sync: leader active at %s — switching to follower mode",
sender_ip,
)
self.write_status_file()
if self._on_new_cycle:
self._on_new_cycle() # build initial scroll image
elif t == "nc":
# Leader started a new scroll cycle — rebuild local image
if self._on_new_cycle:
self._on_new_cycle()
except (json.JSONDecodeError, UnicodeDecodeError, KeyError):
pass
except socket.timeout:
continue
except Exception as exc:
self.logger.debug("Sync follower recv error: %s", exc)
def _follower_announce_loop(self) -> None:
hw = self._hw_config
hello = json.dumps({
"t": "hello",
"rows": hw.get("rows", 32),
"cols": hw.get("cols", 64),
"chain": hw.get("chain_length", 1),
}).encode("utf-8")
heartbeat = json.dumps({"t": "hb"}).encode("utf-8")
dest = ("<broadcast>", self.port)
last_hello = 0.0
last_hb = 0.0
while self._running:
now = time.time()
if now - last_hello >= HELLO_INTERVAL:
try:
self._send_sock.sendto(hello, dest)
last_hello = now
except Exception as exc:
self.logger.debug("Sync: hello broadcast error: %s", exc)
if now - last_hb >= HEARTBEAT_INTERVAL:
try:
self._send_sock.sendto(heartbeat, dest)
last_hb = now
except Exception as exc:
self.logger.debug("Sync: heartbeat error: %s", exc)
time.sleep(0.5)
def _follower_watchdog(self) -> None:
while self._running:
time.sleep(1.0)
if self._follower_state == FollowerState.FOLLOWER:
if time.time() - self._last_leader_frame_time > LEADER_TIMEOUT:
self.logger.info(
"Sync: leader frame timeout — returning to standalone mode"
)
self._follower_state = FollowerState.STANDALONE
with self._frame_lock:
self._latest_frame = None
self.write_status_file()
# ------------------------------------------------------------------ #
# Public API #
# ------------------------------------------------------------------ #
def is_follower_active(self) -> bool:
"""True when this Pi is in active follower mode (receiving frames)."""
return (
self.role == SyncRole.FOLLOWER
and self._follower_state == FollowerState.FOLLOWER
)
def get_latest_scroll_x(self) -> Optional[float]:
"""Follower: return the most recently received Vegas scroll position, or None."""
return self._latest_scroll_x
def set_on_new_cycle(self, callback: Callable[[], None]) -> None:
"""Follower: register a callback fired when the leader starts a new scroll cycle.
Used to trigger a local start_new_cycle() so both Pis rebuild from same fresh data.
"""
self._on_new_cycle = callback
def get_latest_frame(self) -> Optional[Image.Image]:
"""Follower: return the most recently received pixel frame (non-Vegas fallback)."""
with self._frame_lock:
return self._latest_frame
def get_status(self) -> dict:
"""Return sync state dict for the web API status endpoint."""
hw = self._hw_config
base = {
"role": self.role.value,
"port": self.port,
"local_rows": hw.get("rows", 32),
"local_cols": hw.get("cols", 64),
"local_chain": hw.get("chain_length", 1),
}
if self.role == SyncRole.STANDALONE:
return {**base, "state": "standalone"}
if self.role == SyncRole.LEADER:
return {
**base,
"state": self._leader_state.value,
"peer_ip": self._peer_ip,
"peer_compatible": self._peer_compatible,
"peer_chain": self._peer_chain,
"leader_width": self._leader_width,
"error": self._error_message,
}
# Follower
return {
**base,
"state": self._follower_state.value,
"leader_ip": self._leader_ip,
"peer_compatible": self._peer_compatible,
"error": self._error_message,
}
def write_status_file(self) -> None:
"""Write current sync status to STATUS_FILE for the web UI to read."""
try:
status = self.get_status()
status["ts"] = time.time()
tmp = STATUS_FILE + ".tmp"
with open(tmp, "w") as f:
json.dump(status, f)
os.replace(tmp, STATUS_FILE)
except Exception as exc:
self.logger.debug("Sync: status file write error: %s", exc)
def stop(self) -> None:
"""Shut down threads and close sockets."""
self._running = False
for sock in (self._recv_sock, self._send_sock, self._img_server_sock):
if sock:
try:
sock.close()
except Exception as exc:
self.logger.debug("Sync: error closing socket: %s", exc)

View File

@@ -6,6 +6,7 @@ Extracted from LEDMatrix core to provide reusable functionality for plugins.
"""
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

View File

@@ -8,7 +8,7 @@ Extracted from LEDMatrix core to provide reusable functionality for plugins.
import logging
import re
from datetime import datetime, timezone
from typing import Union
from typing import Optional, Tuple, Union
import pytz

View File

@@ -313,8 +313,17 @@ class ConfigManager:
self._merge_template_defaults(self.config, template_config)
# Save migrated config using atomic save to preserve permissions
# Load secrets if they exist to pass to atomic save
secrets_content = {}
if os.path.exists(self.secrets_path):
try:
with open(self.secrets_path, 'r') as f_secrets:
secrets_content = json.load(f_secrets)
except Exception:
pass # Continue without secrets if can't load
# Use atomic save to preserve file permissions
# Note: save_config_atomic handles secrets internally
# Note: save_config_atomic handles secrets internally, no need to pass new_secrets
result = self.save_config_atomic(
new_config_data=self.config,
create_backup=False, # Already created backup above

View File

@@ -12,10 +12,11 @@ This service wraps ConfigManager and adds:
"""
import json
import os
import time
import threading
from pathlib import Path
from typing import Dict, Any, Optional, List, Callable
from typing import Dict, Any, Optional, List, Callable, Set
from datetime import datetime
from collections import defaultdict
import logging
@@ -37,7 +38,7 @@ class ConfigVersion:
config: Configuration dictionary
version: Version number
timestamp: When this version was created
checksum: SHA-256 hex digest of the config (for change detection)
checksum: MD5 checksum of the config
"""
self.config: Dict[str, Any] = config
self.version: int = version
@@ -113,9 +114,9 @@ class ConfigService:
self._start_file_watching()
def _calculate_checksum(self, config: Dict[str, Any]) -> str:
"""Calculate checksum of configuration for change detection."""
"""Calculate MD5 checksum of configuration."""
config_str = json.dumps(config, sort_keys=True)
return hashlib.sha256(config_str.encode()).hexdigest()
return hashlib.md5(config_str.encode()).hexdigest()
def _load_config(self) -> bool:
"""

View File

@@ -1,4 +1,6 @@
import time
import logging
import sys
import os
import json
from pathlib import Path
@@ -14,7 +16,6 @@ from src.config_service import ConfigService
from src.cache_manager import CacheManager
from src.font_manager import FontManager
from src.logging_config import get_logger
from src.common.sync_manager import DisplaySyncManager, SyncRole
# Get logger with consistent configuration
logger = get_logger(__name__)
@@ -31,7 +32,10 @@ class DisplayController:
def __init__(self):
start_time = time.time()
logger.info("Starting DisplayController initialization")
# Throttle tracking for _tick_plugin_updates in high-FPS loops
self._last_plugin_tick_time = 0.0
# Initialize ConfigManager and wrap with ConfigService for hot-reload
config_manager = ConfigManager()
enable_hot_reload = os.environ.get('LEDMATRIX_HOT_RELOAD', 'true').lower() == 'true'
@@ -65,39 +69,7 @@ class DisplayController:
config_time = time.time()
self.display_manager = DisplayManager(self.config)
logger.info("DisplayManager initialized in %.3f seconds", time.time() - config_time)
# Initialize multi-display sync (standalone by default — no-op unless configured)
sync_cfg = self.config.get("sync", {})
hw_cfg = self.config.get("display", {}).get("hardware", {})
self.sync_manager = DisplaySyncManager(
role_str=sync_cfg.get("role", "standalone"),
cfg=sync_cfg,
hw_config=hw_cfg,
logger=logger,
)
# Tell the leader its own physical display width so it can include it in hello_ack
if self.sync_manager.role == SyncRole.LEADER:
self.sync_manager.set_leader_width(self.display_manager.width)
# Follower mode setup
if self.sync_manager.role == SyncRole.FOLLOWER:
# Gate update_display() so background plugin threads cannot write to
# hardware — only our render loop is permitted.
_real_update = self.display_manager.update_display
_dm = self.display_manager
def _follower_gated_update():
# Allow through when the sync render loop has the token, or when
# the leader has gone offline and we've fallen back to standalone.
if getattr(_dm, '_sync_render_allowed', False) or not self.sync_manager.is_follower_active():
_real_update()
self.display_manager.update_display = _follower_gated_update
# Note: _on_new_cycle is NOT registered here. The leader now sends
# its actual scroll image via TCP at each new_cycle, so the follower
# adopts that image directly via set_on_scroll_image(). Registering
# _on_new_cycle would trigger a local rebuild that overwrites the
# leader's just-received image with a different locally-built one.
# Initialize Font Manager
font_time = time.time()
self.font_manager = FontManager(self.config)
@@ -110,7 +82,8 @@ class DisplayController:
logger.info("Display modes initialized in %.3f seconds", time.time() - init_time)
self.force_change = False
self._next_live_priority_check = 0.0 # monotonic timestamp for throttled live priority checks
# All sports and content managers now handled via plugins
logger.info("All sports and content managers now handled via plugin system")
@@ -423,64 +396,20 @@ class DisplayController:
# Set up live priority checker
self.vegas_coordinator.set_live_priority_checker(self._check_live_priority)
# Set up interrupt checker for on-demand/wifi status and follower mode
def _vegas_interrupt():
return self._check_vegas_interrupt() or self.sync_manager.is_follower_active()
# Set up interrupt checker for on-demand/wifi status
self.vegas_coordinator.set_interrupt_checker(
_vegas_interrupt,
self._check_vegas_interrupt,
check_interval=10 # Check every 10 frames (~80ms at 125 FPS)
)
# Run plugin updates inside the Vegas loop so the inter-iteration
# gap is <1 ms (nothing left for _tick_plugin_updates() to do).
self.vegas_coordinator.set_update_callback(self._tick_plugin_updates)
# Wire multi-display sync into Vegas render pipeline
follower_pos = self.config.get("sync", {}).get("follower_position", "left")
self.vegas_coordinator.set_sync_manager(self.sync_manager, follower_pos)
# Set up plugin update tick to keep data fresh during Vegas mode
self.vegas_coordinator.set_update_tick(
self._tick_plugin_updates_for_vegas,
interval=1.0
)
logger.info("Vegas mode coordinator initialized")
# Follower does NOT build its own initial scroll image — the leader
# pushes its image via TCP as soon as set_on_follower_connected fires.
# A local build would create a different (wrong) image that could
# temporarily replace the leader's correct one.
# When the leader sends its scroll image (TCP), update our
# cached_array so both Pis have pixel-identical images.
import numpy as _np
def _on_leader_scroll_image(image):
vc = getattr(self, 'vegas_coordinator', None)
if vc and vc.render_pipeline:
rp = vc.render_pipeline
arr = _np.asarray(image.convert("RGB"), dtype=_np.uint8)
rp.scroll_helper.cached_image = image
rp.scroll_helper.cached_array = arr
rp.scroll_helper.total_scroll_width = image.width
self._follower_pending_new_image = False
logger.info(
"Sync: follower adopted leader scroll image %dx%d",
image.width, image.height,
)
self.sync_manager.set_on_scroll_image(_on_leader_scroll_image)
if self.sync_manager.role == SyncRole.LEADER:
# When a follower first connects, push the current scroll image so
# the follower doesn't have to wait for the next new_cycle event.
# Polls until the image is ready (Vegas may still be composing on startup).
def _on_follower_connected():
import time as _t
for _ in range(300): # up to 30s
vc = getattr(self, 'vegas_coordinator', None)
if vc and vc.render_pipeline:
img = vc.render_pipeline.scroll_helper.cached_image
if img is not None:
self.sync_manager.send_scroll_image(img)
return
_t.sleep(0.1)
logger.warning("Sync: no scroll image available to push to new follower")
self.sync_manager.set_on_follower_connected(_on_follower_connected)
except Exception as e:
logger.error("Failed to initialize Vegas mode: %s", e, exc_info=True)
self.vegas_coordinator = None
@@ -515,16 +444,51 @@ class DisplayController:
return False
def _tick_plugin_updates_for_vegas(self):
"""
Run scheduled plugin updates and return IDs of plugins that were updated.
Called periodically by the Vegas coordinator to keep plugin data fresh
during Vegas mode. Returns a list of plugin IDs whose data changed so
Vegas can refresh their content in the scroll.
Returns:
List of updated plugin IDs, or None if no updates occurred
"""
if not self.plugin_manager or not hasattr(self.plugin_manager, 'plugin_last_update'):
self._tick_plugin_updates()
return None
# Snapshot update timestamps before ticking
old_times = dict(self.plugin_manager.plugin_last_update)
# Run the scheduled updates
self._tick_plugin_updates()
# Detect which plugins were actually updated
updated = []
for plugin_id, new_time in self.plugin_manager.plugin_last_update.items():
if new_time > old_times.get(plugin_id, 0.0):
updated.append(plugin_id)
if updated:
logger.info("Vegas update tick: %d plugin(s) updated: %s", len(updated), updated)
return updated or None
def _check_schedule(self):
"""Check if display should be active based on schedule."""
schedule_config = self.config.get('schedule', {})
# Get fresh config from config_service to support hot-reload
current_config = self.config_service.get_config()
schedule_config = current_config.get('schedule', {})
# If schedule config doesn't exist or is empty, default to always active
if not schedule_config:
self.is_display_active = True
self._was_display_active = True # Track previous state for schedule change detection
return
# Check if schedule is explicitly disabled
# Default to True (schedule enabled) if 'enabled' key is missing for backward compatibility
if 'enabled' in schedule_config and not schedule_config.get('enabled', True):
@@ -534,7 +498,7 @@ class DisplayController:
return
# Get configured timezone, default to UTC
timezone_str = self.config.get('timezone', 'UTC')
timezone_str = current_config.get('timezone', 'UTC')
try:
tz = pytz.timezone(timezone_str)
except pytz.UnknownTimeZoneError:
@@ -632,15 +596,18 @@ class DisplayController:
Target brightness level (dim_brightness if in dim period,
normal brightness otherwise)
"""
# Get fresh config from config_service to support hot-reload
current_config = self.config_service.get_config()
# Get normal brightness from config
normal_brightness = self.config.get('display', {}).get('hardware', {}).get('brightness', 90)
normal_brightness = current_config.get('display', {}).get('hardware', {}).get('brightness', 90)
# If display is OFF via schedule, don't process dim schedule
if not self.is_display_active:
self.is_dimmed = False
return normal_brightness
dim_config = self.config.get('dim_schedule', {})
dim_config = current_config.get('dim_schedule', {})
# If dim schedule doesn't exist or is disabled, use normal brightness
if not dim_config or not dim_config.get('enabled', False):
@@ -648,7 +615,7 @@ class DisplayController:
return normal_brightness
# Get configured timezone
timezone_str = self.config.get('timezone', 'UTC')
timezone_str = current_config.get('timezone', 'UTC')
try:
tz = pytz.timezone(timezone_str)
except pytz.UnknownTimeZoneError:
@@ -755,83 +722,21 @@ class DisplayController:
except Exception: # pylint: disable=broad-except
logger.exception("Error running scheduled plugin updates")
_FOLLOWER_SEND_INTERVAL = 1.0 / 90 # raw bytes are cheap; 90fps > follower render rate
def _tick_plugin_updates_throttled(self, min_interval: float = 0.0):
"""Throttled version of _tick_plugin_updates for high-FPS loops.
def _follower_rebuild_scroll_image(self) -> None:
"""Follower: rebuild the local Vegas scroll image so both Pis render from
the same fresh plugin data. Called at startup (after Vegas initializes)
and each time the leader broadcasts a new-cycle signal. Runs in a daemon
thread so it never blocks the 60fps render loop.
Args:
min_interval: Minimum seconds between calls. When <= 0 the
call passes straight through to _tick_plugin_updates so
plugin-configured update_interval values are never capped.
"""
try:
vc = getattr(self, 'vegas_coordinator', None)
if not vc:
logger.warning("Sync: follower has no vegas_coordinator — cannot build scroll image")
return
rp = vc.render_pipeline
if not rp:
logger.warning("Sync: follower vegas_coordinator has no render_pipeline")
return
logger.info("Sync: follower starting scroll image rebuild")
ok = rp.start_new_cycle()
if ok and rp.scroll_helper.cached_image is not None:
logger.info(
"Sync: follower scroll image ready — %dx%d",
rp.scroll_helper.cached_image.width,
rp.scroll_helper.cached_image.height,
)
else:
logger.warning(
"Sync: follower scroll image rebuild FAILED (ok=%s, cached=%s)",
ok, rp.scroll_helper.cached_image is not None,
)
except Exception as exc:
logger.warning("Sync: follower scroll image rebuild error: %s", exc, exc_info=True)
def _send_follower_frame(self, plugin_instance) -> None:
"""Leader: generate and send the follower's portion of the current frame.
The follower is physically to the LEFT of the leader in a right-to-left
scrolling ticker, so it shows content at scroll_position - display_width
(content that already scrolled off the leader's left edge).
Set sync.follower_position = "right" in config to invert this.
"""
if not (self.sync_manager and self.sync_manager.role == SyncRole.LEADER):
if min_interval <= 0:
self._tick_plugin_updates()
return
# Throttle to ~90fps via _FOLLOWER_SEND_INTERVAL — raw RGB bytes, no encode/decode
now = time.time()
if now - getattr(self, '_last_follower_send', 0) < self._FOLLOWER_SEND_INTERVAL:
return
self._last_follower_send = now
follower_frame = None
width = self.display_manager.width
sync_cfg = self.config.get("sync", {})
sign = -1 if sync_cfg.get("follower_position", "left") == "left" else 1
offset = sign * width
# 1. Explicit hook — plugin opted in with get_offset_frame()
try:
follower_frame = plugin_instance.get_offset_frame(offset)
except AttributeError:
pass # Most plugins don't implement get_offset_frame; that's expected
# 2. Auto-detect — plugin has a scroll_helper (standard pattern for all
# scroll plugins). Works with zero plugin code changes.
if follower_frame is None:
try:
scroll_h = getattr(plugin_instance, 'scroll_helper', None)
if scroll_h is not None:
follower_frame = scroll_h.get_portion_at(scroll_h.scroll_position + offset)
except Exception: # nosec B110 - scroll_helper.get_portion_at is optional; skip on error
pass
# 3. Mirror fallback — static plugins (clock, weather) show same frame
if follower_frame is None:
follower_frame = self.display_manager.image
if follower_frame is not None:
self.sync_manager.send_frame(follower_frame)
if now - self._last_plugin_tick_time >= min_interval:
self._last_plugin_tick_time = now
self._tick_plugin_updates()
def _sleep_with_plugin_updates(self, duration: float, tick_interval: float = 1.0):
"""Sleep while continuing to service plugin update schedules."""
@@ -1468,88 +1373,6 @@ class DisplayController:
# Plugins update on their own schedules - no forced sync updates needed
# Each plugin has its own update_interval and background services
# Multi-display sync: follower mode — render frames received from leader.
# Plugin update() threads still run (via _tick_plugin_updates above) so
# data is fresh when we return to standalone if the leader goes offline.
if self.sync_manager.is_follower_active():
# Dead-reckoning follower render:
# Advance local position at configured speed each tick; snap or
# gently correct toward received scroll_x to absorb UDP jitter.
_now_dr = time.perf_counter()
_dt = _now_dr - getattr(self, '_follower_dr_last_t', _now_dr)
self._follower_dr_last_t = _now_dr
vc = getattr(self, 'vegas_coordinator', None)
rp = vc.render_pipeline if (vc and vc.render_pipeline) else None
width = self.display_manager.width
# Advance local position at Vegas scroll speed (px/s → px/tick)
vegas_speed = (
self.config.get('display', {})
.get('vegas_scroll', {})
.get('scroll_speed', 75)
)
local_x = getattr(self, '_follower_local_x', None)
if local_x is None:
local_x = float(width) # safe start (past pre-roll guard)
local_x += vegas_speed * _dt
# Pull latest position from leader (may be None if no packet yet)
scroll_x = self.sync_manager.get_latest_scroll_x()
if scroll_x is not None:
diff = scroll_x - local_x
total_w = (
rp.scroll_helper.total_scroll_width
if rp and rp.scroll_helper.total_scroll_width
else width * 4
)
if abs(diff) > total_w * 0.5:
# Large jump → cycle reset, snap immediately
local_x = float(scroll_x)
self._follower_pending_new_image = True
elif abs(diff) > 10:
# Moderate drift → 20% correction per tick
local_x += diff * 0.20
else:
# Near → gentle 5% correction
local_x += diff * 0.05
self._follower_local_x = local_x
if rp and rp.scroll_helper.cached_image is not None:
sync_cfg = self.config.get("sync", {})
sign = -1 if sync_cfg.get("follower_position", "left") == "left" else 1
# Hold last frame until TCP image arrives after cycle reset
if not getattr(self, "_follower_pending_new_image", False):
if local_x >= width:
rp.scroll_helper.scroll_position = local_x + sign * width
frame = rp.scroll_helper.get_visible_portion()
if frame is not None:
self._follower_last_frame = frame
elif scroll_x is None:
# Fallback: pixel frame before first scroll_x arrives
frame = self.sync_manager.get_latest_frame()
if frame is not None:
self._follower_last_frame = frame
display_frame = getattr(self, '_follower_last_frame', None)
if display_frame is not None:
self.display_manager.image = display_frame
self.display_manager._sync_render_allowed = True
self.display_manager.update_display()
self.display_manager._sync_render_allowed = False
# Precision deadline timer — keeps render at exactly 60fps
_deadline = getattr(self, '_follower_deadline', None)
_now = time.perf_counter()
if _deadline is None or _now > _deadline + 0.1:
_deadline = _now
_deadline += 1.0 / 60
self._follower_deadline = _deadline
_sleep = _deadline - time.perf_counter()
if _sleep > 0:
time.sleep(_sleep)
continue
# Process any deferred updates that may have accumulated
# This also cleans up expired updates to prevent memory leaks
self.display_manager.process_deferred_updates()
@@ -1923,7 +1746,7 @@ class DisplayController:
)
target_duration = max_duration
start_time = time.time()
start_time = time.monotonic()
def _should_exit_dynamic(elapsed_time: float) -> bool:
if not dynamic_enabled:
@@ -1982,19 +1805,34 @@ class DisplayController:
except Exception: # pylint: disable=broad-except
logger.exception("Error during display update")
# Multi-display sync: send follower frame after each render
self._send_follower_frame(manager_to_display)
time.sleep(display_interval)
self._tick_plugin_updates()
self._tick_plugin_updates_throttled(min_interval=1.0)
self._poll_on_demand_requests()
self._check_on_demand_expiration()
# Check for live priority every ~30s so live
# games can interrupt long display durations
elapsed = time.monotonic() - start_time
now = time.monotonic()
if not self.on_demand_active and now >= self._next_live_priority_check:
self._next_live_priority_check = now + 30.0
live_mode = self._check_live_priority()
if live_mode and live_mode != active_mode:
logger.info("Live priority detected during high-FPS loop: %s", live_mode)
self.current_display_mode = live_mode
self.force_change = True
try:
self.current_mode_index = self.available_modes.index(live_mode)
except ValueError:
pass
# continue the main while loop to skip
# post-loop rotation/sleep logic
break
if self.current_display_mode != active_mode:
logger.debug("Mode changed during high-FPS loop, breaking early")
break
elapsed = time.time() - start_time
if elapsed >= target_duration:
logger.debug(
"Reached high-FPS target duration %.2fs for mode %s",
@@ -2024,7 +1862,7 @@ class DisplayController:
time.sleep(display_interval)
self._tick_plugin_updates()
elapsed = time.time() - start_time
elapsed = time.monotonic() - start_time
if elapsed >= target_duration:
logger.debug(
"Reached standard target duration %.2fs for mode %s",
@@ -2051,11 +1889,25 @@ class DisplayController:
except Exception: # pylint: disable=broad-except
logger.exception("Error during display update")
# Multi-display sync: send follower frame after each render
self._send_follower_frame(manager_to_display)
self._poll_on_demand_requests()
self._check_on_demand_expiration()
# Check for live priority every ~30s so live
# games can interrupt long display durations
now = time.monotonic()
if not self.on_demand_active and now >= self._next_live_priority_check:
self._next_live_priority_check = now + 30.0
live_mode = self._check_live_priority()
if live_mode and live_mode != active_mode:
logger.info("Live priority detected during display loop: %s", live_mode)
self.current_display_mode = live_mode
self.force_change = True
try:
self.current_mode_index = self.available_modes.index(live_mode)
except ValueError:
pass
break
if self.current_display_mode != active_mode:
logger.info("Mode changed during display loop from %s to %s, breaking early", active_mode, self.current_display_mode)
break
@@ -2069,19 +1921,26 @@ class DisplayController:
loop_completed = True
break
# If live priority preempted the display loop, skip
# all post-loop logic (remaining sleep, rotation) and
# restart the main loop so the live mode displays
# immediately.
if self.current_display_mode != active_mode:
continue
# Ensure we honour minimum duration when not dynamic and loop ended early
if (
not dynamic_enabled
and not loop_completed
and not needs_high_fps
):
elapsed = time.time() - start_time
elapsed = time.monotonic() - start_time
remaining_sleep = max(0.0, max_duration - elapsed)
if remaining_sleep > 0:
self._sleep_with_plugin_updates(remaining_sleep)
if dynamic_enabled:
elapsed_total = time.time() - start_time
elapsed_total = time.monotonic() - start_time
cycle_done = self._plugin_cycle_complete(manager_to_display)
# Log cycle completion status and metrics

View File

@@ -1,14 +1,11 @@
import json
import os
import tempfile
if os.getenv("EMULATOR", "false") == "true":
from RGBMatrixEmulator import RGBMatrix, RGBMatrixOptions
else:
from rgbmatrix import RGBMatrix, RGBMatrixOptions
from contextlib import contextmanager
from PIL import Image, ImageDraw, ImageFont
import time
from typing import Dict, Any, List
from typing import Dict, Any, List, Tuple
import logging
import math
import freetype
@@ -31,10 +28,8 @@ class DisplayManager:
self.config = config or {}
self._force_fallback = force_fallback
self._suppress_test_pattern = suppress_test_pattern
# When True, update_display() and clear() skip hardware writes (used during off-screen content capture)
self._capture_mode_active = False
# Snapshot settings for web preview integration (service writes, web reads)
self._snapshot_path = "/tmp/led_matrix_preview.png" # nosec B108 - fixed path intentional; web UI reads same path
self._snapshot_path = "/tmp/led_matrix_preview.png"
self._snapshot_min_interval_sec = 0.2 # max ~5 fps
self._last_snapshot_ts = 0.0
@@ -60,7 +55,8 @@ class DisplayManager:
def _setup_matrix(self):
"""Initialize the RGB matrix with configuration settings."""
_init_error_str = None
setup_start = time.time()
try:
# Allow callers (e.g., web UI) to force non-hardware fallback mode
if getattr(self, '_force_fallback', False):
@@ -90,7 +86,7 @@ class DisplayManager:
options.disable_hardware_pulsing = hardware_config.get('disable_hardware_pulsing', False)
options.show_refresh_rate = hardware_config.get('show_refresh_rate', False)
options.limit_refresh_rate_hz = hardware_config.get('limit_refresh_rate_hz', 90)
options.gpio_slowdown = runtime_config.get('gpio_slowdown', 3)
options.gpio_slowdown = runtime_config.get('gpio_slowdown', 2)
# Disable internal privilege dropping - we manage this via systemd or remain root
# This prevents the library from dropping to 'daemon' user which breaks file permissions
@@ -103,18 +99,6 @@ class DisplayManager:
options.pwm_dither_bits = hardware_config.get('pwm_dither_bits')
if 'inverse_colors' in hardware_config:
options.inverse_colors = hardware_config.get('inverse_colors')
# Pi 5 only: 0=PIO/RP1 coprocessor (default, less CPU),
# 1=RIO/Registered IO (faster; gpio_slowdown effect is inverted in this mode)
if 'rp1_rio' in runtime_config:
if hasattr(options, 'rp1_rio'):
options.rp1_rio = runtime_config.get('rp1_rio')
else:
logger.warning(
"rp1_rio is set in config but the installed rgbmatrix library does "
"not support it — the library was likely built without Pi 5 RP1 "
"support (mmap to 0x3f000000 instead of RP1 chip). "
"Fix: sudo RPI_RGB_FORCE_REBUILD=1 ./first_time_install.sh"
)
logger.info(f"Initializing RGB Matrix with settings: rows={options.rows}, cols={options.cols}, chain_length={options.chain_length}, parallel={options.parallel}, hardware_mapping={options.hardware_mapping}")
@@ -145,7 +129,6 @@ class DisplayManager:
self._draw_test_pattern()
except Exception as e:
_init_error_str = str(e)
logger.error(f"Failed to initialize RGB Matrix: {e}", exc_info=True)
# Create a fallback image for web preview using configured dimensions when available
self.matrix = None
@@ -166,41 +149,12 @@ class DisplayManager:
self.draw.rectangle([0, 0, fallback_width - 1, fallback_height - 1], outline=(255, 0, 0))
self.draw.line([0, 0, fallback_width - 1, fallback_height - 1], fill=(0, 255, 0))
self.draw.text((2, max(0, (fallback_height // 2) - 4)), "Simulation", fill=(0, 128, 255))
except Exception: # nosec B110 - best-effort fallback visualization; drawing errors must not crash startup
except Exception:
# Best-effort; ignore drawing errors in fallback
pass
logger.error(
f"Matrix initialization failed — running in fallback/simulation mode "
f"(size {fallback_width}x{fallback_height}). Error: {e}. "
"On Raspberry Pi 5: ensure rpi-rgb-led-matrix was built from the latest "
"submodule (re-run first_time_install.sh). gpio_slowdown of 23 is typical for Pi 5 PIO mode."
)
logger.error(f"Matrix initialization failed, using fallback mode with size {fallback_width}x{fallback_height}. Error: {e}")
# Do not raise here; allow fallback mode so web preview and non-hardware environments work
# Write hardware status file so the web UI can surface init failures
_hw_status = {"ok": self.matrix is not None, "error": _init_error_str}
_status_path = "/tmp/led_matrix_hw_status.json" # nosec B108
try:
if os.path.islink(_status_path):
logger.warning("Skipping hardware status write: %s is a symlink", _status_path)
else:
_fd, _tmp_path = tempfile.mkstemp(dir="/tmp", prefix=".led_hw_") # nosec B108
try:
with os.fdopen(_fd, "w") as _f:
json.dump(_hw_status, _f)
_f.flush()
os.fsync(_f.fileno())
os.chmod(_tmp_path, 0o600)
os.replace(_tmp_path, _status_path)
except Exception:
try:
os.unlink(_tmp_path)
except OSError:
pass
raise
except Exception:
logger.error("Failed to write hardware status file", exc_info=True)
@property
def width(self):
"""Get the display width."""
@@ -301,22 +255,6 @@ class DisplayManager:
except Exception as e:
logger.error(f"Error drawing test pattern: {e}", exc_info=True)
@contextmanager
def capture_mode(self):
"""Suppress hardware output during off-screen content capture.
Plugins call update_display() as part of their normal display() flow.
When fetching content for Vegas mode the render loop is still running,
so any incidental hardware write causes a visible flash on the matrix.
Entering this context prevents those writes without affecting the PIL
image buffer, which the adapter reads to extract content.
"""
self._capture_mode_active = True
try:
yield
finally:
self._capture_mode_active = False
def update_display(self):
"""Update the display using double buffering with proper sync."""
try:
@@ -326,13 +264,10 @@ class DisplayManager:
# Still write a snapshot so the web UI can preview
self._write_snapshot_if_due()
return
if self._capture_mode_active:
return # Skip hardware write — content is being captured off-screen
# Copy the current image to the offscreen canvas
# Copy the current image to the offscreen canvas
self.offscreen_canvas.SetImage(self.image)
# Swap buffers immediately
self.matrix.SwapOnVSync(self.offscreen_canvas)
@@ -369,23 +304,21 @@ class DisplayManager:
# Create a new black image
self.image = Image.new('RGB', (self.matrix.width, self.matrix.height))
self.draw = ImageDraw.Draw(self.image)
if not self._capture_mode_active:
# Clear both canvases and the underlying matrix to ensure no artifacts.
# Failures are non-fatal — the image buffer is already black above, so
# the next update_display() call will push clean content regardless.
try:
self.offscreen_canvas.Clear()
except (RuntimeError, OSError) as e:
logger.error("Failed to clear offscreen canvas: %s", e)
try:
self.current_canvas.Clear()
except (RuntimeError, OSError) as e:
logger.error("Failed to clear current canvas: %s", e)
try:
self.matrix.Clear()
except (RuntimeError, OSError) as e:
logger.error("Failed to clear matrix front buffer: %s", e)
# Clear both canvases and the underlying matrix to ensure no artifacts
try:
self.offscreen_canvas.Clear()
except Exception:
pass
try:
self.current_canvas.Clear()
except Exception:
pass
try:
# Extra safety: clear the matrix front buffer as well
self.matrix.Clear()
except Exception:
pass
# Note: We do NOT call update_display() here to avoid black flashes.
# The caller should call update_display() after drawing new content.
@@ -781,8 +714,8 @@ class DisplayManager:
try:
self.image = Image.new('RGB', (self.width, self.height))
self.draw = ImageDraw.Draw(self.image)
except (OSError, RuntimeError, ValueError, MemoryError):
logger.debug("Canvas reset during cleanup failed", exc_info=True)
except Exception:
pass
# Reset the singleton state when cleaning up
DisplayManager._instance = None
DisplayManager._initialized = False
@@ -939,7 +872,7 @@ class DisplayManager:
# Never modify /tmp permissions - it has special system permissions (1777)
# that must not be changed or it breaks apt and other system tools
parent_dir = snapshot_path_obj.parent
if parent_dir and str(parent_dir) != '/tmp': # nosec B108 - guard to skip /tmp for permission ops
if parent_dir and str(parent_dir) != '/tmp':
ensure_directory_permissions(parent_dir, get_assets_dir_mode())
# Write atomically: temp then replace
tmp_path = f"{self._snapshot_path}.tmp"

View File

@@ -19,7 +19,8 @@ Usage:
import logging
import time
import requests
from typing import Dict, List
from typing import Dict, List, Set, Optional, Any
from datetime import datetime, timezone
logger = logging.getLogger(__name__)

View File

@@ -3,14 +3,15 @@ import logging
import freetype
import json
import hashlib
import urllib.parse
import urllib.request
import zipfile
import tempfile
import shutil
import time
from pathlib import Path
from PIL import ImageFont
from typing import Dict, Tuple, Optional, Union, Any, List
from functools import lru_cache
logger = logging.getLogger(__name__)
@@ -266,12 +267,9 @@ class FontManager:
logger.info(f"Using cached font: {cache_path}")
return str(cache_path)
# Download font — restrict to http/https to prevent file:// reads
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ('http', 'https'):
raise ValueError(f"Font URL must use http or https, got: {parsed.scheme!r}")
# Download font
logger.info(f"Downloading font from {url}")
urllib.request.urlretrieve(url, cache_path) # nosec B310 - scheme validated above
urllib.request.urlretrieve(url, cache_path)
# Handle zip files
if url.endswith('.zip'):
@@ -701,6 +699,8 @@ class FontManager:
fonts_dir = Path("assets/fonts")
ensure_directory_permissions(fonts_dir, get_assets_dir_mode())
target_path = os.path.join(fonts_dir, f"{family_name}.{font_file_path.rsplit('.', 1)[-1]}")
# Add to catalog
self.font_catalog[family_name] = font_file_path
self.clear_cache()
@@ -746,11 +746,11 @@ class FontManager:
if font_path.endswith('.bdf'):
# Try to load BDF font
freetype.Face(font_path)
face = freetype.Face(font_path)
return {"valid": True, "type": "bdf", "family": "unknown"}
elif font_path.endswith('.ttf'):
# Try to load TTF font
ImageFont.truetype(font_path, 12)
font = ImageFont.truetype(font_path, 12)
return {"valid": True, "type": "ttf", "family": "unknown"}
else:
return {"valid": False, "error": "Unsupported font format"}

View File

@@ -1,6 +1,7 @@
import os
import time
import freetype
from PIL import ImageDraw, ImageFont
from PIL import Image, ImageDraw, ImageFont
import logging
from typing import Dict, Any
from src.display_manager import DisplayManager
@@ -72,6 +73,7 @@ class FontTestManager:
def update(self):
"""No update needed for static display."""
pass
def display(self, force_clear: bool = False):
"""Display the font with sample text."""
@@ -79,6 +81,10 @@ class FontTestManager:
# Clear the display
self.display_manager.clear()
# Get display dimensions
width = self.display_manager.matrix.width
height = self.display_manager.matrix.height
# Draw font name at the top
self.display_manager.draw_text(self.current_config['display_name'], y=2, color=(255, 255, 255))

View File

@@ -7,6 +7,7 @@ version of BackgroundCacheMixin that works for weather, stocks, news, etc.
"""
import time
import logging
from typing import Dict, Optional, Any, Callable

View File

@@ -6,8 +6,9 @@ Handles custom layouts, element positioning, and display composition.
import json
import os
import logging
from typing import Dict, List, Any
from typing import Dict, List, Any, Tuple
from datetime import datetime
from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__)

View File

@@ -11,7 +11,7 @@ import time
import logging
import requests
import json
from typing import Dict, List, Optional, Tuple
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
from requests.adapters import HTTPAdapter
@@ -191,7 +191,7 @@ class LogoDownloader:
return True
except PermissionError:
logger.error(f"Permission denied: Cannot write to directory {path}")
logger.error("Please run: sudo ./scripts/fix_perms/fix_assets_permissions.sh")
logger.error(f"Please run: sudo ./scripts/fix_perms/fix_assets_permissions.sh")
return False
except Exception as e:
logger.error(f"Failed to test write access to directory {path}: {e}")
@@ -248,7 +248,7 @@ class LogoDownloader:
except PermissionError as e:
logger.error(f"Permission denied downloading logo for {team_abbreviation}: {e}")
logger.error("Please run: sudo ./scripts/fix_perms/fix_assets_permissions.sh")
logger.error(f"Please run: sudo ./scripts/fix_perms/fix_assets_permissions.sh")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Failed to download logo for {team_abbreviation}: {e}")

View File

@@ -11,7 +11,7 @@ Builds on existing PluginHealthTracker to provide:
import threading
import time
from typing import Dict, Any, Optional, List, Callable
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum
from dataclasses import dataclass

View File

@@ -7,8 +7,9 @@ status tracking and cancellation support.
import threading
import queue
import time
from typing import Dict, Optional, List, Callable, Any
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
import json

View File

@@ -7,7 +7,7 @@ and their associated data structures.
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from datetime import datetime
import uuid

View File

@@ -6,8 +6,9 @@ error isolation, and performance monitoring.
"""
import time
from typing import Any, Optional, Callable
from threading import Thread
import signal
from typing import Any, Optional, Dict, Callable
from threading import Thread, Event
import logging
from src.exceptions import PluginError
@@ -15,8 +16,9 @@ from src.logging_config import get_logger
from src.error_aggregator import record_error
class PluginTimeoutError(Exception):
class TimeoutError(Exception):
"""Raised when a plugin operation times out."""
pass
class PluginExecutor:
@@ -55,7 +57,7 @@ class PluginExecutor:
Result of operation
Raises:
PluginTimeoutError: If operation times out
TimeoutError: If operation times out
PluginError: If operation raises an exception
"""
timeout = timeout or self.default_timeout
@@ -79,7 +81,7 @@ class PluginExecutor:
if not result_container['completed']:
error_msg = f"{plugin_context} operation timed out after {timeout}s"
self.logger.error(error_msg)
timeout_error = PluginTimeoutError(error_msg)
timeout_error = TimeoutError(error_msg)
record_error(timeout_error, plugin_id=plugin_id, operation="timeout")
raise timeout_error
@@ -126,7 +128,7 @@ class PluginExecutor:
)
return True
except PluginTimeoutError:
except TimeoutError:
self.logger.error("Plugin %s update() timed out", plugin_id)
return False
except PluginError:
@@ -202,7 +204,7 @@ class PluginExecutor:
# For backward compatibility: if plugin returns None or something else, treat as success
self.logger.debug(f"Plugin {plugin_id} display() returned non-boolean: {result}, treating as True")
return True
except PluginTimeoutError:
except TimeoutError:
self.logger.error("Plugin %s display() timed out", plugin_id)
return False
except PluginError:
@@ -245,7 +247,7 @@ class PluginExecutor:
timeout=timeout,
plugin_id=plugin_id
)
except Exception as e: # covers PluginTimeoutError, PluginError, and unexpected errors
except (TimeoutError, PluginError, Exception) as e:
self.logger.warning(
"Plugin %s %s failed, using default return: %s",
plugin_id,

View File

@@ -7,7 +7,10 @@ Handles dynamic plugin loading from the plugins/ directory.
API Version: 1.0.0
"""
import os
import json
import importlib
import importlib.util
import sys
import subprocess
import time

View File

@@ -20,6 +20,7 @@ except ImportError:
class ResourceLimitExceeded(Exception):
"""Raised when a plugin exceeds its resource limits."""
pass
@dataclass
@@ -227,7 +228,7 @@ class PluginResourceMonitor:
except ResourceLimitExceeded:
raise
except Exception:
except Exception as e:
# Still record execution time even on error
execution_time = time.time() - start_time
with self._lock:

View File

@@ -5,6 +5,7 @@ Manages saved GitHub repository URLs for easy plugin discovery and installation.
"""
import json
import os
import logging
from pathlib import Path
from typing import List, Dict, Optional

View File

@@ -8,6 +8,7 @@ Provides utilities for extracting defaults, validating configurations, and manag
import copy
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import jsonschema

View File

@@ -8,12 +8,12 @@ Detects and fixes inconsistencies between:
- State manager state
"""
from typing import Dict, Any, List, Set
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from src.plugin_system.state_manager import PluginStateManager
from src.plugin_system.state_manager import PluginStateManager, PluginState, PluginStateStatus
from src.logging_config import get_logger
@@ -234,7 +234,7 @@ class StateReconciliation:
'version': manifest.get('version'),
'name': manifest.get('name')
}
except Exception: # nosec B110 - corrupt/unreadable manifest; skip this plugin, outer except logs
except Exception:
pass
except Exception as e:
self.logger.warning(f"Error reading disk state: {e}")
@@ -285,6 +285,7 @@ class StateReconciliation:
config = config_state.get(plugin_id, {})
disk = disk_state.get(plugin_id, {})
manager = manager_state.get(plugin_id, {})
state_mgr = state_manager_state.get(plugin_id, {})
# Check: Plugin exists on disk but not in config

View File

@@ -10,7 +10,6 @@ import json
import stat
import subprocess
import shutil
import threading
import zipfile
import tempfile
import requests
@@ -24,6 +23,7 @@ import logging
from src.common.permission_utils import sudo_remove_directory
try:
import jsonschema
from jsonschema import Draft7Validator, ValidationError
JSONSCHEMA_AVAILABLE = True
except ImportError:
@@ -101,10 +101,6 @@ class PluginStoreManager:
# handlers. Bumping the cached-entry timestamp on failure serves
# the stale payload cheaply until the backoff expires.
self._failure_backoff_seconds = 60
# Prevents concurrent callers from each firing a network request when
# the registry cache expires. Only one thread fetches; others wait and
# then get the result from the warm cache (double-checked locking).
self._registry_fetch_lock = threading.Lock()
# Ensure plugins directory exists
self.plugins_dir.mkdir(exist_ok=True)
@@ -437,9 +433,9 @@ class PluginStoreManager:
return stale
if not self.github_token:
self.logger.warning(
"GitHub API rate limit likely exceeded (403). "
"Add a GitHub personal access token to config/config_secrets.json "
"under 'github.api_token' to increase rate limits from 60 to 5000/hour."
f"GitHub API rate limit likely exceeded (403). "
f"Add a GitHub personal access token to config/config_secrets.json "
f"under 'github.api_token' to increase rate limits from 60 to 5000/hour."
)
else:
self.logger.warning(
@@ -580,50 +576,41 @@ class PluginStoreManager:
(current_time - self.registry_cache_time) < self.registry_cache_timeout):
return self.registry_cache
with self._registry_fetch_lock:
# Re-check inside the lock — a concurrent caller that was waiting
# may have already populated the cache while we blocked.
current_time = time.time()
if (self.registry_cache and self.registry_cache_time and
not force_refresh and
(current_time - self.registry_cache_time) < self.registry_cache_timeout):
try:
self.logger.info(f"Fetching plugin registry from {self.REGISTRY_URL}")
response = self._http_get_with_retries(self.REGISTRY_URL, timeout=10)
response.raise_for_status()
self.registry_cache = response.json()
self.registry_cache_time = current_time
self.logger.info(f"Fetched registry with {len(self.registry_cache.get('plugins', []))} plugins")
return self.registry_cache
except requests.RequestException as e:
self.logger.error(f"Error fetching registry: {e}")
if raise_on_failure:
raise
# Prefer stale cache over an empty list so the plugin list UI
# keeps working on a flaky connection (e.g. Pi on WiFi). Bump
# registry_cache_time into a short backoff window so the next
# request serves the stale payload cheaply instead of
# re-hitting the network on every request (matches the
# pattern used by github_cache / commit_info_cache).
if self.registry_cache:
self.logger.warning("Falling back to stale registry cache")
self.registry_cache_time = (
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
)
return self.registry_cache
try:
self.logger.info(f"Fetching plugin registry from {self.REGISTRY_URL}")
response = self._http_get_with_retries(self.REGISTRY_URL, timeout=10)
response.raise_for_status()
self.registry_cache = response.json()
self.registry_cache_time = current_time
self.logger.info(f"Fetched registry with {len(self.registry_cache.get('plugins', []))} plugins")
return {"plugins": []}
except json.JSONDecodeError as e:
self.logger.error(f"Error parsing registry JSON: {e}")
if raise_on_failure:
raise
if self.registry_cache:
self.registry_cache_time = (
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
)
return self.registry_cache
except requests.RequestException as e:
self.logger.error(f"Error fetching registry: {e}")
if raise_on_failure:
raise
# Prefer stale cache over an empty list so the plugin list UI
# keeps working on a flaky connection (e.g. Pi on WiFi). Bump
# registry_cache_time into a short backoff window so the next
# request serves the stale payload cheaply instead of
# re-hitting the network on every request (matches the
# pattern used by github_cache / commit_info_cache).
if self.registry_cache:
self.logger.warning("Falling back to stale registry cache")
self.registry_cache_time = (
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
)
return self.registry_cache
return {"plugins": []}
except json.JSONDecodeError as e:
self.logger.error(f"Error parsing registry JSON: {e}")
if raise_on_failure:
raise
if self.registry_cache:
self.registry_cache_time = (
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
)
return self.registry_cache
return {"plugins": []}
return {"plugins": []}
def search_plugins(self, query: str = "", category: str = "", tags: List[str] = None, fetch_commit_info: bool = True, include_saved_repos: bool = True, saved_repositories_manager = None) -> List[Dict]:
"""
@@ -1091,7 +1078,7 @@ class PluginStoreManager:
# Get the actual plugin ID from manifest (source of truth)
manifest_plugin_id = manifest.get('id')
if not manifest_plugin_id:
self.logger.error("Plugin manifest missing 'id' field")
self.logger.error(f"Plugin manifest missing 'id' field")
self._safe_remove_directory(plugin_path)
return False
@@ -1742,7 +1729,7 @@ class PluginStoreManager:
try:
self.logger.info(f"Installing dependencies for {plugin_path.name}")
subprocess.run(
result = subprocess.run(
['pip3', 'install', '--break-system-packages', '-r', str(requirements_file)],
check=True,
capture_output=True,
@@ -1863,72 +1850,58 @@ class PluginStoreManager:
return cached[1]
try:
# .git may be a file (worktree / submodule) containing "gitdir: <path>".
# Resolve it to the actual git directory before reading any files.
try:
if git_dir.is_file():
pointer = git_dir.read_text(encoding='utf-8', errors='replace').strip()
if pointer.startswith('gitdir:'):
resolved = (plugin_path / pointer[len('gitdir:'):].strip()).resolve()
if resolved.is_dir():
git_dir = resolved
else:
return None
else:
return None
except (OSError, NotADirectoryError):
return None
# Read branch directly from .git/HEAD (no subprocess).
branch = ''
try:
head_text = (git_dir / 'HEAD').read_text(encoding='utf-8', errors='replace').strip()
if head_text.startswith('ref: refs/heads/'):
branch = head_text[len('ref: refs/heads/'):]
elif head_text.startswith('ref: '):
branch = head_text[len('ref: '):]
# else: detached HEAD — branch stays ''
except (OSError, NotADirectoryError):
pass
# Remote URL from .git/config — parse [remote "origin"] url line.
remote_url = None
try:
config_text = (git_dir / 'config').read_text(encoding='utf-8', errors='replace')
in_origin = False
for line in config_text.splitlines():
stripped = line.strip()
if stripped == '[remote "origin"]':
in_origin = True
elif stripped.startswith('['):
in_origin = False
elif in_origin and stripped.startswith('url') and '=' in stripped:
remote_url = stripped.split('=', 1)[1].strip()
break
except (OSError, NotADirectoryError):
pass
# Single subprocess: SHA + commit date in one call.
log_result = subprocess.run(
['git', '-C', str(plugin_path), 'log', '-1', '--format=%H%n%cI', 'HEAD'],
sha_result = subprocess.run(
['git', '-C', str(plugin_path), 'rev-parse', 'HEAD'],
capture_output=True,
text=True,
timeout=10,
check=True
)
lines = log_result.stdout.strip().splitlines()
sha = lines[0] if lines else ''
commit_date_iso = lines[1] if len(lines) > 1 else ''
sha = sha_result.stdout.strip()
branch_result = subprocess.run(
['git', '-C', str(plugin_path), 'rev-parse', '--abbrev-ref', 'HEAD'],
capture_output=True,
text=True,
timeout=10,
check=True
)
branch = branch_result.stdout.strip()
if branch == 'HEAD':
branch = ''
# Get remote URL
remote_url_result = subprocess.run(
['git', '-C', str(plugin_path), 'config', '--get', 'remote.origin.url'],
capture_output=True,
text=True,
timeout=10,
check=False
)
remote_url = remote_url_result.stdout.strip() if remote_url_result.returncode == 0 else None
# Get commit date in ISO format
date_result = subprocess.run(
['git', '-C', str(plugin_path), 'log', '-1', '--format=%cI', 'HEAD'],
capture_output=True,
text=True,
timeout=10,
check=True
)
commit_date_iso = date_result.stdout.strip()
result = {
'sha': sha,
'short_sha': sha[:7] if sha else '',
'branch': branch,
'branch': branch
}
# Add remote URL if available
if remote_url:
result['remote_url'] = remote_url
# Add commit date if available
if commit_date_iso:
result['date_iso'] = commit_date_iso
result['date'] = self._iso_to_date(commit_date_iso)
@@ -2417,7 +2390,7 @@ class PluginStoreManager:
if not plugin_info_remote:
self.logger.warning(f"Plugin {plugin_id} not found in registry and not a git repository; cannot update automatically")
if not repo_url:
self.logger.warning("Plugin may have been installed via ZIP download. Try reinstalling from GitHub URL to enable updates.")
self.logger.warning(f"Plugin may have been installed via ZIP download. Try reinstalling from GitHub URL to enable updates.")
return False
repo_url = plugin_info_remote.get('repo')

View File

@@ -6,6 +6,7 @@ and plugin_manager for use in plugin unit tests.
"""
from typing import Dict, Any, Optional
from unittest.mock import MagicMock
from PIL import Image

View File

@@ -16,7 +16,7 @@ import math
import os
import time
from pathlib import Path
from typing import Any, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from PIL import Image, ImageDraw, ImageFont
@@ -236,6 +236,7 @@ class VisualTestDisplayManager:
Replicated from DisplayManager._draw_bdf_text().
"""
try:
import freetype
if isinstance(color, list):
color = tuple(color)
face = font if font else self.calendar_font

View File

@@ -6,7 +6,8 @@ Fails fast with clear error messages to prevent runtime issues.
"""
import os
from typing import Any, List, Optional, Tuple
import logging
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
from src.exceptions import ConfigError, PluginError, CacheError
from src.logging_config import get_logger

View File

@@ -6,7 +6,7 @@ plugin ordering, exclusions, scroll speed, and display settings.
"""
import logging
from typing import Dict, Any, List, Set
from typing import Dict, Any, List, Set, Optional
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)

View File

@@ -90,11 +90,13 @@ class VegasModeCoordinator:
self._interrupt_check: Optional[Callable[[], bool]] = None
self._interrupt_check_interval: int = 10 # Check every N frames
# Plugin update callback — fired from a background thread inside the loop
# so the main loop's _tick_plugin_updates() finds nothing due when Vegas
# returns, eliminating the inter-iteration frozen-frame gap.
self._update_callback: Optional[Callable[[], None]] = None
self._update_tick_running: bool = False
# Plugin update tick for keeping data fresh during Vegas mode
self._update_tick: Optional[Callable[[], Optional[List[str]]]] = None
self._update_tick_interval: float = 1.0 # Tick every 1 second
self._update_thread: Optional[threading.Thread] = None
self._update_results: Optional[List[str]] = None
self._update_results_lock = threading.Lock()
self._last_update_tick_time: float = 0.0
# Config update tracking
self._config_version = 0
@@ -137,25 +139,6 @@ class VegasModeCoordinator:
"""Check if Vegas mode is currently running."""
return self._is_active
def set_sync_manager(self, sync_manager, follower_position: str = "left") -> None:
"""
Attach a DisplaySyncManager so Vegas mode sends the follower's portion
of the ticker to the second display on every rendered frame.
Args:
sync_manager: DisplaySyncManager instance, or None to disable sync
follower_position: "left" (default) or "right" — physical position of
the follower display relative to the leader
"""
if self.render_pipeline:
# Don't expose a standalone (no-op) manager to the pipeline — treat it as None
if sync_manager is not None and hasattr(sync_manager, 'role'):
from src.common.sync_manager import SyncRole
if sync_manager.role == SyncRole.STANDALONE:
sync_manager = None
self.render_pipeline.sync_manager = sync_manager
self.render_pipeline.sync_follower_left = (follower_position == "left")
def set_live_priority_checker(self, checker: Callable[[], Optional[str]]) -> None:
"""
Set the callback for checking live priority content.
@@ -183,19 +166,24 @@ class VegasModeCoordinator:
self._interrupt_check = checker
self._interrupt_check_interval = max(1, check_interval)
def set_update_callback(self, callback: Callable[[], None]) -> None:
def set_update_tick(
self,
callback: Callable[[], Optional[List[str]]],
interval: float = 1.0
) -> None:
"""
Set a callback for running plugin updates from inside the Vegas loop.
Set the callback for periodic plugin update ticking during Vegas mode.
Fired in a daemon background thread every ~4 s so plugin data stays
fresh without blocking the render loop. The main loop's
_tick_plugin_updates() then finds all intervals already satisfied and
returns immediately, collapsing the inter-iteration gap to <1 ms.
This keeps plugin data fresh while the Vegas render loop is running.
The callback should run scheduled plugin updates and return a list of
plugin IDs that were actually updated, or None/empty if no updates occurred.
Args:
callback: Callable with no arguments (typically _tick_plugin_updates)
callback: Callable that returns list of updated plugin IDs or None
interval: Seconds between update tick calls (default 1.0)
"""
self._update_callback = callback
self._update_tick = callback
self._update_tick_interval = max(0.5, interval)
def start(self) -> bool:
"""
@@ -249,6 +237,9 @@ class VegasModeCoordinator:
self.stats['total_runtime_seconds'] += time.time() - self._start_time
self._start_time = None
# Wait for in-flight background update before tearing down state
self._drain_update_thread()
# Cleanup components
self.render_pipeline.reset()
self.stream_manager.reset()
@@ -344,101 +335,83 @@ class VegasModeCoordinator:
last_fps_log_time = start_time
fps_frame_count = 0
self._last_update_tick_time = start_time
logger.info("Starting Vegas iteration for %.1fs", duration)
while True:
# Check for STATIC mode plugin that should pause scroll
static_plugin = self._check_static_plugin_trigger()
if static_plugin:
if not self._handle_static_pause(static_plugin):
# Static pause was interrupted
return False
# After static pause, skip this segment and continue
self.stream_manager.get_next_segment() # Consume the segment
continue
# Run frame
if not self.run_frame():
# Check why we stopped
with self._state_lock:
if self._should_stop:
return False
if self._is_paused:
# Paused for live priority - let caller handle
try:
while True:
# Check for STATIC mode plugin that should pause scroll
static_plugin = self._check_static_plugin_trigger()
if static_plugin:
if not self._handle_static_pause(static_plugin):
# Static pause was interrupted
return False
# After static pause, skip this segment and continue
self.stream_manager.get_next_segment() # Consume the segment
continue
# Sleep for frame interval
time.sleep(frame_interval)
# Run frame
if not self.run_frame():
# Check why we stopped
with self._state_lock:
if self._should_stop:
return False
if self._is_paused:
# Paused for live priority - let caller handle
return False
# Increment frame count and check for interrupt periodically
frame_count += 1
fps_frame_count += 1
# Sleep for frame interval
time.sleep(frame_interval)
# Periodic FPS logging
current_time = time.time()
if current_time - last_fps_log_time >= fps_log_interval:
fps = fps_frame_count / (current_time - last_fps_log_time)
logger.info(
"Vegas FPS: %.1f (target: %d, frames: %d)",
fps, self.vegas_config.target_fps, fps_frame_count
)
last_fps_log_time = current_time
fps_frame_count = 0
# Increment frame count and check for interrupt periodically
frame_count += 1
fps_frame_count += 1
if (self._interrupt_check and
frame_count % self._interrupt_check_interval == 0):
try:
if self._interrupt_check():
logger.debug(
"Vegas interrupted by callback after %d frames",
frame_count
)
return False
except Exception:
# Log but don't let interrupt check errors stop Vegas
logger.exception("Interrupt check failed")
# Periodic FPS logging
current_time = time.time()
if current_time - last_fps_log_time >= fps_log_interval:
fps = fps_frame_count / (current_time - last_fps_log_time)
logger.info(
"Vegas FPS: %.1f (target: %d, frames: %d)",
fps, self.vegas_config.target_fps, fps_frame_count
)
last_fps_log_time = current_time
fps_frame_count = 0
# Fire plugin update tick in a background thread every ~4 s.
# Running it here (rather than only between iterations) means the
# main loop's _tick_plugin_updates() finds all intervals already
# satisfied on return, so the inter-iteration gap is <1 ms and the
# display never shows a frozen frame between iterations.
_UPDATE_TICK_FRAMES = max(1, int(self.vegas_config.target_fps * 4)) # every 4 s regardless of FPS
if (self._update_callback and
frame_count % _UPDATE_TICK_FRAMES == 0 and
not self._update_tick_running):
self._update_tick_running = True
def _run_tick(cb=self._update_callback):
# Periodic plugin update tick to keep data fresh (non-blocking)
self._drive_background_updates()
if (self._interrupt_check and
frame_count % self._interrupt_check_interval == 0):
try:
cb()
finally:
self._update_tick_running = False
threading.Thread(
target=_run_tick, daemon=True, name="vegas-plugin-tick"
).start()
if self._interrupt_check():
logger.debug(
"Vegas interrupted by callback after %d frames",
frame_count
)
return False
except Exception:
# Log but don't let interrupt check errors stop Vegas
logger.exception("Interrupt check failed")
# Check elapsed time
elapsed = time.time() - start_time
if elapsed >= duration:
break
# Check elapsed time
elapsed = time.time() - start_time
if elapsed >= duration:
break
# NOTE: do NOT break on is_cycle_complete() here.
# When multi-display sync is active, breaking exits run_iteration()
# which causes a 2-3s delay before start_new_cycle() is called on
# the next run_iteration(). During that gap the scroll advances into
# the pre-roll zone, then start_new_cycle() resets it — producing a
# second visible jump on the follower display ~2.5s after the first.
#
# Instead, run_frame() handles cycle completion directly (it calls
# start_new_cycle() in the very next frame, 8ms later), collapsing
# the two events into a single clean transition.
#
# Without sync, the iteration now runs to its full duration and may
# cycle content multiple times within one iteration — acceptable for
# a continuous ticker.
# Check for cycle completion
if self.render_pipeline.is_cycle_complete():
break
logger.info("Vegas iteration completed after %.1fs", time.time() - start_time)
return True
logger.info("Vegas iteration completed after %.1fs", time.time() - start_time)
return True
finally:
# Ensure background update thread finishes before the main loop
# resumes its own _tick_plugin_updates() calls, preventing concurrent
# run_scheduled_updates() execution.
self._drain_update_thread()
def _check_live_priority(self) -> bool:
"""
@@ -527,6 +500,71 @@ class VegasModeCoordinator:
if self._pending_config is None:
self._pending_config_update = False
def _run_update_tick_background(self) -> None:
"""Run the plugin update tick in a background thread.
Stores results for the render loop to pick up on its next iteration,
so the scroll never blocks on API calls.
"""
try:
updated_plugins = self._update_tick()
if updated_plugins:
with self._update_results_lock:
# Accumulate rather than replace to avoid losing notifications
# if a previous result hasn't been picked up yet
if self._update_results is None:
self._update_results = updated_plugins
else:
self._update_results.extend(updated_plugins)
except Exception:
logger.exception("Background plugin update tick failed")
def _drain_update_thread(self, timeout: float = 2.0) -> None:
"""Wait for any in-flight background update thread to finish.
Called when transitioning out of Vegas mode so the main-loop
``_tick_plugin_updates`` call doesn't race with a still-running
background thread.
"""
if self._update_thread is not None and self._update_thread.is_alive():
self._update_thread.join(timeout=timeout)
if self._update_thread.is_alive():
logger.warning(
"Background update thread did not finish within %.1fs", timeout
)
def _drive_background_updates(self) -> None:
"""Collect finished background update results and launch new ticks.
Safe to call from both the main render loop and the static-pause
wait loop so that plugin data stays fresh regardless of which
code path is active.
"""
# 1. Collect results from a previously completed background update
with self._update_results_lock:
ready_results = self._update_results
self._update_results = None
if ready_results:
for pid in ready_results:
self.mark_plugin_updated(pid)
# 2. Kick off a new background update if interval elapsed and none running
current_time = time.time()
if (self._update_tick and
current_time - self._last_update_tick_time >= self._update_tick_interval):
thread_alive = (
self._update_thread is not None
and self._update_thread.is_alive()
)
if not thread_alive:
self._last_update_tick_time = current_time
self._update_thread = threading.Thread(
target=self._run_update_tick_background,
daemon=True,
name="vegas-update-tick",
)
self._update_thread.start()
def mark_plugin_updated(self, plugin_id: str) -> None:
"""
Notify that a plugin's data has been updated.
@@ -645,10 +683,8 @@ class VegasModeCoordinator:
logger.info("Static pause interrupted by live priority")
return False
# Yield immediately if multi-display follower mode becomes active
if self._interrupt_check and self._interrupt_check():
logger.info("Static pause interrupted by sync follower mode")
return False
# Keep plugin data fresh during static pause
self._drive_background_updates()
# Sleep in small increments to remain responsive
time.sleep(0.1)

View File

@@ -329,51 +329,50 @@ class PluginAdapter:
# Save display state to restore after
original_image = self.display_manager.image.copy()
with self.display_manager.capture_mode():
# Method 1: Try _create_scrolling_display (stocks pattern)
if hasattr(plugin, '_create_scrolling_display'):
logger.info(
"[%s] Triggering via _create_scrolling_display()",
plugin_id
)
try:
plugin._create_scrolling_display()
cached_image = getattr(scroll_helper, 'cached_image', None)
if cached_image is not None and isinstance(cached_image, Image.Image):
logger.info(
"[%s] _create_scrolling_display() SUCCESS: %dx%d",
plugin_id, cached_image.width, cached_image.height
)
return cached_image
except (AttributeError, TypeError, ValueError, OSError):
logger.exception(
"[%s] _create_scrolling_display() failed", plugin_id
)
# Method 2: Try display(force_clear=True) which typically builds scroll content
if hasattr(plugin, 'display'):
logger.info(
"[%s] Triggering via display(force_clear=True)",
plugin_id
)
try:
self.display_manager.clear()
plugin.display(force_clear=True)
cached_image = getattr(scroll_helper, 'cached_image', None)
if cached_image is not None and isinstance(cached_image, Image.Image):
logger.info(
"[%s] display(force_clear=True) SUCCESS: %dx%d",
plugin_id, cached_image.width, cached_image.height
)
return cached_image
# Method 1: Try _create_scrolling_display (stocks pattern)
if hasattr(plugin, '_create_scrolling_display'):
logger.info(
"[%s] Triggering via _create_scrolling_display()",
plugin_id
)
try:
plugin._create_scrolling_display()
cached_image = getattr(scroll_helper, 'cached_image', None)
if cached_image is not None and isinstance(cached_image, Image.Image):
logger.info(
"[%s] display(force_clear=True) did not populate cached_image",
plugin_id
"[%s] _create_scrolling_display() SUCCESS: %dx%d",
plugin_id, cached_image.width, cached_image.height
)
except (AttributeError, TypeError, ValueError, OSError):
logger.exception(
"[%s] display(force_clear=True) failed", plugin_id
return cached_image
except (AttributeError, TypeError, ValueError, OSError):
logger.exception(
"[%s] _create_scrolling_display() failed", plugin_id
)
# Method 2: Try display(force_clear=True) which typically builds scroll content
if hasattr(plugin, 'display'):
logger.info(
"[%s] Triggering via display(force_clear=True)",
plugin_id
)
try:
self.display_manager.clear()
plugin.display(force_clear=True)
cached_image = getattr(scroll_helper, 'cached_image', None)
if cached_image is not None and isinstance(cached_image, Image.Image):
logger.info(
"[%s] display(force_clear=True) SUCCESS: %dx%d",
plugin_id, cached_image.width, cached_image.height
)
return cached_image
logger.info(
"[%s] display(force_clear=True) did not populate cached_image",
plugin_id
)
except (AttributeError, TypeError, ValueError, OSError):
logger.exception(
"[%s] display(force_clear=True) failed", plugin_id
)
logger.info(
"[%s] Could not trigger scroll content generation",
@@ -409,7 +408,10 @@ class PluginAdapter:
original_image = self.display_manager.image.copy()
logger.info("[%s] Fallback: saved original display state", plugin_id)
# Ensure plugin has fresh data before capturing
# Lightweight in-memory data refresh before capturing.
# Full update() is intentionally skipped here — the background
# update tick in the Vegas coordinator handles periodic API
# refreshes so we don't block the content-fetch thread.
has_update_data = hasattr(plugin, 'update_data')
logger.info("[%s] Fallback: has update_data=%s", plugin_id, has_update_data)
if has_update_data:
@@ -419,24 +421,21 @@ class PluginAdapter:
except (AttributeError, RuntimeError, OSError):
logger.exception("[%s] Fallback: update_data() failed", plugin_id)
# Clear and call plugin display — use capture_mode to suppress hardware writes
# that plugins may trigger internally via update_display().
with self.display_manager.capture_mode():
self.display_manager.clear()
logger.info("[%s] Fallback: display cleared, calling display()", plugin_id)
# Clear and call plugin display
self.display_manager.clear()
logger.info("[%s] Fallback: display cleared, calling display()", plugin_id)
# First try without force_clear (some plugins behave better this way)
try:
plugin.display()
logger.info("[%s] Fallback: display() called successfully", plugin_id)
except TypeError:
# Plugin may require force_clear argument
logger.info("[%s] Fallback: display() failed, trying with force_clear=True", plugin_id)
plugin.display(force_clear=True)
# Capture the result
captured = self.display_manager.image.copy()
# First try without force_clear (some plugins behave better this way)
try:
plugin.display()
logger.info("[%s] Fallback: display() called successfully", plugin_id)
except TypeError:
# Plugin may require force_clear argument
logger.info("[%s] Fallback: display() failed, trying with force_clear=True", plugin_id)
plugin.display(force_clear=True)
# Capture the result
captured = self.display_manager.image.copy()
logger.info(
"[%s] Fallback: captured frame %dx%d, mode=%s",
plugin_id, captured.width, captured.height, captured.mode
@@ -455,10 +454,9 @@ class PluginAdapter:
plugin_id
)
# Try once more with force_clear=True
with self.display_manager.capture_mode():
self.display_manager.clear()
plugin.display(force_clear=True)
captured = self.display_manager.image.copy()
self.display_manager.clear()
plugin.display(force_clear=True)
captured = self.display_manager.image.copy()
is_blank, bright_ratio = self._is_blank_image(captured, return_ratio=True)
logger.info(
@@ -587,6 +585,28 @@ class PluginAdapter:
else:
self._content_cache.clear()
def invalidate_plugin_scroll_cache(self, plugin: 'BasePlugin', plugin_id: str) -> None:
"""
Clear a plugin's scroll_helper cache so Vegas re-fetches fresh visuals.
Uses scroll_helper.clear_cache() to reset all cached state (cached_image,
cached_array, total_scroll_width, scroll_position, etc.) — not just the
image. Without this, plugins that use scroll_helper (stocks, news,
odds-ticker, etc.) would keep serving stale scroll images even after
their data refreshes.
Args:
plugin: Plugin instance
plugin_id: Plugin identifier
"""
scroll_helper = getattr(plugin, 'scroll_helper', None)
if scroll_helper is None:
return
if getattr(scroll_helper, 'cached_image', None) is not None:
scroll_helper.clear_cache()
logger.debug("[%s] Cleared scroll_helper cache", plugin_id)
def get_content_type(self, plugin: 'BasePlugin', plugin_id: str) -> str:
"""
Get the type of content a plugin provides.

View File

@@ -11,10 +11,11 @@ import threading
from collections import deque
from typing import Optional, List, Any, Dict, Deque, TYPE_CHECKING
from PIL import Image
import numpy as np
from src.common.scroll_helper import ScrollHelper
from src.vegas_mode.config import VegasModeConfig
from src.vegas_mode.stream_manager import StreamManager
from src.vegas_mode.stream_manager import StreamManager, ContentSegment
if TYPE_CHECKING:
pass
@@ -51,10 +52,6 @@ class RenderPipeline:
self.config = config
self.display_manager = display_manager
self.stream_manager = stream_manager
self.sync_manager = None # Optional DisplaySyncManager — set by coordinator
self.sync_follower_left = True # True = follower is LEFT of leader (default)
self._sync_send_interval = 1.0 / 90 # raw bytes are cheap; 90fps > follower render rate
self._last_sync_send = 0.0
# Display dimensions (handle both property and method access patterns)
self.display_width = (
@@ -205,26 +202,8 @@ class RenderPipeline:
# Update scroll position
self.scroll_helper.update_scroll_position()
# Determine if the cycle is done.
#
# scroll_helper considers a cycle complete only after
# total_distance_scrolled >= total_scroll_width + display_width.
# That extra display_width of travel causes a "wrap-around" phase
# where scroll_position resets to ~0 and the first plugin's content
# re-enters from the right — the user sees this 2-3 s of re-entry
# as "a plugin partially displaying before the next one starts."
#
# We end the cycle as soon as total_distance_scrolled reaches
# total_scroll_width (the wrap-around point), before any second-pass
# content becomes visible. The scroll_helper's own is_scroll_complete()
# check is kept as a fallback for any edge-cases where that threshold
# is never hit.
at_wrap_point = (
not self._cycle_complete and
self.scroll_helper.total_distance_scrolled >= self.scroll_helper.total_scroll_width
)
if at_wrap_point or self.scroll_helper.is_scroll_complete():
# Check if cycle is complete
if self.scroll_helper.is_scroll_complete():
if not self._cycle_complete:
self._cycle_complete = True
self.stats['scroll_cycles'] += 1
@@ -232,17 +211,6 @@ class RenderPipeline:
"Scroll cycle complete after %.1fs",
time.time() - self._cycle_start_time
)
# Push blank immediately so the hardware never shows any
# post-wrap content while the coordinator recomposes the
# next cycle (~100 ms).
try:
from PIL import Image as _Image
blank = _Image.new('RGB', (self.display_width, self.display_height))
self.display_manager.image = blank
self.display_manager.update_display()
except Exception:
logger.exception("Failed to write blank frame to display at cycle end")
return True # Cycle done; coordinator starts new cycle next frame
# Get visible portion
visible_frame = self.scroll_helper.get_visible_portion()
@@ -253,15 +221,6 @@ class RenderPipeline:
self.display_manager.image = visible_frame
self.display_manager.update_display()
# Multi-display sync: send scroll position to follower.
# The follower renders from its own cached_array (kept identical to the
# leader's via TCP image transfer at each new_cycle) at scroll_x ± display_width.
if self.sync_manager:
now = time.time()
if now - self._last_sync_send >= self._sync_send_interval:
self._last_sync_send = now
self.sync_manager.send_scroll_x(self.scroll_helper.scroll_position)
# Update scrolling state
self.display_manager.set_scrolling_state(True)
@@ -301,38 +260,33 @@ class RenderPipeline:
if self._cycle_complete:
return True
# When multi-display sync is active, defer mid-cycle hot swaps until the
# cycle ends naturally. Hot swaps block the render loop for 15-30ms while
# the image is rebuilt, causing a freeze+jump that the follower perceives
# as a speed-up. Deferring to cycle boundaries keeps transitions clean.
# Staging buffer content is still pre-loaded; it just applies at cycle end.
if self.sync_manager is not None:
return False
# Check if we need more content in the buffer
buffer_status = self.stream_manager.get_buffer_status()
if buffer_status['staging_count'] > 0:
return True
# Trigger recompose when pending updates affect visible segments
if self.stream_manager.has_pending_updates_for_visible_segments():
return True
return False
def hot_swap_content(self) -> bool:
"""
Hot-swap to new composed content.
Called when staging buffer has updated content.
Swaps atomically to prevent visual glitches.
Called when staging buffer has updated content or pending updates exist.
Preserves scroll position for mid-cycle updates to prevent visual jumps.
Returns:
True if swap occurred
"""
try:
# Snapshot position before swap so we can reposition after.
# The new image has completely different content — if scroll_position
# is left unchanged it lands at an arbitrary mid-content point in the
# new image, causing a visible jump on both displays.
old_width = self.scroll_helper.total_scroll_width
old_pos = self.scroll_helper.scroll_position
# Save scroll position for mid-cycle updates
saved_position = self.scroll_helper.scroll_position
saved_total_distance = self.scroll_helper.total_distance_scrolled
saved_total_width = max(1, self.scroll_helper.total_scroll_width)
was_mid_cycle = not self._cycle_complete
# Process any pending updates
self.stream_manager.process_updates()
@@ -340,24 +294,20 @@ class RenderPipeline:
# Recompose with updated content
if self.compose_scroll_content():
# Map scroll position proportionally into the new image width so
# we resume at the same relative progress through the content.
# This keeps the visual tempo consistent and avoids the jump that
# occurred when old scroll_position landed arbitrarily in new image.
new_width = self.scroll_helper.total_scroll_width
if old_width > 0 and new_width > 0:
ratio = (old_pos % old_width) / old_width
self.scroll_helper.scroll_position = ratio * new_width
else:
self.scroll_helper.scroll_position = 0.0
self.stats['hot_swaps'] += 1
logger.debug(
"Hot-swap completed: scroll repositioned %.0f%.0f (%.1f%% of new %dpx image)",
old_pos, self.scroll_helper.scroll_position,
(self.scroll_helper.scroll_position / new_width * 100) if new_width else 0,
new_width,
)
# Restore scroll position for mid-cycle updates so the
# scroll continues from where it was instead of jumping to 0
if was_mid_cycle:
new_total_width = max(1, self.scroll_helper.total_scroll_width)
progress_ratio = min(saved_total_distance / saved_total_width, 0.999)
self.scroll_helper.total_distance_scrolled = progress_ratio * new_total_width
self.scroll_helper.scroll_position = min(
saved_position,
float(new_total_width - 1)
)
self.scroll_helper.scroll_complete = False
self._cycle_complete = False
logger.debug("Hot-swap completed (mid_cycle_restore=%s)", was_mid_cycle)
return True
return False
@@ -392,29 +342,7 @@ class RenderPipeline:
return False
# Compose new scroll content
result = self.compose_scroll_content()
if result and self.sync_manager:
# When sync is active, start the leader at display_width instead of 0.
# This skips the initial black gap so the leader immediately shows content.
# The follower starts at position 0 (the gap) which looks like a clean
# blank transition rather than near-end content wrapping around.
self.scroll_helper.scroll_position = float(self.display_width)
if result and self.sync_manager:
# Signal follower that a new cycle started (triggers its own rebuild)
self.sync_manager.send_new_cycle()
# Push the actual scroll image over TCP so follower has identical pixels.
# Done in a background thread to not block the render loop (~15ms transfer).
if self.scroll_helper.cached_image is not None:
import threading as _t
_t.Thread(
target=self.sync_manager.send_scroll_image,
args=(self.scroll_helper.cached_image,),
daemon=True, name="sync-image-push"
).start()
return result
return self.compose_scroll_content()
def get_current_scroll_info(self) -> Dict[str, Any]:
"""Get current scroll state information."""

View File

@@ -14,7 +14,7 @@ Supports three display modes:
import logging
import threading
import time
from typing import Optional, List, Dict, Any, Deque, TYPE_CHECKING
from typing import Optional, List, Dict, Any, Deque, Tuple, TYPE_CHECKING
from collections import deque
from dataclasses import dataclass, field
from PIL import Image
@@ -24,6 +24,7 @@ from src.vegas_mode.plugin_adapter import PluginAdapter
from src.plugin_system.base_plugin import VegasDisplayMode
if TYPE_CHECKING:
from src.plugin_system.base_plugin import BasePlugin
from src.plugin_system.plugin_manager import PluginManager
logger = logging.getLogger(__name__)

View File

@@ -5,11 +5,11 @@ Provides consistent API response formatting across all endpoints.
"""
import time
from typing import Any, Optional, Dict, Tuple
from typing import Any, Optional, Dict, Tuple, Union
from flask import jsonify, request
from src.web_interface.error_handler import create_error_response, create_success_response
from src.web_interface.errors import ErrorCode
from src.web_interface.errors import ErrorCode, ErrorCategory
def success_response(

View File

@@ -5,6 +5,7 @@ Provides decorators and helpers for consistent error handling across API endpoin
"""
import functools
import traceback
from typing import Callable, Any, Optional
from flask import jsonify

View File

@@ -3,6 +3,7 @@ Input validation utilities for the web interface.
Provides validation functions for user inputs to prevent XSS, invalid data, and security issues.
"""
import re
import os
from typing import Optional, Tuple, List
from urllib.parse import urlparse
from pathlib import Path
@@ -55,7 +56,7 @@ def validate_image_url(url: str) -> Tuple[bool, Optional[str]]:
parsed = urlparse(url)
allowed_protocols = ['http', 'https']
if parsed.scheme not in allowed_protocols:
return False, "Only http:// and https:// protocols are allowed"
return False, f"Only http:// and https:// protocols are allowed"
return True, None
except Exception as e:
return False, f"Invalid URL format: {str(e)}"

View File

@@ -37,7 +37,7 @@ import time
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@@ -689,7 +689,7 @@ class WiFiManager:
# Helpers
# ---------------------------------------------------------------------------
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved") # nosec B108 - process-specific named file; device is single-user RPi
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved")
def _validate_ap_config(self) -> Tuple[str, int]:
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
@@ -890,14 +890,14 @@ class WiFiManager:
"""
try:
content = f"# LEDMatrix captive portal: resolve all hostnames to AP\naddress=/#/{ap_ip}\n"
with open("/tmp/ledmatrix-nm-dnsmasq.conf", "w") as f: # nosec B108 - named file matches sudoers allowlist; single-user device
with open("/tmp/ledmatrix-nm-dnsmasq.conf", "w") as f:
f.write(content)
subprocess.run(
["sudo", "mkdir", "-p", str(NM_DNSMASQ_SHARED_DIR)],
capture_output=True, timeout=5
)
subprocess.run(
["sudo", "cp", "/tmp/ledmatrix-nm-dnsmasq.conf", str(NM_DNSMASQ_SHARED_CONF)], # nosec B108
["sudo", "cp", "/tmp/ledmatrix-nm-dnsmasq.conf", str(NM_DNSMASQ_SHARED_CONF)],
capture_output=True, timeout=5
)
logger.info(f"Wrote NM dnsmasq captive-portal config: {NM_DNSMASQ_SHARED_CONF}")
@@ -937,7 +937,7 @@ class WiFiManager:
pass
try:
import urllib.request as _ureq
_ureq.urlopen("http://connectivity-check.ubuntu.com/", timeout=timeout) # nosec B310 - hardcoded URL, no user input
_ureq.urlopen("http://connectivity-check.ubuntu.com/", timeout=timeout)
logger.debug("Internet connectivity confirmed via HTTP check")
return True
except OSError:
@@ -1314,7 +1314,7 @@ class WiFiManager:
# This ensures a clean switch between networks
if original_ssid and original_ssid != ssid:
logger.info(f"Switching networks: disconnecting from {original_ssid} before connecting to {ssid}")
self._show_led_message("Switching networks...", duration=3)
self._show_led_message(f"Switching networks...", duration=3)
# Skip AP mode check since we're about to connect to a new network
disconnect_success, disconnect_msg = self.disconnect_from_network(skip_ap_check=True)
if disconnect_success:
@@ -1370,7 +1370,7 @@ class WiFiManager:
ap_success, ap_msg = self.enable_ap_mode()
if ap_success:
logger.info("AP mode enabled as failsafe")
return False, "Connection failed and restoration failed. AP mode enabled."
return False, f"Connection failed and restoration failed. AP mode enabled."
else:
logger.error(f"Failed to enable AP mode: {ap_msg}")
return False, f"Connection failed, restoration failed, and AP mode failed: {ap_msg}"
@@ -1382,7 +1382,7 @@ class WiFiManager:
ap_success, ap_msg = self.enable_ap_mode()
if ap_success:
logger.info("AP mode enabled as failsafe")
return False, "Connection failed. AP mode enabled."
return False, f"Connection failed. AP mode enabled."
else:
return False, f"Connection failed and AP mode failed: {ap_msg}"
@@ -1401,8 +1401,8 @@ class WiFiManager:
# Last resort: enable AP mode
try:
self.enable_ap_mode()
except Exception as ap_error: # nosec B110 - last-resort; do not re-raise, but log for debugging
logger.error("Last-resort AP mode enable failed in recovery path: %s", ap_error, exc_info=True)
except Exception:
pass
return False, str(e)
def _restore_original_connection(self, connection_name: str, ssid: str) -> bool:
@@ -1797,7 +1797,7 @@ class WiFiManager:
logger.info("WiFi radio enabled and verified successfully")
return True
elif attempt < max_retries - 1:
logger.warning("WiFi radio enable command succeeded but not verified, will retry...")
logger.warning(f"WiFi radio enable command succeeded but not verified, will retry...")
time.sleep(1)
continue
else:
@@ -2324,12 +2324,12 @@ ignore_broadcast_ssid=0
"""
# Write config (requires sudo)
with open("/tmp/hostapd.conf", 'w') as f: # nosec B108 - named file matches sudoers allowlist; single-user device
with open("/tmp/hostapd.conf", 'w') as f:
f.write(config_content)
# Copy to final location with sudo
subprocess.run(
["sudo", "cp", "/tmp/hostapd.conf", str(HOSTAPD_CONFIG_PATH)], # nosec B108
["sudo", "cp", "/tmp/hostapd.conf", str(HOSTAPD_CONFIG_PATH)],
timeout=10
)
@@ -2394,12 +2394,12 @@ address=/detectportal.firefox.com/192.168.4.1
"""
# Write config (requires sudo)
with open("/tmp/dnsmasq.conf", 'w') as f: # nosec B108 - named file matches sudoers allowlist; single-user device
with open("/tmp/dnsmasq.conf", 'w') as f:
f.write(config_content)
# Copy to final location with sudo
subprocess.run(
["sudo", "cp", "/tmp/dnsmasq.conf", str(DNSMASQ_CONFIG_PATH)], # nosec B108
["sudo", "cp", "/tmp/dnsmasq.conf", str(DNSMASQ_CONFIG_PATH)],
timeout=10
)

View File

@@ -7,8 +7,7 @@
[Unit]
Description=LED Matrix Web Interface Service
After=network-online.target
Wants=network-online.target
After=network.target
[Service]
Type=simple

View File

@@ -1,7 +1,6 @@
[Unit]
Description=LED Matrix Display Service
After=network-online.target
Wants=network-online.target
After=network.target
[Service]
Type=simple

View File

@@ -5,6 +5,7 @@ Provides common fixtures for mocking core components and test setup.
"""
import pytest
import os
import sys
from pathlib import Path
from unittest.mock import Mock, MagicMock

View File

@@ -3,6 +3,7 @@
Diagnostic script to examine NBA API data structure and identify the missing 'id' field issue.
"""
import requests
import json
import logging
from typing import Dict, Any

View File

@@ -8,7 +8,7 @@ import sys
import json
from pathlib import Path
from unittest.mock import MagicMock, Mock
from typing import Any, Dict
from typing import Any, Dict, Generator, Optional
# Add project root to path
project_root = Path(__file__).parent.parent.parent

View File

@@ -6,7 +6,9 @@ Provides common test functionality for all plugins.
import pytest
import json
from pathlib import Path
from typing import Dict, Any
from unittest.mock import MagicMock
from src.plugin_system.plugin_loader import PluginLoader
from src.plugin_system.base_plugin import BasePlugin

View File

@@ -5,6 +5,7 @@ Verifies that the visual display manager actually renders pixels,
loads fonts, and can save snapshots.
"""
import pytest
from PIL import Image
from src.plugin_system.testing import VisualTestDisplayManager

View File

@@ -1,342 +0,0 @@
"""
Tests for src/base_classes/api_extractors.py
Covers ESPNFootballExtractor, ESPNBaseballExtractor, ESPNHockeyExtractor,
SoccerAPIExtractor, and the shared _extract_common_details logic.
"""
import logging
import pytest
from src.base_classes.api_extractors import (
ESPNFootballExtractor,
ESPNBaseballExtractor,
ESPNHockeyExtractor,
SoccerAPIExtractor,
)
# ---------------------------------------------------------------------------
# Shared test data factories
# ---------------------------------------------------------------------------
def _make_espn_event(state: str = "in", home_abbr: str = "KC", away_abbr: str = "BUF",
home_score: str = "14", away_score: str = "7",
date_str: str = "2024-01-15T20:00:00Z",
include_situation: bool = False,
situation: dict | None = None,
status_detail: str = "2nd Qtr 8:42",
period: int = 2) -> dict:
"""Build a minimal ESPN-style game event dict."""
comp_status = {
"type": {
"state": state,
"shortDetail": status_detail,
"detail": status_detail,
"name": "STATUS_IN_PROGRESS",
},
"period": period,
"displayClock": "8:42",
}
comp = {
"status": comp_status,
"competitors": [
{
"homeAway": "home",
"team": {"abbreviation": home_abbr, "displayName": f"{home_abbr} Team"},
"score": home_score,
},
{
"homeAway": "away",
"team": {"abbreviation": away_abbr, "displayName": f"{away_abbr} Team"},
"score": away_score,
},
],
}
if include_situation:
comp["situation"] = situation or {}
return {
"id": "test-game-1",
"date": date_str,
"competitions": [comp],
}
def _make_logger() -> logging.Logger:
return logging.getLogger("test_extractor")
# ---------------------------------------------------------------------------
# ESPNFootballExtractor
# ---------------------------------------------------------------------------
class TestESPNFootballExtractor:
def setup_method(self):
self.extractor = ESPNFootballExtractor(_make_logger())
def test_extract_live_game_basic_fields(self):
event = _make_espn_event(state="in", home_score="14", away_score="7")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "KC"
assert result["away_abbr"] == "BUF"
assert result["home_score"] == "14"
assert result["away_score"] == "7"
assert result["is_live"] is True
assert result["is_final"] is False
assert result["is_upcoming"] is False
def test_extract_final_game(self):
event = _make_espn_event(state="post")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_final"] is True
assert result["is_live"] is False
def test_extract_upcoming_game(self):
event = _make_espn_event(state="pre")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_upcoming"] is True
def test_sport_specific_fields_default_when_pregame(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "down" in fields
assert "distance" in fields
assert "possession" in fields
assert "is_redzone" in fields
assert fields["is_redzone"] is False
def test_sport_specific_fields_live_with_situation(self):
situation = {
"down": 3,
"distance": 7,
"possession": "KC",
"isRedZone": True,
"homeTimeouts": 2,
"awayTimeouts": 1,
}
event = _make_espn_event(state="in", include_situation=True, situation=situation)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["down"] == 3
assert fields["distance"] == 7
assert fields["is_redzone"] is True
assert fields["home_timeouts"] == 2
assert fields["away_timeouts"] == 1
def test_scoring_event_detected(self):
# situation must be non-empty (truthy) for the live block to execute
situation = {"down": 1, "distance": 10}
event = _make_espn_event(
state="in",
include_situation=True,
situation=situation,
status_detail="touchdown scored",
)
fields = self.extractor.get_sport_specific_fields(event)
assert "touchdown" in fields.get("scoring_event", "").lower()
def test_returns_none_on_empty_event(self):
assert self.extractor.extract_game_details({}) is None
def test_returns_none_when_teams_missing(self):
event = {
"id": "x",
"date": "2024-01-15T20:00:00Z",
"competitions": [
{
"status": {"type": {"state": "in", "shortDetail": "", "detail": "", "name": ""}},
"competitors": [], # no competitors
}
],
}
assert self.extractor.extract_game_details(event) is None
def test_date_z_suffix_parsed(self):
event = _make_espn_event(date_str="2024-01-15T20:00:00Z")
result = self.extractor.extract_game_details(event)
# Should not raise and should return a result
assert result is not None
def test_id_propagated(self):
event = _make_espn_event()
result = self.extractor.extract_game_details(event)
assert result["id"] == "test-game-1"
# ---------------------------------------------------------------------------
# ESPNBaseballExtractor
# ---------------------------------------------------------------------------
class TestESPNBaseballExtractor:
def setup_method(self):
self.extractor = ESPNBaseballExtractor(_make_logger())
def test_extract_live_game(self):
event = _make_espn_event(
state="in", home_abbr="NYY", away_abbr="BOS",
home_score="3", away_score="2"
)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "NYY"
assert result["is_live"] is True
def test_baseball_sport_fields_defaults(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "inning" in fields
assert "outs" in fields
assert "bases" in fields
assert "strikes" in fields
assert "balls" in fields
def test_baseball_sport_fields_live(self):
situation = {
"inning": 7,
"outs": 2,
"bases": "110",
"strikes": 2,
"balls": 3,
"pitcher": "Smith",
"batter": "Jones",
}
event = _make_espn_event(state="in", include_situation=True, situation=situation)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["inning"] == 7
assert fields["outs"] == 2
assert fields["strikes"] == 2
assert fields["pitcher"] == "Smith"
def test_returns_none_on_empty(self):
assert self.extractor.extract_game_details({}) is None
# ---------------------------------------------------------------------------
# ESPNHockeyExtractor
# ---------------------------------------------------------------------------
class TestESPNHockeyExtractor:
def setup_method(self):
self.extractor = ESPNHockeyExtractor(_make_logger())
def test_extract_live_game(self):
event = _make_espn_event(
state="in", home_abbr="BOS", away_abbr="TOR",
home_score="2", away_score="1"
)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_live"] is True
def test_hockey_period_text_p1(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=1
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P1"
def test_hockey_period_text_p2(self):
situation = {"isPowerPlay": False} # non-empty so the live block executes
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=2
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P2"
def test_hockey_period_text_p3(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=3
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P3"
def test_hockey_period_text_ot(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=4
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "OT1"
def test_hockey_power_play(self):
situation = {"isPowerPlay": True, "homeShots": 12, "awayShots": 8}
event = _make_espn_event(state="in", include_situation=True, situation=situation, period=2)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["power_play"] is True
assert fields["shots_on_goal"]["home"] == 12
assert fields["shots_on_goal"]["away"] == 8
def test_hockey_fields_defaults_pregame(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "period" in fields
assert "power_play" in fields
assert fields["power_play"] is False
def test_returns_none_on_empty(self):
assert self.extractor.extract_game_details({}) is None
# ---------------------------------------------------------------------------
# SoccerAPIExtractor
# ---------------------------------------------------------------------------
class TestSoccerAPIExtractor:
def setup_method(self):
self.extractor = SoccerAPIExtractor(_make_logger())
def _make_soccer_event(self, is_live: bool = True) -> dict:
return {
"id": "soccer-1",
"home_team": {"abbreviation": "ARS", "name": "Arsenal"},
"away_team": {"abbreviation": "CHE", "name": "Chelsea"},
"home_score": "2",
"away_score": "1",
"status": "LIVE",
"is_live": is_live,
"is_final": not is_live,
"is_upcoming": False,
"half": "1",
"stoppage_time": "2",
"home_yellow_cards": 1,
"away_yellow_cards": 2,
"home_red_cards": 0,
"away_red_cards": 0,
"home_possession": 55,
"away_possession": 45,
}
def test_extract_live_game(self):
event = self._make_soccer_event(is_live=True)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "ARS"
assert result["away_abbr"] == "CHE"
assert result["is_live"] is True
def test_sport_specific_cards(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["cards"]["home_yellow"] == 1
assert fields["cards"]["away_yellow"] == 2
assert fields["cards"]["home_red"] == 0
def test_sport_specific_possession(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["possession"]["home"] == 55
assert fields["possession"]["away"] == 45
def test_sport_specific_half(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["half"] == "1"
def test_scores_as_strings(self):
event = self._make_soccer_event()
result = self.extractor.extract_game_details(event)
assert result["home_score"] == "2"
assert result["away_score"] == "1"

View File

@@ -1,299 +0,0 @@
"""
Tests for src/background_data_service.py
Covers BackgroundDataService: submit_fetch_request, get_result,
is_request_complete, get_request_status, cancel_request, get_statistics,
_cleanup_completed_requests, shutdown, and get_background_service singleton.
"""
import time
import pytest
from unittest.mock import MagicMock, patch, Mock
from concurrent.futures import Future
from src.background_data_service import (
BackgroundDataService,
FetchStatus,
FetchResult,
FetchRequest,
get_background_service,
shutdown_background_service,
)
import src.background_data_service as bds_module
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_global_service():
"""Ensure each test starts with no global singleton."""
shutdown_background_service()
yield
shutdown_background_service()
@pytest.fixture
def mock_cache_manager():
m = MagicMock()
m.get.return_value = None
m.set.return_value = None
m.generate_sport_cache_key.return_value = "test_key"
return m
@pytest.fixture
def service(mock_cache_manager):
svc = BackgroundDataService(mock_cache_manager, max_workers=2, request_timeout=5)
yield svc
svc.shutdown(wait=False)
# ---------------------------------------------------------------------------
# Initialisation
# ---------------------------------------------------------------------------
class TestInitialisation:
def test_stats_zeroed(self, service):
stats = service.get_statistics()
assert stats["total_requests"] == 0
assert stats["completed_requests"] == 0
assert stats["failed_requests"] == 0
def test_no_active_requests(self, service):
assert len(service.active_requests) == 0
def test_not_shutdown(self, service):
assert service._shutdown is False
# ---------------------------------------------------------------------------
# Cache hit path
# ---------------------------------------------------------------------------
class TestCacheHit:
def test_cache_hit_returns_request_id(self, service, mock_cache_manager):
mock_cache_manager.get.return_value = {"events": [{"id": "1"}]}
req_id = service.submit_fetch_request(
sport="nfl", year=2024,
url="https://example.com/nfl",
cache_key="nfl_key",
)
assert req_id is not None
# Request should be immediately complete due to cache hit
result = service.get_result(req_id)
assert result is not None
assert result.success is True
assert result.cached is True
def test_cache_hit_increments_stat(self, service, mock_cache_manager):
mock_cache_manager.get.return_value = {"events": []}
service.submit_fetch_request(sport="nba", year=2024, url="https://x.com", cache_key="k")
stats = service.get_statistics()
assert stats["cached_hits"] == 1
# ---------------------------------------------------------------------------
# Actual fetch path (mocked HTTP)
# ---------------------------------------------------------------------------
class TestFetchPath:
def _valid_payload(self) -> dict:
return {"events": [{"id": "g1"}, {"id": "g2"}]}
def test_successful_fetch_completes(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024,
url="https://example.com/nfl",
cache_key="nfl_test",
)
# Wait for the background thread
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
result = service.get_result(req_id)
assert result is not None
assert result.success is True
assert result.data == self._valid_payload()
def test_failed_fetch_records_error(self, service, mock_cache_manager):
with patch.object(service.session, "get", side_effect=Exception("network error")):
req_id = service.submit_fetch_request(
sport="nba", year=2024,
url="https://example.com/nba",
cache_key="nba_test",
max_retries=0,
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
result = service.get_result(req_id)
assert result is not None
assert result.success is False
assert result.error is not None
def test_cache_miss_increments_stat(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="new_key",
)
stats = service.get_statistics()
assert stats["cache_misses"] == 1
def test_callback_called_on_success(self, service, mock_cache_manager):
callback = Mock()
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com",
cache_key="cb_key", callback=callback, max_retries=0,
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
callback.assert_called_once()
call_arg = callback.call_args[0][0]
assert isinstance(call_arg, FetchResult)
def test_data_cached_after_successful_fetch(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="cache_after_key",
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
mock_cache_manager.set.assert_called()
# ---------------------------------------------------------------------------
# Request status / cancel
# ---------------------------------------------------------------------------
class TestRequestStatusAndCancel:
def test_unknown_request_status_is_none(self, service):
assert service.get_request_status("nonexistent") is None
def test_cancel_active_request(self, service, mock_cache_manager):
# Manually insert an active request
req = FetchRequest(
id="r1", sport="nfl", year=2024,
cache_key="k", url="https://x.com",
)
req.status = FetchStatus.PENDING
service.active_requests["r1"] = req
result = service.cancel_request("r1")
assert result is True
assert "r1" not in service.active_requests
def test_cancel_nonexistent_request(self, service):
assert service.cancel_request("does-not-exist") is False
def test_is_request_complete_false_for_active(self, service, mock_cache_manager):
req = FetchRequest(
id="r2", sport="mlb", year=2024,
cache_key="k2", url="https://x.com",
)
service.active_requests["r2"] = req
assert service.is_request_complete("r2") is False
def test_is_request_complete_true_for_done(self, service):
result = FetchResult(request_id="r3", success=True)
service.completed_requests["r3"] = result
assert service.is_request_complete("r3") is True
def test_get_result_returns_none_for_unknown(self, service):
assert service.get_result("unknown") is None
# ---------------------------------------------------------------------------
# Shutdown
# ---------------------------------------------------------------------------
class TestShutdown:
def test_shutdown_sets_flag(self, service):
service.shutdown(wait=False)
assert service._shutdown is True
def test_submit_after_shutdown_raises(self, service, mock_cache_manager):
service.shutdown(wait=False)
with pytest.raises(RuntimeError, match="shutting down"):
service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="k"
)
# ---------------------------------------------------------------------------
# Cleanup
# ---------------------------------------------------------------------------
class TestCleanup:
def test_cleanup_removes_old_requests(self, service):
old_result = FetchResult(request_id="old", success=True)
old_result.completed_at = time.time() - 7200 # 2 hours ago
service.completed_requests["old"] = old_result
service._last_completed_requests_cleanup = 0 # force cleanup
removed = service._cleanup_completed_requests(force=True)
assert removed >= 1
assert "old" not in service.completed_requests
def test_cleanup_respects_interval(self, service):
old_result = FetchResult(request_id="r", success=True)
old_result.completed_at = time.time() - 7200
service.completed_requests["r"] = old_result
# Cleanup interval not passed, should skip
service._last_completed_requests_cleanup = time.time()
removed = service._cleanup_completed_requests(force=False)
assert removed == 0
def test_size_limit_enforcement(self, service):
service._max_completed_requests = 3
for i in range(5):
result = FetchResult(request_id=str(i), success=True)
result.completed_at = time.time() - (5 - i) * 100 # oldest first
service.completed_requests[str(i)] = result
service._last_completed_requests_cleanup = 0
service._cleanup_completed_requests(force=True)
assert len(service.completed_requests) <= 3
# ---------------------------------------------------------------------------
# Singleton get_background_service
# ---------------------------------------------------------------------------
class TestGetBackgroundService:
def test_first_call_requires_cache_manager(self):
with pytest.raises(ValueError, match="cache_manager is required"):
get_background_service()
def test_creates_singleton(self, mock_cache_manager):
svc1 = get_background_service(mock_cache_manager)
svc2 = get_background_service()
assert svc1 is svc2
def test_shutdown_clears_singleton(self, mock_cache_manager):
get_background_service(mock_cache_manager)
shutdown_background_service()
with pytest.raises(ValueError):
get_background_service()

View File

@@ -6,7 +6,10 @@ Tests cache functionality including memory cache, disk cache, strategy, and metr
import pytest
import time
from unittest.mock import patch
import json
import tempfile
from pathlib import Path
from unittest.mock import Mock, MagicMock, patch
from src.cache_manager import CacheManager
from src.cache.memory_cache import MemoryCache
from src.cache.disk_cache import DiskCache

View File

@@ -7,6 +7,9 @@ Tests configuration loading, migration, secrets handling, and validation.
import pytest
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import Mock, patch, mock_open
from src.config_manager import ConfigManager

View File

@@ -1,6 +1,11 @@
import time
import pytest
import threading
import json
from unittest.mock import MagicMock, patch
import os
import shutil
from pathlib import Path
from unittest.mock import Mock, MagicMock, patch
from src.config_service import ConfigService
from src.config_manager import ConfigManager

View File

@@ -10,7 +10,11 @@ Tests scenarios that commonly cause user configuration errors:
"""
import pytest
import json
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import tempfile
import os
# Add project root to path
import sys

View File

@@ -1,209 +0,0 @@
"""
Tests for src/base_classes/data_sources.py
Covers ESPNDataSource, MLBAPIDataSource, SoccerAPIDataSource.
All HTTP calls are mocked to avoid network access.
"""
import logging
from datetime import datetime, date
from unittest.mock import MagicMock, patch, Mock
import pytest
import requests
from src.base_classes.data_sources import ESPNDataSource, MLBAPIDataSource, SoccerAPIDataSource
def _make_logger() -> logging.Logger:
return logging.getLogger("test_data_sources")
def _mock_response(json_data: dict, status_code: int = 200):
resp = Mock(spec=requests.Response)
resp.status_code = status_code
resp.json.return_value = json_data
resp.raise_for_status = Mock()
if status_code >= 400:
resp.raise_for_status.side_effect = requests.HTTPError(response=resp)
return resp
# ---------------------------------------------------------------------------
# ESPNDataSource
# ---------------------------------------------------------------------------
class TestESPNDataSource:
def setup_method(self):
self.source = ESPNDataSource(_make_logger())
def test_get_headers(self):
headers = self.source.get_headers()
assert headers["Accept"] == "application/json"
assert "LEDMatrix" in headers["User-Agent"]
def test_fetch_live_games_returns_live_events(self):
live_event = {
"competitions": [{"status": {"type": {"state": "in"}}}]
}
non_live_event = {
"competitions": [{"status": {"type": {"state": "pre"}}}]
}
payload = {"events": [live_event, non_live_event]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("football", "nfl")
assert len(result) == 1
assert result[0] is live_event
def test_fetch_live_games_empty_when_none_live(self):
payload = {"events": [
{"competitions": [{"status": {"type": {"state": "post"}}}]}
]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("football", "nfl")
assert result == []
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("network failure")):
result = self.source.fetch_live_games("football", "nfl")
assert result == []
def test_fetch_schedule_returns_all_events(self):
events = [{"id": "1"}, {"id": "2"}]
payload = {"events": events}
start = datetime(2024, 1, 1)
end = datetime(2024, 1, 7)
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("football", "nfl", (start, end))
assert len(result) == 2
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("timeout")):
result = self.source.fetch_schedule("football", "nfl", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"standings": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("football", "nfl")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("error")):
result = self.source.fetch_standings("football", "nfl")
assert result == {}
def test_base_url_set_correctly(self):
assert "espn.com" in self.source.base_url
# ---------------------------------------------------------------------------
# MLBAPIDataSource
# ---------------------------------------------------------------------------
class TestMLBAPIDataSource:
def setup_method(self):
self.source = MLBAPIDataSource(_make_logger())
def test_fetch_live_games_filters_live(self):
live_game = {"status": {"abstractGameState": "Live"}}
final_game = {"status": {"abstractGameState": "Final"}}
payload = {"dates": [{"games": [live_game, final_game]}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("baseball", "mlb")
assert len(result) == 1
assert result[0] is live_game
def test_fetch_live_games_empty_dates(self):
payload = {"dates": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("baseball", "mlb")
assert result == []
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_live_games("baseball", "mlb")
assert result == []
def test_fetch_schedule_aggregates_all_dates(self):
payload = {
"dates": [
{"games": [{"id": "1"}, {"id": "2"}]},
{"games": [{"id": "3"}]},
]
}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("baseball", "mlb", (datetime.now(), datetime.now()))
assert len(result) == 3
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_schedule("baseball", "mlb", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"records": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("baseball", "mlb")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_standings("baseball", "mlb")
assert result == {}
# ---------------------------------------------------------------------------
# SoccerAPIDataSource
# ---------------------------------------------------------------------------
class TestSoccerAPIDataSource:
def setup_method(self):
self.source = SoccerAPIDataSource(_make_logger(), api_key="test-key-123")
def test_headers_include_api_key(self):
headers = self.source.get_headers()
assert headers["X-Auth-Token"] == "test-key-123"
def test_headers_without_api_key(self):
source = SoccerAPIDataSource(_make_logger())
headers = source.get_headers()
assert "X-Auth-Token" not in headers
def test_fetch_live_games_success(self):
payload = {"matches": [{"id": "m1"}, {"id": "m2"}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("soccer", "eng.1")
assert len(result) == 2
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_live_games("soccer", "eng.1")
assert result == []
def test_fetch_schedule_success(self):
payload = {"matches": [{"id": "m1"}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("soccer", "eng.1", (datetime.now(), datetime.now()))
assert len(result) == 1
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_schedule("soccer", "eng.1", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"standings": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("soccer", "PL")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_standings("soccer", "PL")
assert result == {}

View File

@@ -1,7 +1,7 @@
import time
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
import time
from unittest.mock import MagicMock, patch, ANY
from src.display_controller import DisplayController
class TestDisplayControllerInitialization:
"""Test DisplayController initialization and setup."""
@@ -15,12 +15,19 @@ class TestDisplayControllerInitialization:
assert test_display_controller.plugin_manager is not None
assert test_display_controller.available_modes == []
@pytest.mark.skip(reason="No assertions; init logic is covered by test_init_success and fixture setup")
def test_plugin_discovery_and_loading(self, test_display_controller):
"""Test plugin discovery and loading during initialization."""
# Mock plugin manager behavior
pm = test_display_controller.plugin_manager
pm.discover_plugins.return_value = ["plugin1", "plugin2"]
pm.get_plugin.return_value = MagicMock()
# Manually trigger the plugin loading logic that happens in __init__
# Since we're using a fixture that mocks __init__ partially, we need to verify
# the interactions or simulate the loading if we want to test that specific logic
pass
# Note: Testing __init__ logic is tricky with the fixture.
# We rely on the fixture to give us a usable controller.
class TestDisplayControllerModeRotation:
@@ -244,3 +251,5 @@ class TestDisplayControllerSchedule:
with patch.object(controller.config_service, 'get_config', return_value=schedule_config):
controller._check_schedule()
assert controller.is_display_active is False
from datetime import datetime

View File

@@ -1,6 +1,7 @@
import pytest
from unittest.mock import MagicMock, patch
from PIL import ImageDraw
import time
from unittest.mock import MagicMock, patch, ANY
from PIL import Image, ImageDraw
from src.display_manager import DisplayManager
@pytest.fixture

View File

@@ -9,8 +9,11 @@ Tests:
- Thread safety
"""
import pytest
import time
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import Mock, patch
import threading
import sys
@@ -26,7 +29,7 @@ from src.error_aggregator import (
get_error_aggregator,
record_error
)
from src.exceptions import PluginError
from src.exceptions import PluginError, ConfigError
class TestErrorRecording:

Some files were not shown because too many files have changed in this diff Show More