1 Commits

Author SHA1 Message Date
Chuck
45bf5db2b1 chore: remove march-madness from bundled plugin-repos
March Madness is now available in the ledmatrix-plugins monorepo store
(ChuckBuilds/ledmatrix-plugins/plugins/march-madness) and should be
installed via the Plugin Store like any other plugin.

Removing the bundled copy so new installs don't automatically include it.
Existing users keep their installed version until they choose to uninstall.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 19:58:10 -04:00
27 changed files with 587 additions and 3632 deletions

View File

@@ -1,10 +1,5 @@
# LEDMatrix
[![License](https://img.shields.io/badge/license-GPL--3.0-green)](LICENSE)
[![Discord](https://img.shields.io/badge/Discord-community-5865F2?logo=discord&logoColor=white)](https://discord.gg/RdrC37rEag)
[![GitHub Stars](https://img.shields.io/github/stars/ChuckBuilds/ledmatrix?style=flat&color=yellow)](https://github.com/ChuckBuilds/ledmatrix)
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/77fc9b446a5948e5b0aed7a7aaeb1bab)](https://app.codacy.com/gh/ChuckBuilds/LEDMatrix/dashboard?utm_source=gh&utm_medium=referral&utm_content=&utm_campaign=Badge_grade)
## Welcome to LEDMatrix!
Welcome to the LEDMatrix Project! This open-source project enables you to run an information-rich display on a Raspberry Pi connected to an LED RGB Matrix panel. Whether you want to see your calendar, weather forecasts, sports scores, stock prices, or any other information at a glance, LEDMatrix brings it all together.
@@ -132,15 +127,10 @@ The system supports live, recent, and upcoming game information for multiple spo
| This project can be finnicky! RGB LED Matrix displays are not built the same or to a high-quality standard. We have seen many displays arrive dead or partially working in our discord. Please purchase from a reputable vendor. |
### Raspberry Pi
- Raspberry Pi Zero's don't have enough processing power for this project.
- **Raspberry Pi 3B, 4, or 5**
- Raspberry Pi Zero's don't have enough processing power for this project and the Pi 5 is unsupported due to new GPIO output.
- **Raspberry Pi 3B or 4 (NOT RPi 5!)**
[Amazon Affiliate Link Raspberry Pi 4 4GB RAM](https://amzn.to/4dJixuX)
[Amazon Affiliate Link Raspberry Pi 4 8GB RAM](https://amzn.to/4qbqY7F)
- **Pi 5 users**: the installer automatically detects Pi 5 and builds the `rpi-rgb-led-matrix` library with RP1 support. If you previously installed on a Pi 4 and migrated the SD card, or if you see `mmap` errors in the logs, force a fresh library build:
```bash
sudo RPI_RGB_FORCE_REBUILD=1 ./first_time_install.sh
```
- Pi 5 config: leave `rp1_rio` at `0` (PIO mode, default) and set `gpio_slowdown` to `1` or `2`.
### RGB Matrix Bonnet / HAT
@@ -592,7 +582,7 @@ These settings control runtime behavior and GPIO timing:
- **Critical setting**: Must match your Raspberry Pi model for stability
- **Raspberry Pi 3**: Use 3
- **Raspberry Pi 4**: Use 4
- **Raspberry Pi 5**: Use 12 in PIO mode (`rp1_rio: 0`, the default); start with `1` and increase if you see flickering
- **Raspberry Pi 5**: Use 5 (or higher if needed)
- **Raspberry Pi Zero/1**: Use 1-2
- Incorrect values can cause display corruption, flickering, or system instability
- If you experience issues, try adjusting this value up or down by 1

View File

@@ -36,17 +36,9 @@ if [ -r /proc/device-tree/model ]; then
DEVICE_MODEL=$(tr -d '\0' </proc/device-tree/model)
echo "Detected device: $DEVICE_MODEL"
else
DEVICE_MODEL=""
echo "⚠ Could not detect Raspberry Pi model (continuing anyway)"
fi
# Detect Pi 5 for hardware-specific install decisions (RP1 library verification)
IS_PI5=0
if echo "${DEVICE_MODEL:-}" | grep -qi "Raspberry Pi 5"; then
IS_PI5=1
echo "Raspberry Pi 5 detected — will verify RP1 library support."
fi
# Check OS version - must be Raspberry Pi OS Lite (Trixie)
echo ""
echo "Checking operating system requirements..."
@@ -791,28 +783,9 @@ CURRENT_STEP="Build and install rpi-rgb-led-matrix"
echo "Step 6: Building and installing rpi-rgb-led-matrix..."
echo "-----------------------------------------------------"
# On Pi 5, also check that the installed library has rp1_rio support.
# A library built before Pi 5 support was added imports fine but maps to the
# Pi 3 peripheral bus address (0x3f000000) instead of the RP1 chip at runtime.
_HAS_RP1=0
if python3 -c 'from rgbmatrix import RGBMatrixOptions; assert hasattr(RGBMatrixOptions(), "rp1_rio")' >/dev/null 2>&1; then
_HAS_RP1=1
fi
_SKIP_BUILD=0
# If already installed and not forcing rebuild, skip expensive build
if python3 -c 'from rgbmatrix import RGBMatrix, RGBMatrixOptions' >/dev/null 2>&1 && [ "${RPI_RGB_FORCE_REBUILD:-0}" != "1" ]; then
if [ "$IS_PI5" = "1" ] && [ "$_HAS_RP1" = "0" ]; then
echo "⚠ Pi 5 detected: installed rgbmatrix lacks rp1_rio support (older build)."
echo " Forcing rebuild to get Pi 5 RP1 support..."
else
_SKIP_BUILD=1
fi
fi
if [ "$_SKIP_BUILD" = "1" ]; then
_skip_suffix=""
if [ "$IS_PI5" = "1" ]; then _skip_suffix=" with Pi 5 RP1 support"; fi
echo "rgbmatrix already installed${_skip_suffix}; skipping build (set RPI_RGB_FORCE_REBUILD=1 to force rebuild)."
echo "rgbmatrix Python package already available; skipping build (set RPI_RGB_FORCE_REBUILD=1 to force rebuild)."
else
# Ensure rpi-rgb-led-matrix submodule is initialized
if [ ! -d "$PROJECT_ROOT_DIR/rpi-rgb-led-matrix-master" ]; then
@@ -879,17 +852,6 @@ except Exception as e:
PY
then
echo "✓ rpi-rgb-led-matrix installed and verified"
# Pi 5: confirm the freshly-built library has rp1_rio support
if [ "$IS_PI5" = "1" ]; then
if python3 -c 'from rgbmatrix import RGBMatrixOptions; assert hasattr(RGBMatrixOptions(), "rp1_rio")' >/dev/null 2>&1; then
echo "✓ Pi 5 RP1 (rp1_rio) support confirmed"
else
echo "⚠ rp1_rio not found after rebuild — the submodule may be an older version."
echo " Try updating the submodule and rebuilding:"
echo " git submodule update --remote rpi-rgb-led-matrix-master"
echo " sudo RPI_RGB_FORCE_REBUILD=1 ./first_time_install.sh"
fi
fi
else
echo "✗ rpi-rgb-led-matrix import test failed"
exit 1

View File

@@ -67,9 +67,8 @@ def main():
print(" 📍 Will run on: http://0.0.0.0:5000")
print(" ⏹️ Press Ctrl+C to stop")
# Run the app (debug mode controlled by env var to satisfy security scanners)
_debug = os.environ.get('LEDMATRIX_FLASK_DEBUG', '0') == '1'
app.run(host='0.0.0.0', port=5000, debug=_debug)
# Run the app (this should start the server)
app.run(host='0.0.0.0', port=5000, debug=True)
except KeyboardInterrupt:
print("\n ⏹️ Server stopped by user")

View File

@@ -410,8 +410,8 @@ def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
try:
manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8")
manifest = json.loads(manifest_raw)
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
return False, "Invalid manifest.json", {}
except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
return False, f"Invalid manifest.json: {e}", {}
if not isinstance(manifest, dict) or "schema_version" not in manifest:
return False, "Invalid manifest structure", {}
@@ -456,8 +456,8 @@ def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
return True, "", result_manifest
except zipfile.BadZipFile:
return False, "File is not a valid ZIP archive", {}
except OSError:
return False, "Could not read backup", {}
except OSError as e:
return False, f"Could not read backup: {e}", {}
# ---------------------------------------------------------------------------

View File

@@ -110,10 +110,9 @@ class DisplayManager:
options.rp1_rio = runtime_config.get('rp1_rio')
else:
logger.warning(
"rp1_rio is set in config but the installed rgbmatrix library does "
"not support it — the library was likely built without Pi 5 RP1 "
"support (mmap to 0x3f000000 instead of RP1 chip). "
"Fix: sudo RPI_RGB_FORCE_REBUILD=1 ./first_time_install.sh"
"rp1_rio is set in config but the current RGBMatrixOptions "
"implementation does not support it (RGBMatrixEmulator or older "
"library version) — value will be ignored"
)
logger.info(f"Initializing RGB Matrix with settings: rows={options.rows}, cols={options.cols}, chain_length={options.chain_length}, parallel={options.parallel}, hardware_mapping={options.hardware_mapping}")
@@ -190,7 +189,7 @@ class DisplayManager:
json.dump(_hw_status, _f)
_f.flush()
os.fsync(_f.fileno())
os.chmod(_tmp_path, 0o644)
os.chmod(_tmp_path, 0o600)
os.replace(_tmp_path, _status_path)
except Exception:
try:

View File

@@ -8,7 +8,6 @@ Extracted from PluginManager to improve separation of concerns.
import json
import importlib
import importlib.util
import os
import sys
import subprocess
import threading
@@ -69,11 +68,6 @@ class PluginLoader:
Returns:
Path to plugin directory or None if not found
"""
# Sanitize plugin_id — os.path.basename is a CodeQL-recognized path sanitizer
plugin_id = os.path.basename(plugin_id or '')
if not plugin_id:
return None
# Strategy 1: Use mapping from discovery
if plugin_directories and plugin_id in plugin_directories:
plugin_dir = plugin_directories[plugin_id]
@@ -81,16 +75,14 @@ class PluginLoader:
self.logger.debug("Using plugin directory from discovery mapping: %s", plugin_dir)
return plugin_dir
# Strategy 2: Direct paths — resolve and validate they stay within plugins_dir
plugins_dir_resolved = plugins_dir.resolve()
for _candidate_name in (plugin_id, f"ledmatrix-{plugin_id}"):
_candidate = (plugins_dir_resolved / _candidate_name).resolve()
try:
_candidate.relative_to(plugins_dir_resolved)
except ValueError:
continue
if _candidate.exists():
return _candidate
# Strategy 2: Direct paths
plugin_dir = plugins_dir / plugin_id
if plugin_dir.exists():
return plugin_dir
plugin_dir = plugins_dir / f"ledmatrix-{plugin_id}"
if plugin_dir.exists():
return plugin_dir
# Strategy 3: Case-insensitive search
normalized_id = plugin_id.lower()
@@ -151,21 +143,12 @@ class PluginLoader:
Returns:
True if dependencies installed or not needed, False on error
"""
plugin_id = os.path.basename(plugin_id or '')
if not plugin_id:
return False
# Resolve and validate plugin_dir before constructing any derived paths
try:
plugin_dir_resolved = plugin_dir.resolve(strict=True)
except OSError:
self.logger.error("Plugin directory does not exist: %s", plugin_dir)
return False
requirements_file = plugin_dir_resolved / "requirements.txt"
requirements_file = plugin_dir / "requirements.txt"
if not requirements_file.exists():
return True # No dependencies needed
marker_path = plugin_dir_resolved / ".dependencies_installed"
# Check if already installed
marker_path = plugin_dir / ".dependencies_installed"
if marker_path.exists():
self.logger.debug("Dependencies already installed for %s", plugin_id)
return True
@@ -188,24 +171,10 @@ class PluginLoader:
self.logger.info("Dependencies installed successfully for %s", plugin_id)
return True
else:
stderr = result.stderr or ""
# uninstall-no-record-file means the package is already present at the
# system level (e.g. installed via dnf/apt without a pip RECORD file).
# pip can't replace it, but it IS installed — write the marker so we
# don't retry on every restart.
if "uninstall-no-record-file" in stderr:
self.logger.warning(
"Dependencies for %s include system-managed packages (no pip RECORD). "
"Assuming they are satisfied: %s",
plugin_id, stderr.strip()
)
marker_path.touch()
ensure_file_permissions(marker_path, get_plugin_file_mode())
return True
self.logger.warning(
"Dependency installation returned non-zero exit code for %s: %s",
plugin_id,
stderr
result.stderr
)
return False
except subprocess.TimeoutExpired:
@@ -380,20 +349,9 @@ class PluginLoader:
Returns:
Loaded module or None on error
"""
plugin_id = os.path.basename(plugin_id or '')
if not plugin_id:
raise PluginError("Invalid plugin ID")
try:
plugin_dir_resolved = plugin_dir.resolve(strict=True)
except OSError:
raise PluginError("Plugin directory not found", plugin_id=plugin_id)
entry_file = (plugin_dir_resolved / entry_point).resolve()
try:
entry_file.relative_to(plugin_dir_resolved)
except ValueError:
raise PluginError("Invalid entry point path", plugin_id=plugin_id)
entry_file = plugin_dir / entry_point
if not entry_file.exists():
error_msg = f"Entry point file not found for plugin {plugin_id}"
error_msg = f"Entry point file not found: {entry_file} for plugin {plugin_id}"
self.logger.error(error_msg)
raise PluginError(error_msg, plugin_id=plugin_id, context={'entry_file': str(entry_file)})

View File

@@ -10,7 +10,6 @@ import json
import stat
import subprocess
import shutil
import threading
import zipfile
import tempfile
import requests
@@ -21,8 +20,6 @@ from pathlib import Path
from typing import List, Dict, Optional, Any, Tuple
import logging
from urllib.parse import urlparse
from src.common.permission_utils import sudo_remove_directory
try:
@@ -103,10 +100,6 @@ class PluginStoreManager:
# handlers. Bumping the cached-entry timestamp on failure serves
# the stale payload cheaply until the backoff expires.
self._failure_backoff_seconds = 60
# Prevents concurrent callers from each firing a network request when
# the registry cache expires. Only one thread fetches; others wait and
# then get the result from the warm cache (double-checked locking).
self._registry_fetch_lock = threading.Lock()
# Ensure plugins directory exists
self.plugins_dir.mkdir(exist_ok=True)
@@ -358,8 +351,7 @@ class PluginStoreManager:
# Extract owner/repo from URL
try:
# Handle different URL formats
_parsed_url = urlparse(repo_url)
if _parsed_url.hostname in ('github.com', 'www.github.com'):
if 'github.com' in repo_url:
parts = repo_url.strip('/').split('/')
if len(parts) >= 2:
owner = parts[-2]
@@ -523,8 +515,7 @@ class PluginStoreManager:
registry_urls = []
# Extract owner/repo from URL
_parsed_repo_url = urlparse(repo_url)
if _parsed_repo_url.hostname in ('github.com', 'www.github.com'):
if 'github.com' in repo_url:
parts = repo_url.split('/')
if len(parts) >= 2:
owner = parts[-2]
@@ -584,50 +575,41 @@ class PluginStoreManager:
(current_time - self.registry_cache_time) < self.registry_cache_timeout):
return self.registry_cache
with self._registry_fetch_lock:
# Re-check inside the lock — a concurrent caller that was waiting
# may have already populated the cache while we blocked.
current_time = time.time()
if (self.registry_cache and self.registry_cache_time and
not force_refresh and
(current_time - self.registry_cache_time) < self.registry_cache_timeout):
try:
self.logger.info(f"Fetching plugin registry from {self.REGISTRY_URL}")
response = self._http_get_with_retries(self.REGISTRY_URL, timeout=10)
response.raise_for_status()
self.registry_cache = response.json()
self.registry_cache_time = current_time
self.logger.info(f"Fetched registry with {len(self.registry_cache.get('plugins', []))} plugins")
return self.registry_cache
except requests.RequestException as e:
self.logger.error(f"Error fetching registry: {e}")
if raise_on_failure:
raise
# Prefer stale cache over an empty list so the plugin list UI
# keeps working on a flaky connection (e.g. Pi on WiFi). Bump
# registry_cache_time into a short backoff window so the next
# request serves the stale payload cheaply instead of
# re-hitting the network on every request (matches the
# pattern used by github_cache / commit_info_cache).
if self.registry_cache:
self.logger.warning("Falling back to stale registry cache")
self.registry_cache_time = (
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
)
return self.registry_cache
try:
self.logger.info(f"Fetching plugin registry from {self.REGISTRY_URL}")
response = self._http_get_with_retries(self.REGISTRY_URL, timeout=10)
response.raise_for_status()
self.registry_cache = response.json()
self.registry_cache_time = current_time
self.logger.info(f"Fetched registry with {len(self.registry_cache.get('plugins', []))} plugins")
return {"plugins": []}
except json.JSONDecodeError as e:
self.logger.error(f"Error parsing registry JSON: {e}")
if raise_on_failure:
raise
if self.registry_cache:
self.registry_cache_time = (
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
)
return self.registry_cache
except requests.RequestException as e:
self.logger.error(f"Error fetching registry: {e}")
if raise_on_failure:
raise
# Prefer stale cache over an empty list so the plugin list UI
# keeps working on a flaky connection (e.g. Pi on WiFi). Bump
# registry_cache_time into a short backoff window so the next
# request serves the stale payload cheaply instead of
# re-hitting the network on every request (matches the
# pattern used by github_cache / commit_info_cache).
if self.registry_cache:
self.logger.warning("Falling back to stale registry cache")
self.registry_cache_time = (
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
)
return self.registry_cache
return {"plugins": []}
except json.JSONDecodeError as e:
self.logger.error(f"Error parsing registry JSON: {e}")
if raise_on_failure:
raise
if self.registry_cache:
self.registry_cache_time = (
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
)
return self.registry_cache
return {"plugins": []}
return {"plugins": []}
def search_plugins(self, query: str = "", category: str = "", tags: List[str] = None, fetch_commit_info: bool = True, include_saved_repos: bool = True, saved_repositories_manager = None) -> List[Dict]:
"""
@@ -779,8 +761,7 @@ class PluginStoreManager:
try:
# Convert repo URL to raw content URL
# https://github.com/user/repo -> https://raw.githubusercontent.com/user/repo/branch/manifest.json
_parsed_manifest_url = urlparse(repo_url)
if _parsed_manifest_url.hostname in ('github.com', 'www.github.com'):
if 'github.com' in repo_url:
# Handle different URL formats
repo_url = repo_url.rstrip('/')
if repo_url.endswith('.git'):

View File

@@ -151,18 +151,6 @@ class WiFiManager:
f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, "
f"interface: {self._wifi_interface}, trixie: {self._is_trixie}")
# Once per process: remove a stale force-AP flag left by a prior crash.
# Guard with a class-level flag so the nmcli AP-state check only runs
# once even though WiFiManager is instantiated per-request.
if not WiFiManager._startup_cleanup_done:
WiFiManager._startup_cleanup_done = True
if self._FORCE_AP_FLAG_PATH.exists() and not self._is_ap_mode_active():
try:
self._FORCE_AP_FLAG_PATH.unlink(missing_ok=True)
logger.debug("Removed stale force-AP flag on startup (AP not active)")
except OSError as exc:
logger.warning(f"Could not remove stale force-AP flag: {exc}")
def _show_led_message(self, message: str, duration: int = 5):
"""
Show a WiFi status message on the LED display.
@@ -486,10 +474,7 @@ class WiFiManager:
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if '/' in line:
# nmcli -t output is "IP4.ADDRESS[1]:x.x.x.x/prefix";
# bare "x.x.x.x/prefix" is also accepted defensively.
_, sep, rest = line.partition(':')
ip_address = (rest if sep else line).split('/')[0].strip()
ip_address = line.split('/')[0].strip()
break
# Final fallback: Get signal strength by matching SSID in WiFi list
@@ -515,13 +500,6 @@ class WiFiManager:
# Check if AP mode is active
ap_active = self._is_ap_mode_active()
# wlan0 shows as "connected" in AP mode; clear client-station fields so
# callers don't mistake the AP for an outbound WiFi connection.
if ap_active and wifi_connected:
wifi_connected = False
ssid = None
ip_address = None
logger.debug(f"{wlan_device} is in AP mode — overriding wifi_connected to False")
return WiFiStatus(
connected=wifi_connected,
@@ -712,10 +690,6 @@ class WiFiManager:
# ---------------------------------------------------------------------------
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved") # nosec B108 - process-specific named file; device is single-user RPi
# Written when AP mode is manually force-enabled; prevents daemon auto-disable
_FORCE_AP_FLAG_PATH = Path("/tmp/ledmatrix_force_ap_active") # nosec B108 - process-specific named file; device is single-user RPi
# Ensures the startup stale-flag cleanup runs once per process, not per instantiation
_startup_cleanup_done: bool = False
def _validate_ap_config(self) -> Tuple[str, int]:
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
@@ -1393,7 +1367,7 @@ class WiFiManager:
logger.error(f"Failed to restore original connection: {original_ssid}")
# Trigger AP mode as last resort
self._show_led_message("Enabling AP mode...", duration=5)
ap_success, ap_msg = self.enable_ap_mode(force=True)
ap_success, ap_msg = self.enable_ap_mode()
if ap_success:
logger.info("AP mode enabled as failsafe")
return False, "Connection failed and restoration failed. AP mode enabled."
@@ -1405,7 +1379,7 @@ class WiFiManager:
elif not success:
logger.warning(f"Connection to {ssid} failed and no original connection to restore")
self._show_led_message("Enabling AP mode...", duration=5)
ap_success, ap_msg = self.enable_ap_mode(force=True)
ap_success, ap_msg = self.enable_ap_mode()
if ap_success:
logger.info("AP mode enabled as failsafe")
return False, "Connection failed. AP mode enabled."
@@ -1426,7 +1400,7 @@ class WiFiManager:
logger.error(f"Failed to restore after exception: {restore_error}")
# Last resort: enable AP mode
try:
self.enable_ap_mode(force=True)
self.enable_ap_mode()
except Exception as ap_error: # nosec B110 - last-resort; do not re-raise, but log for debugging
logger.error("Last-resort AP mode enable failed in recovery path: %s", ap_error, exc_info=True)
return False, str(e)
@@ -1490,29 +1464,26 @@ class WiFiManager:
# Show LED message
self._show_led_message(f"Connecting to {ssid}...", duration=10)
# Find existing NM connection for this SSID.
# 802-11-wireless.ssid is not a valid column in 'nmcli connection show',
# so list all wifi connections then query each one's SSID individually.
list_result = subprocess.run( # nosec B603 B607 - fixed args, no user input
["nmcli", "-t", "-f", "NAME,TYPE", "connection", "show"],
capture_output=True, text=True, timeout=5
# First, check if connection already exists and try to activate it
# NetworkManager connection names might not match SSID exactly, so search by SSID
check_result = subprocess.run(
["nmcli", "-t", "-f", "NAME,802-11-wireless.ssid", "connection", "show"],
capture_output=True,
text=True,
timeout=5
)
existing_conn_name = None
if list_result.returncode == 0:
for line in list_result.stdout.strip().split('\n'):
if ':' not in line:
continue
parts = line.split(':')
if len(parts) < 2 or parts[1].strip() != '802-11-wireless':
continue
conn_name = parts[0].strip()
ssid_r = subprocess.run( # nosec B603 B607 - conn_name from nmcli output, not user input
["nmcli", "-g", "802-11-wireless.ssid", "connection", "show", conn_name],
capture_output=True, text=True, timeout=5
)
if ssid_r.returncode == 0 and ssid_r.stdout.strip() == ssid:
existing_conn_name = conn_name
break
if check_result.returncode == 0:
for line in check_result.stdout.strip().split('\n'):
if ':' in line:
parts = line.split(':')
if len(parts) >= 2:
conn_name = parts[0].strip()
conn_ssid = parts[1].strip() if len(parts) > 1 else ""
if conn_ssid == ssid:
existing_conn_name = conn_name
break
# Also try direct lookup by SSID (in case connection name matches SSID)
if not existing_conn_name:
@@ -1884,7 +1855,7 @@ class WiFiManager:
logger.warning(f"Failed to enable WiFi radio after {max_retries} attempts")
return False
def enable_ap_mode(self, force: bool = False) -> Tuple[bool, str]:
def enable_ap_mode(self) -> Tuple[bool, str]:
"""
Enable access point mode
@@ -1906,29 +1877,20 @@ class WiFiManager:
if not self._ensure_wifi_radio_enabled():
return False, "WiFi radio is disabled and could not be enabled"
# Check if WiFi is connected (skip when force=True)
# Check if WiFi is connected
status = self.get_wifi_status()
if not force and status.connected:
if status.connected:
return False, "Cannot enable AP mode while WiFi is connected"
# Check if Ethernet is connected (skip when force=True)
if not force and self._is_ethernet_connected():
# Check if Ethernet is connected
if self._is_ethernet_connected():
return False, "Cannot enable AP mode while Ethernet is connected"
if force:
logger.debug(f"enable_ap_mode: force=True — WiFi/Ethernet guards bypassed; will create {self._FORCE_AP_FLAG_PATH}")
# Try hostapd/dnsmasq first (captive portal mode)
if self.has_hostapd and self.has_dnsmasq:
result = self._enable_ap_mode_hostapd()
if result[0]:
self._ap_enabled_at = time.time()
if force:
try:
self._FORCE_AP_FLAG_PATH.touch()
logger.debug(f"Force-AP flag created: {self._FORCE_AP_FLAG_PATH}")
except OSError as exc:
logger.warning(f"Failed to create force-AP flag {self._FORCE_AP_FLAG_PATH}: {exc}")
return result
# Fallback to nmcli hotspot (simpler, no captive portal)
@@ -1938,12 +1900,6 @@ class WiFiManager:
result = self._enable_ap_mode_nmcli_hotspot()
if result[0]:
self._ap_enabled_at = time.time()
if force:
try:
self._FORCE_AP_FLAG_PATH.touch()
logger.debug(f"Force-AP flag created: {self._FORCE_AP_FLAG_PATH}")
except OSError as exc:
logger.warning(f"Failed to create force-AP flag {self._FORCE_AP_FLAG_PATH}: {exc}")
return result
return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)"
@@ -2135,14 +2091,8 @@ class WiFiManager:
self._clear_led_message()
return False, "AP started but captive-portal redirect setup failed"
# Verify the AP is actually running (retry up to 5x with 2s delay for NM async activation)
status = {}
for _attempt in range(5):
status = self._get_ap_status_nmcli()
if status.get('active'):
break
logger.debug(f"AP verification attempt {_attempt + 1}/5 not yet active, waiting 2s")
time.sleep(2)
# Verify the AP is actually running
status = self._get_ap_status_nmcli()
if status.get('active'):
ip = status.get('ip', '192.168.4.1')
logger.info(f"AP mode confirmed active at {ip} (open network, no password)")
@@ -2340,7 +2290,6 @@ class WiFiManager:
logger.warning("WiFi radio may be disabled after nmcli AP cleanup")
self._ap_enabled_at = None
self._FORCE_AP_FLAG_PATH.unlink(missing_ok=True)
logger.info("AP mode disabled successfully")
return True, "AP mode disabled"
except Exception as e:
@@ -2529,29 +2478,22 @@ address=/detectportal.firefox.com/192.168.4.1
else:
logger.warning(f"Failed to enable AP mode: {message}")
elif not should_have_ap and ap_active:
# Should not have AP but do - check if it was manually force-enabled
force_active = self._FORCE_AP_FLAG_PATH.exists()
if status.connected:
# WiFi connected: always disable AP (user successfully configured WiFi)
# Should not have AP but do - disable AP mode
# Always disable if WiFi or Ethernet connects, regardless of auto_enable setting
if status.connected or ethernet_connected:
success, message = self.disable_ap_mode()
if success:
logger.info("Auto-disabled AP mode (WiFi connected)")
self._disconnected_checks = 0
if status.connected:
logger.info("Auto-disabled AP mode (WiFi connected)")
elif ethernet_connected:
logger.info("Auto-disabled AP mode (Ethernet connected)")
self._disconnected_checks = 0 # Reset counter
return True
else:
logger.warning(f"Failed to auto-disable AP mode: {message}")
elif ethernet_connected and not force_active:
# Ethernet connected, AP not manually forced: auto-disable
success, message = self.disable_ap_mode()
if success:
logger.info("Auto-disabled AP mode (Ethernet connected)")
self._disconnected_checks = 0
return True
else:
logger.warning(f"Failed to auto-disable AP mode: {message}")
elif ethernet_connected and force_active:
logger.debug("AP mode is force-active; Ethernet connected but auto-disable suppressed")
elif not auto_enable:
# AP is active but auto_enable is disabled - this means it was manually enabled
# Don't disable it automatically, let it stay active
logger.debug("AP mode is active (manually enabled), keeping active")
# Idle-timeout check: disable AP if no client has connected within the window.

View File

@@ -1,342 +0,0 @@
"""
Tests for src/base_classes/api_extractors.py
Covers ESPNFootballExtractor, ESPNBaseballExtractor, ESPNHockeyExtractor,
SoccerAPIExtractor, and the shared _extract_common_details logic.
"""
import logging
import pytest
from src.base_classes.api_extractors import (
ESPNFootballExtractor,
ESPNBaseballExtractor,
ESPNHockeyExtractor,
SoccerAPIExtractor,
)
# ---------------------------------------------------------------------------
# Shared test data factories
# ---------------------------------------------------------------------------
def _make_espn_event(state: str = "in", home_abbr: str = "KC", away_abbr: str = "BUF",
home_score: str = "14", away_score: str = "7",
date_str: str = "2024-01-15T20:00:00Z",
include_situation: bool = False,
situation: dict | None = None,
status_detail: str = "2nd Qtr 8:42",
period: int = 2) -> dict:
"""Build a minimal ESPN-style game event dict."""
comp_status = {
"type": {
"state": state,
"shortDetail": status_detail,
"detail": status_detail,
"name": "STATUS_IN_PROGRESS",
},
"period": period,
"displayClock": "8:42",
}
comp = {
"status": comp_status,
"competitors": [
{
"homeAway": "home",
"team": {"abbreviation": home_abbr, "displayName": f"{home_abbr} Team"},
"score": home_score,
},
{
"homeAway": "away",
"team": {"abbreviation": away_abbr, "displayName": f"{away_abbr} Team"},
"score": away_score,
},
],
}
if include_situation:
comp["situation"] = situation or {}
return {
"id": "test-game-1",
"date": date_str,
"competitions": [comp],
}
def _make_logger() -> logging.Logger:
return logging.getLogger("test_extractor")
# ---------------------------------------------------------------------------
# ESPNFootballExtractor
# ---------------------------------------------------------------------------
class TestESPNFootballExtractor:
def setup_method(self):
self.extractor = ESPNFootballExtractor(_make_logger())
def test_extract_live_game_basic_fields(self):
event = _make_espn_event(state="in", home_score="14", away_score="7")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "KC"
assert result["away_abbr"] == "BUF"
assert result["home_score"] == "14"
assert result["away_score"] == "7"
assert result["is_live"] is True
assert result["is_final"] is False
assert result["is_upcoming"] is False
def test_extract_final_game(self):
event = _make_espn_event(state="post")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_final"] is True
assert result["is_live"] is False
def test_extract_upcoming_game(self):
event = _make_espn_event(state="pre")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_upcoming"] is True
def test_sport_specific_fields_default_when_pregame(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "down" in fields
assert "distance" in fields
assert "possession" in fields
assert "is_redzone" in fields
assert fields["is_redzone"] is False
def test_sport_specific_fields_live_with_situation(self):
situation = {
"down": 3,
"distance": 7,
"possession": "KC",
"isRedZone": True,
"homeTimeouts": 2,
"awayTimeouts": 1,
}
event = _make_espn_event(state="in", include_situation=True, situation=situation)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["down"] == 3
assert fields["distance"] == 7
assert fields["is_redzone"] is True
assert fields["home_timeouts"] == 2
assert fields["away_timeouts"] == 1
def test_scoring_event_detected(self):
# situation must be non-empty (truthy) for the live block to execute
situation = {"down": 1, "distance": 10}
event = _make_espn_event(
state="in",
include_situation=True,
situation=situation,
status_detail="touchdown scored",
)
fields = self.extractor.get_sport_specific_fields(event)
assert "touchdown" in fields.get("scoring_event", "").lower()
def test_returns_none_on_empty_event(self):
assert self.extractor.extract_game_details({}) is None
def test_returns_none_when_teams_missing(self):
event = {
"id": "x",
"date": "2024-01-15T20:00:00Z",
"competitions": [
{
"status": {"type": {"state": "in", "shortDetail": "", "detail": "", "name": ""}},
"competitors": [], # no competitors
}
],
}
assert self.extractor.extract_game_details(event) is None
def test_date_z_suffix_parsed(self):
event = _make_espn_event(date_str="2024-01-15T20:00:00Z")
result = self.extractor.extract_game_details(event)
# Should not raise and should return a result
assert result is not None
def test_id_propagated(self):
event = _make_espn_event()
result = self.extractor.extract_game_details(event)
assert result["id"] == "test-game-1"
# ---------------------------------------------------------------------------
# ESPNBaseballExtractor
# ---------------------------------------------------------------------------
class TestESPNBaseballExtractor:
def setup_method(self):
self.extractor = ESPNBaseballExtractor(_make_logger())
def test_extract_live_game(self):
event = _make_espn_event(
state="in", home_abbr="NYY", away_abbr="BOS",
home_score="3", away_score="2"
)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "NYY"
assert result["is_live"] is True
def test_baseball_sport_fields_defaults(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "inning" in fields
assert "outs" in fields
assert "bases" in fields
assert "strikes" in fields
assert "balls" in fields
def test_baseball_sport_fields_live(self):
situation = {
"inning": 7,
"outs": 2,
"bases": "110",
"strikes": 2,
"balls": 3,
"pitcher": "Smith",
"batter": "Jones",
}
event = _make_espn_event(state="in", include_situation=True, situation=situation)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["inning"] == 7
assert fields["outs"] == 2
assert fields["strikes"] == 2
assert fields["pitcher"] == "Smith"
def test_returns_none_on_empty(self):
assert self.extractor.extract_game_details({}) is None
# ---------------------------------------------------------------------------
# ESPNHockeyExtractor
# ---------------------------------------------------------------------------
class TestESPNHockeyExtractor:
def setup_method(self):
self.extractor = ESPNHockeyExtractor(_make_logger())
def test_extract_live_game(self):
event = _make_espn_event(
state="in", home_abbr="BOS", away_abbr="TOR",
home_score="2", away_score="1"
)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_live"] is True
def test_hockey_period_text_p1(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=1
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P1"
def test_hockey_period_text_p2(self):
situation = {"isPowerPlay": False} # non-empty so the live block executes
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=2
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P2"
def test_hockey_period_text_p3(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=3
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P3"
def test_hockey_period_text_ot(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=4
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "OT1"
def test_hockey_power_play(self):
situation = {"isPowerPlay": True, "homeShots": 12, "awayShots": 8}
event = _make_espn_event(state="in", include_situation=True, situation=situation, period=2)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["power_play"] is True
assert fields["shots_on_goal"]["home"] == 12
assert fields["shots_on_goal"]["away"] == 8
def test_hockey_fields_defaults_pregame(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "period" in fields
assert "power_play" in fields
assert fields["power_play"] is False
def test_returns_none_on_empty(self):
assert self.extractor.extract_game_details({}) is None
# ---------------------------------------------------------------------------
# SoccerAPIExtractor
# ---------------------------------------------------------------------------
class TestSoccerAPIExtractor:
def setup_method(self):
self.extractor = SoccerAPIExtractor(_make_logger())
def _make_soccer_event(self, is_live: bool = True) -> dict:
return {
"id": "soccer-1",
"home_team": {"abbreviation": "ARS", "name": "Arsenal"},
"away_team": {"abbreviation": "CHE", "name": "Chelsea"},
"home_score": "2",
"away_score": "1",
"status": "LIVE",
"is_live": is_live,
"is_final": not is_live,
"is_upcoming": False,
"half": "1",
"stoppage_time": "2",
"home_yellow_cards": 1,
"away_yellow_cards": 2,
"home_red_cards": 0,
"away_red_cards": 0,
"home_possession": 55,
"away_possession": 45,
}
def test_extract_live_game(self):
event = self._make_soccer_event(is_live=True)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "ARS"
assert result["away_abbr"] == "CHE"
assert result["is_live"] is True
def test_sport_specific_cards(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["cards"]["home_yellow"] == 1
assert fields["cards"]["away_yellow"] == 2
assert fields["cards"]["home_red"] == 0
def test_sport_specific_possession(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["possession"]["home"] == 55
assert fields["possession"]["away"] == 45
def test_sport_specific_half(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["half"] == "1"
def test_scores_as_strings(self):
event = self._make_soccer_event()
result = self.extractor.extract_game_details(event)
assert result["home_score"] == "2"
assert result["away_score"] == "1"

View File

@@ -1,299 +0,0 @@
"""
Tests for src/background_data_service.py
Covers BackgroundDataService: submit_fetch_request, get_result,
is_request_complete, get_request_status, cancel_request, get_statistics,
_cleanup_completed_requests, shutdown, and get_background_service singleton.
"""
import time
import pytest
from unittest.mock import MagicMock, patch, Mock
from concurrent.futures import Future
from src.background_data_service import (
BackgroundDataService,
FetchStatus,
FetchResult,
FetchRequest,
get_background_service,
shutdown_background_service,
)
import src.background_data_service as bds_module
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_global_service():
"""Ensure each test starts with no global singleton."""
shutdown_background_service()
yield
shutdown_background_service()
@pytest.fixture
def mock_cache_manager():
m = MagicMock()
m.get.return_value = None
m.set.return_value = None
m.generate_sport_cache_key.return_value = "test_key"
return m
@pytest.fixture
def service(mock_cache_manager):
svc = BackgroundDataService(mock_cache_manager, max_workers=2, request_timeout=5)
yield svc
svc.shutdown(wait=False)
# ---------------------------------------------------------------------------
# Initialisation
# ---------------------------------------------------------------------------
class TestInitialisation:
def test_stats_zeroed(self, service):
stats = service.get_statistics()
assert stats["total_requests"] == 0
assert stats["completed_requests"] == 0
assert stats["failed_requests"] == 0
def test_no_active_requests(self, service):
assert len(service.active_requests) == 0
def test_not_shutdown(self, service):
assert service._shutdown is False
# ---------------------------------------------------------------------------
# Cache hit path
# ---------------------------------------------------------------------------
class TestCacheHit:
def test_cache_hit_returns_request_id(self, service, mock_cache_manager):
mock_cache_manager.get.return_value = {"events": [{"id": "1"}]}
req_id = service.submit_fetch_request(
sport="nfl", year=2024,
url="https://example.com/nfl",
cache_key="nfl_key",
)
assert req_id is not None
# Request should be immediately complete due to cache hit
result = service.get_result(req_id)
assert result is not None
assert result.success is True
assert result.cached is True
def test_cache_hit_increments_stat(self, service, mock_cache_manager):
mock_cache_manager.get.return_value = {"events": []}
service.submit_fetch_request(sport="nba", year=2024, url="https://x.com", cache_key="k")
stats = service.get_statistics()
assert stats["cached_hits"] == 1
# ---------------------------------------------------------------------------
# Actual fetch path (mocked HTTP)
# ---------------------------------------------------------------------------
class TestFetchPath:
def _valid_payload(self) -> dict:
return {"events": [{"id": "g1"}, {"id": "g2"}]}
def test_successful_fetch_completes(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024,
url="https://example.com/nfl",
cache_key="nfl_test",
)
# Wait for the background thread
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
result = service.get_result(req_id)
assert result is not None
assert result.success is True
assert result.data == self._valid_payload()
def test_failed_fetch_records_error(self, service, mock_cache_manager):
with patch.object(service.session, "get", side_effect=Exception("network error")):
req_id = service.submit_fetch_request(
sport="nba", year=2024,
url="https://example.com/nba",
cache_key="nba_test",
max_retries=0,
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
result = service.get_result(req_id)
assert result is not None
assert result.success is False
assert result.error is not None
def test_cache_miss_increments_stat(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="new_key",
)
stats = service.get_statistics()
assert stats["cache_misses"] == 1
def test_callback_called_on_success(self, service, mock_cache_manager):
callback = Mock()
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com",
cache_key="cb_key", callback=callback, max_retries=0,
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
callback.assert_called_once()
call_arg = callback.call_args[0][0]
assert isinstance(call_arg, FetchResult)
def test_data_cached_after_successful_fetch(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="cache_after_key",
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
mock_cache_manager.set.assert_called()
# ---------------------------------------------------------------------------
# Request status / cancel
# ---------------------------------------------------------------------------
class TestRequestStatusAndCancel:
def test_unknown_request_status_is_none(self, service):
assert service.get_request_status("nonexistent") is None
def test_cancel_active_request(self, service, mock_cache_manager):
# Manually insert an active request
req = FetchRequest(
id="r1", sport="nfl", year=2024,
cache_key="k", url="https://x.com",
)
req.status = FetchStatus.PENDING
service.active_requests["r1"] = req
result = service.cancel_request("r1")
assert result is True
assert "r1" not in service.active_requests
def test_cancel_nonexistent_request(self, service):
assert service.cancel_request("does-not-exist") is False
def test_is_request_complete_false_for_active(self, service, mock_cache_manager):
req = FetchRequest(
id="r2", sport="mlb", year=2024,
cache_key="k2", url="https://x.com",
)
service.active_requests["r2"] = req
assert service.is_request_complete("r2") is False
def test_is_request_complete_true_for_done(self, service):
result = FetchResult(request_id="r3", success=True)
service.completed_requests["r3"] = result
assert service.is_request_complete("r3") is True
def test_get_result_returns_none_for_unknown(self, service):
assert service.get_result("unknown") is None
# ---------------------------------------------------------------------------
# Shutdown
# ---------------------------------------------------------------------------
class TestShutdown:
def test_shutdown_sets_flag(self, service):
service.shutdown(wait=False)
assert service._shutdown is True
def test_submit_after_shutdown_raises(self, service, mock_cache_manager):
service.shutdown(wait=False)
with pytest.raises(RuntimeError, match="shutting down"):
service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="k"
)
# ---------------------------------------------------------------------------
# Cleanup
# ---------------------------------------------------------------------------
class TestCleanup:
def test_cleanup_removes_old_requests(self, service):
old_result = FetchResult(request_id="old", success=True)
old_result.completed_at = time.time() - 7200 # 2 hours ago
service.completed_requests["old"] = old_result
service._last_completed_requests_cleanup = 0 # force cleanup
removed = service._cleanup_completed_requests(force=True)
assert removed >= 1
assert "old" not in service.completed_requests
def test_cleanup_respects_interval(self, service):
old_result = FetchResult(request_id="r", success=True)
old_result.completed_at = time.time() - 7200
service.completed_requests["r"] = old_result
# Cleanup interval not passed, should skip
service._last_completed_requests_cleanup = time.time()
removed = service._cleanup_completed_requests(force=False)
assert removed == 0
def test_size_limit_enforcement(self, service):
service._max_completed_requests = 3
for i in range(5):
result = FetchResult(request_id=str(i), success=True)
result.completed_at = time.time() - (5 - i) * 100 # oldest first
service.completed_requests[str(i)] = result
service._last_completed_requests_cleanup = 0
service._cleanup_completed_requests(force=True)
assert len(service.completed_requests) <= 3
# ---------------------------------------------------------------------------
# Singleton get_background_service
# ---------------------------------------------------------------------------
class TestGetBackgroundService:
def test_first_call_requires_cache_manager(self):
with pytest.raises(ValueError, match="cache_manager is required"):
get_background_service()
def test_creates_singleton(self, mock_cache_manager):
svc1 = get_background_service(mock_cache_manager)
svc2 = get_background_service()
assert svc1 is svc2
def test_shutdown_clears_singleton(self, mock_cache_manager):
get_background_service(mock_cache_manager)
shutdown_background_service()
with pytest.raises(ValueError):
get_background_service()

View File

@@ -1,209 +0,0 @@
"""
Tests for src/base_classes/data_sources.py
Covers ESPNDataSource, MLBAPIDataSource, SoccerAPIDataSource.
All HTTP calls are mocked to avoid network access.
"""
import logging
from datetime import datetime, date
from unittest.mock import MagicMock, patch, Mock
import pytest
import requests
from src.base_classes.data_sources import ESPNDataSource, MLBAPIDataSource, SoccerAPIDataSource
def _make_logger() -> logging.Logger:
return logging.getLogger("test_data_sources")
def _mock_response(json_data: dict, status_code: int = 200):
resp = Mock(spec=requests.Response)
resp.status_code = status_code
resp.json.return_value = json_data
resp.raise_for_status = Mock()
if status_code >= 400:
resp.raise_for_status.side_effect = requests.HTTPError(response=resp)
return resp
# ---------------------------------------------------------------------------
# ESPNDataSource
# ---------------------------------------------------------------------------
class TestESPNDataSource:
def setup_method(self):
self.source = ESPNDataSource(_make_logger())
def test_get_headers(self):
headers = self.source.get_headers()
assert headers["Accept"] == "application/json"
assert "LEDMatrix" in headers["User-Agent"]
def test_fetch_live_games_returns_live_events(self):
live_event = {
"competitions": [{"status": {"type": {"state": "in"}}}]
}
non_live_event = {
"competitions": [{"status": {"type": {"state": "pre"}}}]
}
payload = {"events": [live_event, non_live_event]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("football", "nfl")
assert len(result) == 1
assert result[0] is live_event
def test_fetch_live_games_empty_when_none_live(self):
payload = {"events": [
{"competitions": [{"status": {"type": {"state": "post"}}}]}
]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("football", "nfl")
assert result == []
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("network failure")):
result = self.source.fetch_live_games("football", "nfl")
assert result == []
def test_fetch_schedule_returns_all_events(self):
events = [{"id": "1"}, {"id": "2"}]
payload = {"events": events}
start = datetime(2024, 1, 1)
end = datetime(2024, 1, 7)
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("football", "nfl", (start, end))
assert len(result) == 2
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("timeout")):
result = self.source.fetch_schedule("football", "nfl", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"standings": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("football", "nfl")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("error")):
result = self.source.fetch_standings("football", "nfl")
assert result == {}
def test_base_url_set_correctly(self):
assert "espn.com" in self.source.base_url
# ---------------------------------------------------------------------------
# MLBAPIDataSource
# ---------------------------------------------------------------------------
class TestMLBAPIDataSource:
def setup_method(self):
self.source = MLBAPIDataSource(_make_logger())
def test_fetch_live_games_filters_live(self):
live_game = {"status": {"abstractGameState": "Live"}}
final_game = {"status": {"abstractGameState": "Final"}}
payload = {"dates": [{"games": [live_game, final_game]}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("baseball", "mlb")
assert len(result) == 1
assert result[0] is live_game
def test_fetch_live_games_empty_dates(self):
payload = {"dates": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("baseball", "mlb")
assert result == []
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_live_games("baseball", "mlb")
assert result == []
def test_fetch_schedule_aggregates_all_dates(self):
payload = {
"dates": [
{"games": [{"id": "1"}, {"id": "2"}]},
{"games": [{"id": "3"}]},
]
}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("baseball", "mlb", (datetime.now(), datetime.now()))
assert len(result) == 3
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_schedule("baseball", "mlb", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"records": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("baseball", "mlb")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_standings("baseball", "mlb")
assert result == {}
# ---------------------------------------------------------------------------
# SoccerAPIDataSource
# ---------------------------------------------------------------------------
class TestSoccerAPIDataSource:
def setup_method(self):
self.source = SoccerAPIDataSource(_make_logger(), api_key="test-key-123")
def test_headers_include_api_key(self):
headers = self.source.get_headers()
assert headers["X-Auth-Token"] == "test-key-123"
def test_headers_without_api_key(self):
source = SoccerAPIDataSource(_make_logger())
headers = source.get_headers()
assert "X-Auth-Token" not in headers
def test_fetch_live_games_success(self):
payload = {"matches": [{"id": "m1"}, {"id": "m2"}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("soccer", "eng.1")
assert len(result) == 2
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_live_games("soccer", "eng.1")
assert result == []
def test_fetch_schedule_success(self):
payload = {"matches": [{"id": "m1"}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("soccer", "eng.1", (datetime.now(), datetime.now()))
assert len(result) == 1
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_schedule("soccer", "eng.1", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"standings": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("soccer", "PL")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_standings("soccer", "PL")
assert result == {}

View File

@@ -1,317 +0,0 @@
"""
Tests for src/common/game_helper.py
Covers GameHelper: extract_game_details, filter_*, sort_games_by_time,
process_games, get_game_summary, and all private helpers.
"""
import logging
import pytest
from datetime import datetime, timezone, timedelta
from src.common.game_helper import GameHelper
def _make_logger() -> logging.Logger:
return logging.getLogger("test_game_helper")
def _make_espn_event(
state: str = "in",
home_abbr: str = "LAL",
away_abbr: str = "BOS",
home_score: str = "105",
away_score: str = "98",
date_str: str = "2024-01-15T20:00:00Z",
period: int = 4,
status_name: str = "STATUS_IN_PROGRESS",
home_record: str = "30-10",
away_record: str = "25-15",
event_id: str = "game-1",
) -> dict:
return {
"id": event_id,
"date": date_str,
"competitions": [
{
"status": {
"type": {
"state": state,
"shortDetail": "Q4 2:30",
"name": status_name,
},
"period": period,
"displayClock": "2:30",
},
"competitors": [
{
"homeAway": "home",
"id": "h1",
"team": {"abbreviation": home_abbr, "displayName": f"{home_abbr} Team"},
"score": home_score,
"records": [{"summary": home_record}],
},
{
"homeAway": "away",
"id": "a1",
"team": {"abbreviation": away_abbr, "displayName": f"{away_abbr} Team"},
"score": away_score,
"records": [{"summary": away_record}],
},
],
}
],
}
@pytest.fixture
def helper():
return GameHelper(timezone_str="UTC", logger=_make_logger())
# ---------------------------------------------------------------------------
# extract_game_details
# ---------------------------------------------------------------------------
class TestExtractGameDetails:
def test_live_game(self, helper):
event = _make_espn_event(state="in")
result = helper.extract_game_details(event)
assert result is not None
assert result["is_live"] is True
assert result["is_final"] is False
assert result["is_upcoming"] is False
def test_final_game(self, helper):
event = _make_espn_event(state="post")
result = helper.extract_game_details(event)
assert result["is_final"] is True
def test_upcoming_game(self, helper):
event = _make_espn_event(state="pre")
result = helper.extract_game_details(event)
assert result["is_upcoming"] is True
def test_halftime_detection(self, helper):
event = _make_espn_event(state="halftime", status_name="STATUS_HALFTIME")
result = helper.extract_game_details(event)
assert result["is_halftime"] is True
def test_basic_fields_present(self, helper):
event = _make_espn_event()
result = helper.extract_game_details(event)
for key in ("id", "home_abbr", "away_abbr", "home_score", "away_score",
"home_record", "away_record", "start_time_utc"):
assert key in result
def test_team_abbreviations(self, helper):
event = _make_espn_event(home_abbr="MIA", away_abbr="PHX")
result = helper.extract_game_details(event)
assert result["home_abbr"] == "MIA"
assert result["away_abbr"] == "PHX"
def test_scores_as_strings(self, helper):
event = _make_espn_event(home_score="110", away_score="99")
result = helper.extract_game_details(event)
assert result["home_score"] == "110"
assert result["away_score"] == "99"
def test_returns_none_on_empty(self, helper):
assert helper.extract_game_details({}) is None
assert helper.extract_game_details(None) is None
def test_returns_none_when_no_competitors(self, helper):
event = _make_espn_event()
event["competitions"][0]["competitors"] = []
assert helper.extract_game_details(event) is None
def test_date_z_suffix_parsed(self, helper):
event = _make_espn_event(date_str="2024-06-01T19:30:00Z")
result = helper.extract_game_details(event)
assert result["start_time_utc"] is not None
assert result["start_time_utc"].tzinfo is not None
def test_zero_zero_record_suppressed(self, helper):
event = _make_espn_event(home_record="0-0", away_record="0-0-0")
result = helper.extract_game_details(event)
assert result["home_record"] == ""
assert result["away_record"] == ""
def test_basketball_sport_fields(self, helper):
event = _make_espn_event(period=3)
result = helper.extract_game_details(event, sport="basketball")
assert result["period_text"] == "Q3"
assert "clock" in result
def test_basketball_overtime_period(self, helper):
event = _make_espn_event(period=5)
result = helper.extract_game_details(event, sport="basketball")
assert result["period_text"] == "OT1"
def test_football_sport_fields(self, helper):
event = _make_espn_event(period=2)
result = helper.extract_game_details(event, sport="football")
assert result["period_text"] == "Q2"
def test_hockey_sport_fields_period_1(self, helper):
event = _make_espn_event(period=1)
result = helper.extract_game_details(event, sport="hockey")
assert result["period_text"] == "P1"
def test_hockey_sport_fields_ot(self, helper):
event = _make_espn_event(period=4)
result = helper.extract_game_details(event, sport="hockey")
assert result["period_text"] == "OT1"
def test_baseball_sport_fields(self, helper):
event = _make_espn_event(period=7)
result = helper.extract_game_details(event, sport="baseball")
assert result["period_text"] == "INN 7"
# ---------------------------------------------------------------------------
# Filter methods
# ---------------------------------------------------------------------------
class TestFilterMethods:
def _make_games(self):
now = datetime.now(timezone.utc)
return [
{"is_live": True, "is_final": False, "is_upcoming": False, "home_abbr": "LAL", "away_abbr": "BOS", "start_time_utc": now},
{"is_live": False, "is_final": True, "is_upcoming": False, "home_abbr": "MIA", "away_abbr": "PHX", "start_time_utc": now - timedelta(hours=3)},
{"is_live": False, "is_final": False, "is_upcoming": True, "home_abbr": "DAL", "away_abbr": "CHI", "start_time_utc": now + timedelta(hours=2)},
]
def test_filter_live_games(self, helper):
games = self._make_games()
result = helper.filter_live_games(games)
assert len(result) == 1
assert result[0]["home_abbr"] == "LAL"
def test_filter_final_games(self, helper):
games = self._make_games()
result = helper.filter_final_games(games)
assert len(result) == 1
assert result[0]["home_abbr"] == "MIA"
def test_filter_upcoming_games(self, helper):
games = self._make_games()
result = helper.filter_upcoming_games(games)
assert len(result) == 1
assert result[0]["home_abbr"] == "DAL"
def test_filter_favorite_teams_match(self, helper):
games = self._make_games()
result = helper.filter_favorite_teams(games, ["LAL"])
assert len(result) == 1
assert result[0]["home_abbr"] == "LAL"
def test_filter_favorite_teams_empty_list_returns_all(self, helper):
games = self._make_games()
result = helper.filter_favorite_teams(games, [])
assert len(result) == 3
def test_filter_favorite_teams_away_match(self, helper):
games = self._make_games()
result = helper.filter_favorite_teams(games, ["BOS"])
assert len(result) == 1
def test_filter_recent_games_within_window(self, helper):
now = datetime.now(timezone.utc)
games = [
{"start_time_utc": now - timedelta(days=2), "is_final": True},
{"start_time_utc": now - timedelta(days=10), "is_final": True},
]
result = helper.filter_recent_games(games, days_back=7)
assert len(result) == 1
def test_filter_recent_games_all_within(self, helper):
now = datetime.now(timezone.utc)
games = [
{"start_time_utc": now - timedelta(days=1)},
{"start_time_utc": now - timedelta(days=3)},
]
result = helper.filter_recent_games(games, days_back=7)
assert len(result) == 2
def test_sort_games_ascending(self, helper):
now = datetime.now(timezone.utc)
games = [
{"start_time_utc": now + timedelta(hours=2), "id": "late"},
{"start_time_utc": now + timedelta(hours=1), "id": "early"},
]
result = helper.sort_games_by_time(games)
assert result[0]["id"] == "early"
def test_sort_games_descending(self, helper):
now = datetime.now(timezone.utc)
games = [
{"start_time_utc": now + timedelta(hours=1), "id": "early"},
{"start_time_utc": now + timedelta(hours=2), "id": "late"},
]
result = helper.sort_games_by_time(games, reverse=True)
assert result[0]["id"] == "late"
# ---------------------------------------------------------------------------
# process_games
# ---------------------------------------------------------------------------
class TestProcessGames:
def test_processes_valid_events(self, helper):
events = [
_make_espn_event(event_id="1"),
_make_espn_event(event_id="2"),
]
result = helper.process_games(events)
assert len(result) == 2
def test_skips_invalid_events(self, helper):
events = [
_make_espn_event(event_id="1"),
{}, # invalid
]
result = helper.process_games(events)
assert len(result) == 1
def test_empty_events(self, helper):
assert helper.process_games([]) == []
# ---------------------------------------------------------------------------
# get_game_summary
# ---------------------------------------------------------------------------
class TestGetGameSummary:
def test_live_summary(self, helper):
game = {
"home_abbr": "LAL", "away_abbr": "BOS",
"home_score": "105", "away_score": "98",
"status_text": "Q4 2:30",
"is_live": True, "is_final": False,
}
summary = helper.get_game_summary(game)
assert "BOS" in summary
assert "LAL" in summary
assert "98" in summary
assert "105" in summary
def test_final_summary(self, helper):
game = {
"home_abbr": "LAL", "away_abbr": "BOS",
"home_score": "110", "away_score": "102",
"status_text": "Final",
"is_live": False, "is_final": True,
}
summary = helper.get_game_summary(game)
assert "Final" in summary
def test_upcoming_summary(self, helper):
game = {
"home_abbr": "LAL", "away_abbr": "BOS",
"home_score": "0", "away_score": "0",
"status_text": "7:30 PM",
"is_live": False, "is_final": False,
}
summary = helper.get_game_summary(game)
assert "7:30 PM" in summary

View File

@@ -1,307 +0,0 @@
"""
Tests for src/plugin_system/health_monitor.py
Covers PluginHealthMonitor: get_plugin_health_status, get_plugin_health_metrics,
get_all_plugin_health, _get_recovery_suggestions, start/stop_monitoring,
register_health_check.
"""
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
from src.plugin_system.health_monitor import (
PluginHealthMonitor,
HealthStatus,
HealthMetrics,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_health_tracker(
summary: dict | None = None,
all_summaries: dict | None = None,
):
"""Return a mock PluginHealthTracker."""
tracker = MagicMock()
tracker.get_health_summary.return_value = summary
tracker.get_all_health_summaries.return_value = all_summaries or {}
return tracker
def _healthy_summary() -> dict:
return {
"success_rate": 100.0,
"circuit_state": "closed",
"consecutive_failures": 0,
"total_failures": 0,
"total_successes": 50,
"last_success_time": datetime.now().isoformat(),
"last_error": None,
}
def _degraded_summary() -> dict:
return {
"success_rate": 40.0, # 60% error rate
"circuit_state": "closed",
"consecutive_failures": 3,
"total_failures": 6,
"total_successes": 4,
"last_success_time": None,
"last_error": "timeout occurred",
}
def _unhealthy_summary() -> dict:
return {
"success_rate": 10.0, # 90% error rate
"circuit_state": "open",
"consecutive_failures": 10,
"total_failures": 9,
"total_successes": 1,
"last_success_time": None,
"last_error": "ImportError: missing module",
}
@pytest.fixture
def monitor():
tracker = _make_health_tracker(_healthy_summary())
return PluginHealthMonitor(health_tracker=tracker)
# ---------------------------------------------------------------------------
# get_plugin_health_status
# ---------------------------------------------------------------------------
class TestGetPluginHealthStatus:
def test_healthy_status(self):
tracker = _make_health_tracker(_healthy_summary())
monitor = PluginHealthMonitor(tracker)
status = monitor.get_plugin_health_status("plugin_a")
assert status == HealthStatus.HEALTHY
def test_degraded_status(self):
tracker = _make_health_tracker(_degraded_summary())
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
status = monitor.get_plugin_health_status("plugin_b")
assert status == HealthStatus.DEGRADED
def test_unhealthy_status(self):
tracker = _make_health_tracker(_unhealthy_summary())
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
status = monitor.get_plugin_health_status("plugin_c")
assert status == HealthStatus.UNHEALTHY
def test_open_circuit_breaker_is_unhealthy(self):
summary = _healthy_summary()
summary["circuit_state"] = "open"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker)
status = monitor.get_plugin_health_status("plugin_d")
assert status == HealthStatus.UNHEALTHY
def test_unknown_when_no_tracker(self):
monitor = PluginHealthMonitor(health_tracker=None)
status = monitor.get_plugin_health_status("plugin_e")
assert status == HealthStatus.UNKNOWN
def test_unknown_when_no_summary(self):
tracker = _make_health_tracker(None)
monitor = PluginHealthMonitor(tracker)
status = monitor.get_plugin_health_status("plugin_f")
assert status == HealthStatus.UNKNOWN
# ---------------------------------------------------------------------------
# get_plugin_health_metrics
# ---------------------------------------------------------------------------
class TestGetPluginHealthMetrics:
def test_healthy_metrics(self):
tracker = _make_health_tracker(_healthy_summary())
monitor = PluginHealthMonitor(tracker)
metrics = monitor.get_plugin_health_metrics("plugin_a")
assert isinstance(metrics, HealthMetrics)
assert metrics.status == HealthStatus.HEALTHY
assert metrics.success_rate == pytest.approx(1.0)
assert metrics.error_rate == pytest.approx(0.0)
def test_degraded_metrics(self):
tracker = _make_health_tracker(_degraded_summary())
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
metrics = monitor.get_plugin_health_metrics("plugin_b")
assert metrics.status == HealthStatus.DEGRADED
assert metrics.consecutive_failures == 3
def test_unhealthy_metrics(self):
tracker = _make_health_tracker(_unhealthy_summary())
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
metrics = monitor.get_plugin_health_metrics("plugin_c")
assert metrics.status == HealthStatus.UNHEALTHY
assert metrics.circuit_breaker_state == "open"
assert metrics.last_error is not None
def test_metrics_without_tracker(self):
monitor = PluginHealthMonitor(health_tracker=None)
metrics = monitor.get_plugin_health_metrics("plugin_d")
assert metrics.status == HealthStatus.UNKNOWN
assert metrics.plugin_id == "plugin_d"
def test_metrics_without_summary(self):
tracker = _make_health_tracker(None)
monitor = PluginHealthMonitor(tracker)
metrics = monitor.get_plugin_health_metrics("plugin_e")
assert metrics.status == HealthStatus.UNKNOWN
def test_last_successful_update_parsed(self):
summary = _healthy_summary()
summary["last_success_time"] = "2024-06-01T12:00:00"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker)
metrics = monitor.get_plugin_health_metrics("plugin_a")
assert metrics.last_successful_update is not None
assert isinstance(metrics.last_successful_update, datetime)
def test_invalid_last_success_time_handled(self):
summary = _healthy_summary()
summary["last_success_time"] = "not-a-date"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker)
# Should not raise
metrics = monitor.get_plugin_health_metrics("plugin_a")
assert metrics.last_successful_update is None
def test_total_successes_failures(self):
tracker = _make_health_tracker(_degraded_summary())
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
metrics = monitor.get_plugin_health_metrics("plugin_b")
assert metrics.total_failures == 6
assert metrics.total_successes == 4
# ---------------------------------------------------------------------------
# get_all_plugin_health
# ---------------------------------------------------------------------------
class TestGetAllPluginHealth:
def test_returns_empty_without_tracker(self):
monitor = PluginHealthMonitor(health_tracker=None)
result = monitor.get_all_plugin_health()
assert result == {}
def test_returns_metrics_for_each_plugin(self):
all_summaries = {
"plugin_a": _healthy_summary(),
"plugin_b": _degraded_summary(),
}
tracker = MagicMock()
tracker.get_all_health_summaries.return_value = all_summaries
tracker.get_health_summary.side_effect = lambda pid: all_summaries.get(pid)
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
result = monitor.get_all_plugin_health()
assert "plugin_a" in result
assert "plugin_b" in result
assert isinstance(result["plugin_a"], HealthMetrics)
def test_returns_empty_when_no_summaries(self):
tracker = _make_health_tracker(all_summaries={})
monitor = PluginHealthMonitor(tracker)
result = monitor.get_all_plugin_health()
assert result == {}
# ---------------------------------------------------------------------------
# _get_recovery_suggestions
# ---------------------------------------------------------------------------
class TestGetRecoverySuggestions:
def test_healthy_plugin_suggestion(self):
tracker = _make_health_tracker(_healthy_summary())
monitor = PluginHealthMonitor(tracker)
suggestions = monitor._get_recovery_suggestions("p", _healthy_summary(), HealthStatus.HEALTHY)
assert any("healthy" in s.lower() for s in suggestions)
def test_unhealthy_suggestions(self):
tracker = _make_health_tracker(_unhealthy_summary())
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", _unhealthy_summary(), HealthStatus.UNHEALTHY)
assert len(suggestions) > 0
assert any("unhealthy" in s.lower() for s in suggestions)
def test_open_circuit_breaker_suggestion(self):
summary = _unhealthy_summary()
summary["circuit_state"] = "open"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
assert any("circuit" in s.lower() for s in suggestions)
def test_timeout_error_suggestion(self):
summary = _degraded_summary()
summary["last_error"] = "connection timeout occurred"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.DEGRADED)
assert any("timeout" in s.lower() for s in suggestions)
def test_import_error_suggestion(self):
summary = _unhealthy_summary()
summary["last_error"] = "ImportError: missing module"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
assert any("dependencies" in s.lower() or "import" in s.lower() or "missing" in s.lower()
for s in suggestions)
def test_permission_error_suggestion(self):
summary = _unhealthy_summary()
summary["last_error"] = "permission denied to access resource"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
assert any("permission" in s.lower() for s in suggestions)
def test_degraded_suggestions_include_error_rate(self):
tracker = _make_health_tracker(_degraded_summary())
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", _degraded_summary(), HealthStatus.DEGRADED)
assert any("%" in s for s in suggestions)
# ---------------------------------------------------------------------------
# start / stop monitoring
# ---------------------------------------------------------------------------
class TestMonitorLifecycle:
def test_start_monitoring(self, monitor):
monitor.start_monitoring()
try:
assert monitor._monitor_thread is not None
assert monitor._monitor_thread.is_alive()
finally:
monitor.stop_monitoring()
def test_stop_monitoring(self, monitor):
monitor.start_monitoring()
monitor.stop_monitoring()
# Thread should no longer be alive
assert not monitor._monitor_thread.is_alive()
def test_double_start_no_duplicate_threads(self, monitor):
monitor.start_monitoring()
try:
thread1 = monitor._monitor_thread
monitor.start_monitoring() # should be idempotent
assert monitor._monitor_thread is thread1
finally:
monitor.stop_monitoring()
def test_register_health_check(self, monitor):
callback = MagicMock()
monitor.register_health_check(callback)
assert callback in monitor._health_check_callbacks

View File

@@ -1,129 +0,0 @@
"""
Tests for src/logo_downloader.py
Focuses on the pure/static methods that don't require network calls:
normalize_abbreviation, get_logo_filename_variations, get_logo_directory,
ensure_logo_directory, and the download_missing_logo function path
(with HTTP mocked).
"""
import os
import pytest
from pathlib import Path
from unittest.mock import patch, Mock, MagicMock
from src.logo_downloader import LogoDownloader
# ---------------------------------------------------------------------------
# normalize_abbreviation
# ---------------------------------------------------------------------------
class TestNormalizeAbbreviation:
def test_basic_lowercase(self):
result = LogoDownloader.normalize_abbreviation("lal")
assert result == "LAL"
def test_uppercases(self):
result = LogoDownloader.normalize_abbreviation("bos")
assert result == "BOS"
def test_ampersand_replaced(self):
result = LogoDownloader.normalize_abbreviation("TA&M")
assert "&" not in result
assert "AND" in result
def test_forward_slash_replaced(self):
result = LogoDownloader.normalize_abbreviation("A/B")
assert "/" not in result
def test_empty_returns_empty(self):
result = LogoDownloader.normalize_abbreviation("")
assert result == ""
# ---------------------------------------------------------------------------
# get_logo_filename_variations
# ---------------------------------------------------------------------------
class TestGetLogoFilenameVariations:
def test_returns_list(self):
result = LogoDownloader.get_logo_filename_variations("LAL")
assert isinstance(result, list)
assert len(result) > 0
def test_includes_png(self):
result = LogoDownloader.get_logo_filename_variations("KC")
filenames = " ".join(result)
assert ".png" in filenames
def test_includes_original(self):
result = LogoDownloader.get_logo_filename_variations("LAL")
assert any("LAL" in f for f in result)
def test_ampersand_variation(self):
result = LogoDownloader.get_logo_filename_variations("TA&M")
# Should produce at least the normalized version
assert len(result) > 0
def test_empty_string_no_crash(self):
result = LogoDownloader.get_logo_filename_variations("")
assert isinstance(result, list)
# ---------------------------------------------------------------------------
# get_logo_directory
# ---------------------------------------------------------------------------
class TestGetLogoDirectory:
def test_known_sport_returns_string(self):
downloader = LogoDownloader()
result = downloader.get_logo_directory("nfl")
assert isinstance(result, str)
assert len(result) > 0
def test_known_sport_nba(self):
downloader = LogoDownloader()
result = downloader.get_logo_directory("nba")
assert "nba" in result.lower() or "sports" in result.lower()
def test_unknown_sport_returns_string(self):
downloader = LogoDownloader()
result = downloader.get_logo_directory("unknown_sport_xyz")
assert isinstance(result, str)
# ---------------------------------------------------------------------------
# ensure_logo_directory
# ---------------------------------------------------------------------------
class TestEnsureLogoDirectory:
def test_creates_writable_directory(self, tmp_path):
downloader = LogoDownloader()
test_dir = str(tmp_path / "logos" / "nfl")
result = downloader.ensure_logo_directory(test_dir)
assert result is True
assert Path(test_dir).is_dir()
def test_existing_writable_directory(self, tmp_path):
downloader = LogoDownloader()
test_dir = str(tmp_path)
result = downloader.ensure_logo_directory(test_dir)
assert result is True
def test_returns_false_when_write_test_fails(self, tmp_path):
"""Simulate a directory that exists but raises PermissionError on write."""
downloader = LogoDownloader()
test_dir = str(tmp_path / "logos")
import builtins
original_open = builtins.open
def mock_open(path, *args, **kwargs):
if ".write_test" in str(path):
raise PermissionError("no write access")
return original_open(path, *args, **kwargs)
with patch("builtins.open", side_effect=mock_open):
result = downloader.ensure_logo_directory(test_dir)
assert result is False

View File

@@ -1,317 +0,0 @@
"""
Tests for src/common/scroll_helper.py
Covers ScrollHelper: create_scrolling_image, update_scroll_position,
get_visible_portion, calculate_dynamic_duration, set_* methods,
reset_scroll, clear_cache, get_scroll_info.
"""
import pytest
import time
from unittest.mock import patch
from PIL import Image
from src.common.scroll_helper import ScrollHelper
DISPLAY_W = 64
DISPLAY_H = 32
@pytest.fixture
def helper():
return ScrollHelper(display_width=DISPLAY_W, display_height=DISPLAY_H)
def _make_image(width: int = 64, height: int = 32, color=(255, 0, 0)) -> Image.Image:
img = Image.new("RGB", (width, height), color)
return img
# ---------------------------------------------------------------------------
# __init__ / initial state
# ---------------------------------------------------------------------------
class TestScrollHelperInit:
def test_initial_scroll_position(self, helper):
assert helper.scroll_position == 0.0
def test_initial_scroll_complete_false(self, helper):
assert helper.scroll_complete is False
def test_display_dimensions(self, helper):
assert helper.display_width == DISPLAY_W
assert helper.display_height == DISPLAY_H
# ---------------------------------------------------------------------------
# create_scrolling_image
# ---------------------------------------------------------------------------
class TestCreateScrollingImage:
def test_empty_content_returns_blank_image(self, helper):
result = helper.create_scrolling_image([])
assert isinstance(result, Image.Image)
assert helper.total_scroll_width == 0
def test_single_item_creates_image(self, helper):
img = _make_image(width=100)
result = helper.create_scrolling_image([img])
assert isinstance(result, Image.Image)
assert result.width > DISPLAY_W # includes leading gap
def test_multiple_items_wider_image(self, helper):
items = [_make_image(width=50), _make_image(width=50)]
result = helper.create_scrolling_image(items)
# Should be wider than two items alone
assert result.width > 100
def test_scroll_position_reset(self, helper):
helper.scroll_position = 500.0
helper.create_scrolling_image([_make_image()])
assert helper.scroll_position == 0.0
def test_cached_array_set(self, helper):
helper.create_scrolling_image([_make_image()])
assert helper.cached_array is not None
def test_scroll_complete_reset(self, helper):
helper.scroll_complete = True
helper.create_scrolling_image([_make_image()])
assert helper.scroll_complete is False
def test_total_scroll_width_matches_image(self, helper):
img = _make_image(width=200)
result = helper.create_scrolling_image([img])
assert helper.total_scroll_width == result.width
# ---------------------------------------------------------------------------
# set_scrolling_image
# ---------------------------------------------------------------------------
class TestSetScrollingImage:
def test_sets_cached_image(self, helper):
img = _make_image(width=200)
helper.set_scrolling_image(img)
assert helper.cached_image is img
def test_sets_cached_array(self, helper):
img = _make_image(width=200)
helper.set_scrolling_image(img)
assert helper.cached_array is not None
def test_scroll_width_matches_image(self, helper):
img = _make_image(width=300)
helper.set_scrolling_image(img)
assert helper.total_scroll_width == 300
def test_none_clears_cache(self, helper):
helper.set_scrolling_image(_make_image())
helper.set_scrolling_image(None)
assert helper.cached_image is None
# ---------------------------------------------------------------------------
# update_scroll_position (time-based mode)
# ---------------------------------------------------------------------------
class TestUpdateScrollPosition:
def test_position_advances_over_time(self, helper):
helper.create_scrolling_image([_make_image(width=200)])
helper.scroll_speed = 100.0 # 100 px/s
helper.last_update_time = time.time() - 0.1 # pretend 100ms elapsed
initial = helper.scroll_position
helper.update_scroll_position()
assert helper.scroll_position > initial
def test_no_advance_without_image(self, helper):
helper.update_scroll_position() # no image, should not crash
assert helper.scroll_position == 0.0
def test_zero_width_content_stays_zero(self, helper):
helper.create_scrolling_image([]) # empty → width 0
helper.update_scroll_position()
assert helper.scroll_position == 0.0
def test_scroll_complete_clamped(self, helper):
helper.create_scrolling_image([_make_image(width=100)])
# Force position past the end
helper.scroll_position = helper.total_scroll_width + 50
helper.total_distance_scrolled = helper.total_scroll_width + 50
helper.update_scroll_position()
assert helper.scroll_complete is True
assert helper.scroll_position <= helper.total_scroll_width
# ---------------------------------------------------------------------------
# get_visible_portion
# ---------------------------------------------------------------------------
class TestGetVisiblePortion:
def test_returns_none_without_image(self, helper):
assert helper.get_visible_portion() is None
def test_returns_image_sized_to_display(self, helper):
helper.create_scrolling_image([_make_image(width=200)])
visible = helper.get_visible_portion()
assert visible is not None
assert visible.width == DISPLAY_W
assert visible.height == DISPLAY_H
def test_different_positions_give_different_images(self, helper):
helper.create_scrolling_image([_make_image(width=300)])
img1 = helper.get_visible_portion()
helper.scroll_position = 50
img2 = helper.get_visible_portion()
# Images should differ (colour from scrolled content)
# Just verify both are valid PIL images with correct size
assert img1.width == img2.width == DISPLAY_W
# ---------------------------------------------------------------------------
# reset_scroll / clear_cache
# ---------------------------------------------------------------------------
class TestResetAndClear:
def test_reset_restores_position(self, helper):
helper.create_scrolling_image([_make_image(width=200)])
helper.scroll_position = 100.0
helper.reset_scroll()
assert helper.scroll_position == 0.0
def test_reset_clears_complete_flag(self, helper):
helper.scroll_complete = True
helper.reset_scroll()
assert helper.scroll_complete is False
def test_reset_alias(self, helper):
helper.scroll_position = 50.0
helper.reset()
assert helper.scroll_position == 0.0
def test_clear_cache(self, helper):
helper.create_scrolling_image([_make_image()])
helper.clear_cache()
assert helper.cached_image is None
assert helper.cached_array is None
assert helper.total_scroll_width == 0
# ---------------------------------------------------------------------------
# calculate_dynamic_duration
# ---------------------------------------------------------------------------
class TestCalculateDynamicDuration:
def test_returns_min_when_disabled(self, helper):
helper.dynamic_duration_enabled = False
helper.min_duration = 30
result = helper.calculate_dynamic_duration()
assert result == 30
def test_returns_min_when_no_content(self, helper):
helper.total_scroll_width = 0
helper.min_duration = 30
result = helper.calculate_dynamic_duration()
assert result == 30
def test_respects_min_duration(self, helper):
helper.create_scrolling_image([_make_image(width=50)])
helper.min_duration = 60
helper.max_duration = 300
helper.scroll_speed = 500.0 # very fast → very short time
result = helper.calculate_dynamic_duration()
assert result >= 60
def test_respects_max_duration(self, helper):
helper.create_scrolling_image([_make_image(width=5000)])
helper.min_duration = 10
helper.max_duration = 60
helper.scroll_speed = 1.0 # very slow → very long time
result = helper.calculate_dynamic_duration()
assert result <= 60
def test_time_based_calculation(self, helper):
helper.create_scrolling_image([_make_image(width=200)])
helper.scroll_speed = 100.0
helper.min_duration = 1
helper.max_duration = 600
helper.frame_based_scrolling = False
result = helper.calculate_dynamic_duration()
assert isinstance(result, int)
assert result > 0
# ---------------------------------------------------------------------------
# set_* configuration methods
# ---------------------------------------------------------------------------
class TestSetMethods:
def test_set_scroll_speed_time_based(self, helper):
helper.frame_based_scrolling = False
helper.set_scroll_speed(50.0)
assert helper.scroll_speed == 50.0
def test_set_scroll_speed_clamped_low(self, helper):
helper.frame_based_scrolling = False
helper.set_scroll_speed(0.0)
assert helper.scroll_speed >= 1.0
def test_set_scroll_speed_clamped_high(self, helper):
helper.frame_based_scrolling = False
helper.set_scroll_speed(10000.0)
assert helper.scroll_speed <= 500.0
def test_set_scroll_delay(self, helper):
helper.set_scroll_delay(0.05)
assert helper.scroll_delay == 0.05
def test_set_scroll_delay_clamped(self, helper):
helper.set_scroll_delay(0.0001)
assert helper.scroll_delay >= 0.001
def test_set_target_fps(self, helper):
helper.set_target_fps(60.0)
assert helper.target_fps == 60.0
def test_set_target_fps_clamped(self, helper):
helper.set_target_fps(1000.0)
assert helper.target_fps <= 200.0
def test_set_sub_pixel_scrolling(self, helper):
helper.set_sub_pixel_scrolling(True)
assert helper.sub_pixel_scrolling is True
helper.set_sub_pixel_scrolling(False)
assert helper.sub_pixel_scrolling is False
def test_set_frame_based_scrolling(self, helper):
helper.set_frame_based_scrolling(True)
assert helper.frame_based_scrolling is True
def test_set_dynamic_duration_settings(self, helper):
helper.set_dynamic_duration_settings(enabled=True, min_duration=20, max_duration=120, buffer=0.2)
assert helper.dynamic_duration_enabled is True
assert helper.min_duration == 20
assert helper.max_duration == 120
assert helper.duration_buffer == pytest.approx(0.2)
# ---------------------------------------------------------------------------
# get_scroll_info
# ---------------------------------------------------------------------------
class TestGetScrollInfo:
def test_returns_dict(self, helper):
info = helper.get_scroll_info()
assert isinstance(info, dict)
def test_required_keys(self, helper):
info = helper.get_scroll_info()
for key in ("scroll_position", "total_distance_scrolled", "scroll_speed",
"scroll_complete", "dynamic_duration"):
assert key in info
def test_scroll_position_reflected(self, helper):
helper.scroll_position = 42.0
info = helper.get_scroll_info()
assert info["scroll_position"] == 42.0

View File

@@ -1,329 +0,0 @@
"""
Tests for src/common/utils.py
Covers all pure utility functions: normalize_team_abbreviation, format_time,
format_date, get_timezone, validate_dimensions, parse_team_abbreviation,
format_score, format_period, is_live_game, is_final_game, is_upcoming_game,
sanitize_filename, truncate_text, parse_boolean.
"""
import pytest
from datetime import datetime, timezone
import pytz
from src.common.utils import (
normalize_team_abbreviation,
format_time,
format_date,
get_timezone,
validate_dimensions,
parse_team_abbreviation,
format_score,
format_period,
is_live_game,
is_final_game,
is_upcoming_game,
sanitize_filename,
truncate_text,
parse_boolean,
)
# ---------------------------------------------------------------------------
# normalize_team_abbreviation
# ---------------------------------------------------------------------------
class TestNormalizeTeamAbbreviation:
def test_basic_uppercase(self):
assert normalize_team_abbreviation("lal") == "LAL"
def test_strips_spaces(self):
assert normalize_team_abbreviation(" KC ") == "KC"
def test_replaces_ampersand(self):
assert normalize_team_abbreviation("TA&M") == "TAANDM"
def test_removes_internal_spaces(self):
assert normalize_team_abbreviation("A B") == "AB"
def test_removes_hyphens(self):
assert normalize_team_abbreviation("A-B") == "AB"
def test_empty_string_returns_empty(self):
assert normalize_team_abbreviation("") == ""
def test_none_returns_empty(self):
assert normalize_team_abbreviation(None) == ""
# ---------------------------------------------------------------------------
# format_time / format_date
# ---------------------------------------------------------------------------
class TestFormatTime:
def _utc_dt(self, hour=20, minute=30):
return datetime(2024, 1, 15, hour, minute, 0, tzinfo=timezone.utc)
def test_formats_utc_to_utc(self):
dt = self._utc_dt(20, 30)
result = format_time(dt, timezone_str="UTC")
# 20:30 UTC → "8:30PM" (leading zero stripped)
assert "8:30PM" in result or "8:30 PM" in result or result != ""
def test_naive_datetime_treated_as_utc(self):
dt = datetime(2024, 1, 15, 12, 0, 0) # naive
result = format_time(dt, timezone_str="UTC")
assert result != ""
def test_invalid_timezone_returns_empty(self):
dt = self._utc_dt()
result = format_time(dt, timezone_str="Invalid/TZ")
assert result == ""
def test_eastern_timezone(self):
dt = self._utc_dt(20, 0) # 8 PM UTC = 3 PM ET
result = format_time(dt, timezone_str="America/New_York")
assert result != ""
class TestFormatDate:
def test_formats_date(self):
dt = datetime(2024, 6, 15, 18, 0, 0, tzinfo=timezone.utc)
result = format_date(dt, timezone_str="UTC")
assert "June" in result or "15" in result
def test_naive_datetime(self):
dt = datetime(2024, 3, 10, 12, 0, 0)
result = format_date(dt, timezone_str="UTC")
assert result != ""
def test_invalid_timezone_returns_empty(self):
dt = datetime(2024, 6, 15, 18, 0, 0, tzinfo=timezone.utc)
result = format_date(dt, timezone_str="BadZone/Here")
assert result == ""
# ---------------------------------------------------------------------------
# get_timezone
# ---------------------------------------------------------------------------
class TestGetTimezone:
def test_valid_timezone(self):
tz = get_timezone("America/New_York")
assert tz is not None
def test_utc(self):
tz = get_timezone("UTC")
assert tz is pytz.utc or str(tz) == "UTC"
def test_invalid_returns_utc(self):
tz = get_timezone("Not/ATimezone")
assert tz is pytz.utc
# ---------------------------------------------------------------------------
# validate_dimensions
# ---------------------------------------------------------------------------
class TestValidateDimensions:
def test_valid(self):
assert validate_dimensions(64, 32) is True
def test_zero_width(self):
assert validate_dimensions(0, 32) is False
def test_zero_height(self):
assert validate_dimensions(64, 0) is False
def test_negative(self):
assert validate_dimensions(-1, 32) is False
def test_too_large(self):
assert validate_dimensions(1001, 32) is False
def test_max_valid(self):
assert validate_dimensions(1000, 1000) is True
def test_non_integer(self):
assert validate_dimensions("64", 32) is False # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# parse_team_abbreviation
# ---------------------------------------------------------------------------
class TestParseTeamAbbreviation:
def test_empty_string(self):
assert parse_team_abbreviation("") == ""
def test_none_returns_empty(self):
assert parse_team_abbreviation(None) == ""
def test_extracts_uppercase(self):
result = parse_team_abbreviation("LAL")
assert result == "LAL"
def test_fallback_first_three(self):
# text without recognisable 2-4 char uppercase block
result = parse_team_abbreviation("ab")
assert len(result) <= 3
# ---------------------------------------------------------------------------
# format_score
# ---------------------------------------------------------------------------
class TestFormatScore:
def test_format_score(self):
assert format_score(14, 7) == "7-14"
def test_format_score_strings(self):
assert format_score("21", "14") == "14-21"
def test_zero_zero(self):
assert format_score(0, 0) == "0-0"
# ---------------------------------------------------------------------------
# format_period
# ---------------------------------------------------------------------------
class TestFormatPeriod:
def test_basketball_q1(self):
assert format_period(1, "basketball") == "Q1"
def test_basketball_q4(self):
assert format_period(4, "basketball") == "Q4"
def test_basketball_ot1(self):
assert format_period(5, "basketball") == "OT1"
def test_basketball_ot2(self):
assert format_period(6, "basketball") == "OT2"
def test_football_q1(self):
assert format_period(1, "football") == "Q1"
def test_football_ot(self):
assert format_period(5, "football") == "OT1"
def test_hockey_p1(self):
assert format_period(1, "hockey") == "P1"
def test_hockey_p3(self):
assert format_period(3, "hockey") == "P3"
def test_hockey_ot(self):
assert format_period(4, "hockey") == "OT1"
def test_baseball_inning(self):
assert format_period(7, "baseball") == "INN 7"
def test_unknown_sport(self):
result = format_period(2, "unknown")
assert "2" in result
# ---------------------------------------------------------------------------
# is_live_game / is_final_game / is_upcoming_game
# ---------------------------------------------------------------------------
class TestGameStatusHelpers:
def test_is_live_game_true(self):
assert is_live_game("In Progress") is True
assert is_live_game("halftime") is True
assert is_live_game("overtime") is True
def test_is_live_game_false(self):
assert is_live_game("Final") is False
assert is_live_game("Scheduled") is False
def test_is_final_game_true(self):
assert is_final_game("Final") is True
assert is_final_game("COMPLETED") is True
def test_is_final_game_false(self):
assert is_final_game("In Progress") is False
def test_is_upcoming_game_true(self):
assert is_upcoming_game("Scheduled") is True
assert is_upcoming_game("upcoming") is True
def test_is_upcoming_game_false(self):
assert is_upcoming_game("Final") is False
assert is_upcoming_game("In Progress") is False
# ---------------------------------------------------------------------------
# sanitize_filename
# ---------------------------------------------------------------------------
class TestSanitizeFilename:
def test_removes_invalid_chars(self):
result = sanitize_filename('file<>:"/\\|?*.txt')
assert "<" not in result
assert ">" not in result
assert ":" not in result
def test_collapses_underscores(self):
result = sanitize_filename("file___name")
assert "__" not in result
def test_strips_leading_trailing(self):
result = sanitize_filename("_file_")
assert not result.startswith("_")
assert not result.endswith("_")
def test_normal_filename_unchanged(self):
result = sanitize_filename("my_logo")
assert result == "my_logo"
# ---------------------------------------------------------------------------
# truncate_text
# ---------------------------------------------------------------------------
class TestTruncateText:
def test_no_truncation_needed(self):
assert truncate_text("hello", 10) == "hello"
def test_truncation_adds_suffix(self):
result = truncate_text("hello world", 8)
assert result.endswith("...")
assert len(result) == 8
def test_exact_length(self):
assert truncate_text("hello", 5) == "hello"
def test_custom_suffix(self):
result = truncate_text("hello world", 8, suffix="~")
assert result.endswith("~")
# ---------------------------------------------------------------------------
# parse_boolean
# ---------------------------------------------------------------------------
class TestParseBoolean:
def test_true_bool(self):
assert parse_boolean(True) is True
def test_false_bool(self):
assert parse_boolean(False) is False
def test_int_1(self):
assert parse_boolean(1) is True
def test_int_0(self):
assert parse_boolean(0) is False
def test_string_true(self):
for val in ("true", "True", "TRUE", "1", "yes", "on", "enabled"):
assert parse_boolean(val) is True, f"Expected True for {val!r}"
def test_string_false(self):
for val in ("false", "False", "0", "no", "off", "disabled"):
assert parse_boolean(val) is False, f"Expected False for {val!r}"
def test_none_returns_false(self):
assert parse_boolean(None) is False # type: ignore[arg-type]

View File

@@ -1,310 +0,0 @@
"""
Tests for src/vegas_mode/config.py
Covers VegasModeConfig: from_config, to_dict, get_frame_interval,
is_plugin_included, get_ordered_plugins, validate, update.
"""
import pytest
from src.vegas_mode.config import VegasModeConfig
# ---------------------------------------------------------------------------
# Default construction
# ---------------------------------------------------------------------------
class TestVegasModeConfigDefaults:
def test_default_disabled(self):
cfg = VegasModeConfig()
assert cfg.enabled is False
def test_default_scroll_speed(self):
cfg = VegasModeConfig()
assert cfg.scroll_speed == 50.0
def test_default_separator_width(self):
cfg = VegasModeConfig()
assert cfg.separator_width == 32
def test_default_target_fps(self):
cfg = VegasModeConfig()
assert cfg.target_fps == 125
def test_default_plugin_order_empty(self):
cfg = VegasModeConfig()
assert cfg.plugin_order == []
def test_default_excluded_plugins_empty(self):
cfg = VegasModeConfig()
assert len(cfg.excluded_plugins) == 0
# ---------------------------------------------------------------------------
# from_config
# ---------------------------------------------------------------------------
class TestFromConfig:
def _cfg(self, **kwargs) -> dict:
return {"display": {"vegas_scroll": kwargs}}
def test_enabled_flag(self):
cfg = VegasModeConfig.from_config(self._cfg(enabled=True))
assert cfg.enabled is True
def test_scroll_speed(self):
cfg = VegasModeConfig.from_config(self._cfg(scroll_speed=80.0))
assert cfg.scroll_speed == 80.0
def test_separator_width(self):
cfg = VegasModeConfig.from_config(self._cfg(separator_width=16))
assert cfg.separator_width == 16
def test_plugin_order(self):
cfg = VegasModeConfig.from_config(self._cfg(plugin_order=["a", "b", "c"]))
assert cfg.plugin_order == ["a", "b", "c"]
def test_excluded_plugins(self):
cfg = VegasModeConfig.from_config(self._cfg(excluded_plugins=["x", "y"]))
assert "x" in cfg.excluded_plugins
assert "y" in cfg.excluded_plugins
def test_target_fps(self):
cfg = VegasModeConfig.from_config(self._cfg(target_fps=60))
assert cfg.target_fps == 60
def test_buffer_ahead(self):
cfg = VegasModeConfig.from_config(self._cfg(buffer_ahead=3))
assert cfg.buffer_ahead == 3
def test_min_max_cycle_duration(self):
cfg = VegasModeConfig.from_config(self._cfg(min_cycle_duration=30, max_cycle_duration=120))
assert cfg.min_cycle_duration == 30
assert cfg.max_cycle_duration == 120
def test_defaults_when_missing(self):
cfg = VegasModeConfig.from_config({})
assert cfg.enabled is False
assert cfg.scroll_speed == 50.0
def test_frame_based_scrolling(self):
cfg = VegasModeConfig.from_config(self._cfg(frame_based_scrolling=False))
assert cfg.frame_based_scrolling is False
# ---------------------------------------------------------------------------
# to_dict
# ---------------------------------------------------------------------------
class TestToDict:
def test_roundtrip(self):
original = VegasModeConfig(
enabled=True,
scroll_speed=75.0,
separator_width=24,
plugin_order=["a", "b"],
excluded_plugins={"z"},
target_fps=100,
)
d = original.to_dict()
assert d["enabled"] is True
assert d["scroll_speed"] == 75.0
assert d["separator_width"] == 24
assert d["plugin_order"] == ["a", "b"]
assert "z" in d["excluded_plugins"]
assert d["target_fps"] == 100
def test_excluded_plugins_is_list(self):
cfg = VegasModeConfig(excluded_plugins={"x"})
d = cfg.to_dict()
assert isinstance(d["excluded_plugins"], list)
def test_all_keys_present(self):
d = VegasModeConfig().to_dict()
for key in ("enabled", "scroll_speed", "separator_width", "plugin_order",
"excluded_plugins", "target_fps", "buffer_ahead",
"frame_based_scrolling", "scroll_delay",
"dynamic_duration_enabled", "min_cycle_duration", "max_cycle_duration"):
assert key in d
# ---------------------------------------------------------------------------
# get_frame_interval
# ---------------------------------------------------------------------------
class TestGetFrameInterval:
def test_125fps(self):
cfg = VegasModeConfig(target_fps=125)
assert abs(cfg.get_frame_interval() - 1.0 / 125) < 1e-9
def test_60fps(self):
cfg = VegasModeConfig(target_fps=60)
assert abs(cfg.get_frame_interval() - 1.0 / 60) < 1e-6
def test_zero_fps_guarded(self):
cfg = VegasModeConfig(target_fps=0)
# Should not raise ZeroDivisionError (max(1, fps) guard)
result = cfg.get_frame_interval()
assert result == 1.0
# ---------------------------------------------------------------------------
# is_plugin_included
# ---------------------------------------------------------------------------
class TestIsPluginIncluded:
def test_not_excluded_is_included(self):
cfg = VegasModeConfig(excluded_plugins={"bad_plugin"})
assert cfg.is_plugin_included("good_plugin") is True
def test_excluded_plugin_not_included(self):
cfg = VegasModeConfig(excluded_plugins={"bad_plugin"})
assert cfg.is_plugin_included("bad_plugin") is False
def test_empty_exclusions_all_included(self):
cfg = VegasModeConfig()
assert cfg.is_plugin_included("anything") is True
# ---------------------------------------------------------------------------
# get_ordered_plugins
# ---------------------------------------------------------------------------
class TestGetOrderedPlugins:
def test_natural_order_when_no_order_configured(self):
cfg = VegasModeConfig()
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert result == ["a", "b", "c"]
def test_explicit_order_followed(self):
cfg = VegasModeConfig(plugin_order=["c", "a", "b"])
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert result == ["c", "a", "b"]
def test_unavailable_plugins_skipped(self):
cfg = VegasModeConfig(plugin_order=["c", "x", "a"])
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert "x" not in result
assert result[:2] == ["c", "a"]
def test_excluded_plugins_removed(self):
cfg = VegasModeConfig(excluded_plugins={"b"})
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert "b" not in result
def test_unordered_available_appended(self):
cfg = VegasModeConfig(plugin_order=["a"])
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert result[0] == "a"
assert "b" in result
assert "c" in result
def test_empty_available(self):
cfg = VegasModeConfig(plugin_order=["a"])
result = cfg.get_ordered_plugins([])
assert result == []
# ---------------------------------------------------------------------------
# validate
# ---------------------------------------------------------------------------
class TestValidate:
def test_valid_config_no_errors(self):
cfg = VegasModeConfig()
errors = cfg.validate()
assert errors == []
def test_scroll_speed_too_low(self):
cfg = VegasModeConfig(scroll_speed=0.5)
errors = cfg.validate()
assert any("scroll_speed" in e for e in errors)
def test_scroll_speed_too_high(self):
cfg = VegasModeConfig(scroll_speed=300.0)
errors = cfg.validate()
assert any("scroll_speed" in e for e in errors)
def test_separator_width_negative(self):
cfg = VegasModeConfig(separator_width=-1)
errors = cfg.validate()
assert any("separator_width" in e for e in errors)
def test_separator_width_too_large(self):
cfg = VegasModeConfig(separator_width=200)
errors = cfg.validate()
assert any("separator_width" in e for e in errors)
def test_target_fps_too_low(self):
cfg = VegasModeConfig(target_fps=10)
errors = cfg.validate()
assert any("target_fps" in e for e in errors)
def test_target_fps_too_high(self):
cfg = VegasModeConfig(target_fps=300)
errors = cfg.validate()
assert any("target_fps" in e for e in errors)
def test_buffer_ahead_too_low(self):
cfg = VegasModeConfig(buffer_ahead=0)
errors = cfg.validate()
assert any("buffer_ahead" in e for e in errors)
def test_buffer_ahead_too_high(self):
cfg = VegasModeConfig(buffer_ahead=10)
errors = cfg.validate()
assert any("buffer_ahead" in e for e in errors)
def test_multiple_errors_returned(self):
cfg = VegasModeConfig(scroll_speed=0.1, target_fps=5)
errors = cfg.validate()
assert len(errors) >= 2
# ---------------------------------------------------------------------------
# update
# ---------------------------------------------------------------------------
class TestUpdate:
def _wrap(self, **kwargs) -> dict:
return {"display": {"vegas_scroll": kwargs}}
def test_update_enabled(self):
cfg = VegasModeConfig(enabled=False)
cfg.update(self._wrap(enabled=True))
assert cfg.enabled is True
def test_update_scroll_speed(self):
cfg = VegasModeConfig(scroll_speed=50.0)
cfg.update(self._wrap(scroll_speed=90.0))
assert cfg.scroll_speed == 90.0
def test_update_separator_width(self):
cfg = VegasModeConfig(separator_width=32)
cfg.update(self._wrap(separator_width=8))
assert cfg.separator_width == 8
def test_update_plugin_order(self):
cfg = VegasModeConfig(plugin_order=[])
cfg.update(self._wrap(plugin_order=["x", "y"]))
assert cfg.plugin_order == ["x", "y"]
def test_update_excluded_plugins(self):
cfg = VegasModeConfig()
cfg.update(self._wrap(excluded_plugins=["skip_me"]))
assert "skip_me" in cfg.excluded_plugins
def test_update_ignores_missing_keys(self):
cfg = VegasModeConfig(scroll_speed=50.0)
cfg.update(self._wrap(target_fps=80)) # only fps, not speed
assert cfg.scroll_speed == 50.0
assert cfg.target_fps == 80
def test_empty_update_no_change(self):
cfg = VegasModeConfig(scroll_speed=50.0)
cfg.update({})
assert cfg.scroll_speed == 50.0

View File

@@ -204,12 +204,24 @@ def serve_plugin_asset(plugin_id, filename):
# Use send_from_directory to serve the file
return send_from_directory(str(assets_dir), filename, mimetype=content_type)
except Exception:
except Exception as e:
# Log the exception with full traceback server-side
import traceback
app.logger.exception('Error serving plugin asset file')
return jsonify({
'status': 'error',
'message': 'Internal server error'
}), 500
# Return generic error message to client (avoid leaking internal details)
# Only include detailed error information when in debug mode
if app.debug:
return jsonify({
'status': 'error',
'message': str(e),
'traceback': traceback.format_exc()
}), 500
else:
return jsonify({
'status': 'error',
'message': 'Internal server error'
}), 500
# Prime psutil CPU measurement once at startup so interval=None returns a real value
try:
@@ -330,25 +342,35 @@ def not_found_error(error):
@app.errorhandler(500)
def internal_error(error):
"""Handle 500 errors."""
import traceback
error_details = traceback.format_exc()
# Log the error
import logging
logger = logging.getLogger('web_interface')
logger.error("Internal server error", exc_info=True)
logger.error(f"Internal server error: {error}", exc_info=True)
# Return user-friendly error (hide internal details in production)
return jsonify({
'status': 'error',
'error_code': 'INTERNAL_ERROR',
'message': 'An internal error occurred; see logs for details',
'message': 'An internal error occurred',
'details': error_details if app.debug else None
}), 500
@app.errorhandler(Exception)
def handle_exception(error):
"""Handle all unhandled exceptions."""
import traceback
import logging
logger = logging.getLogger('web_interface')
logger.error("Unhandled exception", exc_info=True)
logger.error(f"Unhandled exception: {error}", exc_info=True)
return jsonify({
'status': 'error',
'error_code': 'UNKNOWN_ERROR',
'message': 'An error occurred; see logs for details',
'message': str(error) if app.debug else 'An error occurred',
'details': traceback.format_exc() if app.debug else None
}), 500
# Captive portal redirect middleware
@@ -470,8 +492,7 @@ def system_status_generator():
}
yield status
except Exception as e:
app.logger.error("SSE generator error", exc_info=True)
yield {'error': 'An error occurred; see server logs'}
yield {'error': str(e)}
time.sleep(10) # Update every 10 seconds (reduced frequency for better performance)
# Display preview generator for SSE
@@ -534,8 +555,7 @@ def display_preview_generator():
}
except Exception as e:
app.logger.error("SSE generator error", exc_info=True)
yield {'error': 'An error occurred; see server logs'}
yield {'error': str(e)}
time.sleep(1.0) # Check once per second — halves PIL encode overhead vs 0.5s
@@ -578,19 +598,17 @@ def logs_generator():
except subprocess.TimeoutExpired:
# Timeout - just skip this update
pass
except Exception:
app.logger.error("Error running journalctl", exc_info=True)
except Exception as e:
error_data = {
'timestamp': time.time(),
'logs': 'Error running journalctl; see server logs'
'logs': f'Error running journalctl: {str(e)}'
}
yield error_data
except Exception:
app.logger.error("Unexpected error in logs generator", exc_info=True)
except Exception as e:
error_data = {
'timestamp': time.time(),
'logs': 'Unexpected error in logs generator; see server logs'
'logs': f'Unexpected error in logs generator: {str(e)}'
}
yield error_data
@@ -698,41 +716,6 @@ def _run_startup_reconciliation() -> None:
"manual 'Reconcile' action to resolve.",
len(result.inconsistencies_manual),
)
# Write status file so the web UI can surface unresolved issues as a
# banner without the user having to read journalctl. Mirrors the
# hw_status pattern (/tmp/led_matrix_hw_status.json).
import json as _json, tempfile as _tempfile, os as _os
_recon_status = {
"done": True,
"successful": result.reconciliation_successful,
"fixed_count": len(result.inconsistencies_fixed),
"unresolved": [
{
"plugin_id": inc.plugin_id,
"type": inc.inconsistency_type.value,
"description": inc.description,
}
for inc in result.inconsistencies_manual
],
}
_recon_path = _os.path.join(_tempfile.gettempdir(), "ledmatrix_reconciliation.json")
_tmp = None
try:
if not _os.path.islink(_recon_path):
_fd, _tmp = _tempfile.mkstemp(dir=_tempfile.gettempdir(), prefix=".led_recon_")
with _os.fdopen(_fd, "w") as _f:
_json.dump(_recon_status, _f)
_os.replace(_tmp, _recon_path)
_tmp = None # Rename succeeded; nothing to clean up
except (OSError, ValueError, TypeError) as _e:
_logger.warning("[Reconciliation] Could not write status file: %s", _e)
finally:
if _tmp is not None and _os.path.exists(_tmp):
try:
_os.unlink(_tmp)
except OSError:
pass
except Exception as e:
_logger.error("[Reconciliation] Error: %s", e, exc_info=True)
finally:

File diff suppressed because it is too large Load Diff

View File

@@ -2,8 +2,6 @@ from flask import Blueprint, render_template, flash
from markupsafe import escape
import json
import logging
import os
import re
from pathlib import Path
from src.web_interface.secret_helpers import mask_secret_fields
@@ -86,11 +84,10 @@ def load_partial(partial_name):
elif partial_name == 'operation-history':
return _load_operation_history_partial()
else:
return "Partial not found", 404
return f"Partial '{partial_name}' not found", 404
except Exception as e:
logger.error("Error loading partial %s", partial_name, exc_info=True)
return "Error loading partial", 500
return f"Error loading partial '{partial_name}': {str(e)}", 500
@pages_v3.route('/partials/plugin-config/<plugin_id>')
@@ -98,9 +95,8 @@ def load_plugin_config_partial(plugin_id):
"""Load plugin configuration partial via HTMX - server-side rendered form"""
try:
return _load_plugin_config_partial(plugin_id)
except Exception:
logger.error("Error loading plugin config partial for %s", plugin_id, exc_info=True)
return '<div class="text-red-500 p-4">Error loading plugin config; see logs for details</div>', 500
except Exception as e:
return f'<div class="text-red-500 p-4">Error loading plugin config: {escape(str(e))}</div>', 500
def _load_overview_partial():
"""Load overview partial with system stats"""
@@ -111,8 +107,7 @@ def _load_overview_partial():
return render_template('v3/partials/overview.html',
main_config=main_config)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_general_partial():
"""Load general settings partial"""
@@ -122,8 +117,7 @@ def _load_general_partial():
return render_template('v3/partials/general.html',
main_config=main_config)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_display_partial():
"""Load display settings partial"""
@@ -133,8 +127,7 @@ def _load_display_partial():
return render_template('v3/partials/display.html',
main_config=main_config)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_durations_partial():
"""Load display durations partial"""
@@ -144,8 +137,7 @@ def _load_durations_partial():
return render_template('v3/partials/durations.html',
main_config=main_config)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_schedule_partial():
"""Load schedule settings partial"""
@@ -161,8 +153,7 @@ def _load_schedule_partial():
dim_schedule_config=dim_schedule_config,
normal_brightness=normal_brightness)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_weather_partial():
@@ -173,8 +164,7 @@ def _load_weather_partial():
return render_template('v3/partials/weather.html',
main_config=main_config)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_stocks_partial():
"""Load stocks configuration partial"""
@@ -184,8 +174,7 @@ def _load_stocks_partial():
return render_template('v3/partials/stocks.html',
main_config=main_config)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_plugins_partial():
"""Load plugins management partial"""
@@ -219,7 +208,7 @@ def _load_plugins_partial():
plugin_info.update(fresh_manifest)
except Exception as e:
# If we can't read the fresh manifest, use the cached one
logger.warning("Could not read fresh manifest for plugin: %s", plugin_id)
print(f"Warning: Could not read fresh manifest for {plugin_id}: {e}")
# Get enabled status from config (source of truth)
# Read from config file first, fall back to plugin instance if config doesn't have the key
@@ -267,13 +256,12 @@ def _load_plugins_partial():
'branch': branch
})
except Exception as e:
logger.error("Error loading plugin data", exc_info=True)
print(f"Error loading plugin data: {e}")
return render_template('v3/partials/plugins.html',
plugins=plugins_data)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_fonts_partial():
"""Load fonts management partial"""
@@ -283,16 +271,14 @@ def _load_fonts_partial():
return render_template('v3/partials/fonts.html',
fonts=fonts_data)
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_logs_partial():
"""Load logs viewer partial"""
try:
return render_template('v3/partials/logs.html')
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_raw_json_partial():
"""Load raw JSON editor partial"""
@@ -309,16 +295,14 @@ def _load_raw_json_partial():
main_config_path=pages_v3.config_manager.get_config_path(),
secrets_config_path=pages_v3.config_manager.get_secrets_path())
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_backup_restore_partial():
"""Load backup & restore partial."""
try:
return render_template('v3/partials/backup_restore.html')
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
@pages_v3.route('/setup')
def captive_setup():
@@ -330,24 +314,21 @@ def _load_wifi_partial():
try:
return render_template('v3/partials/wifi.html')
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_cache_partial():
"""Load cache management partial"""
try:
return render_template('v3/partials/cache.html')
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_operation_history_partial():
"""Load operation history partial"""
try:
return render_template('v3/partials/operation_history.html')
except Exception as e:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
return f"Error: {str(e)}", 500
def _load_plugin_config_partial(plugin_id):
@@ -355,11 +336,6 @@ def _load_plugin_config_partial(plugin_id):
Load plugin configuration partial - server-side rendered form.
This replaces the client-side generateConfigForm() JavaScript.
"""
# Sanitize with basename (CodeQL-recognized sanitizer) then regex-validate format
plugin_id = os.path.basename(plugin_id or '')
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._\-:]*$', plugin_id):
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
try:
if not pages_v3.plugin_manager:
return '<div class="text-red-500 p-4">Plugin manager not available</div>', 500
@@ -368,14 +344,6 @@ def _load_plugin_config_partial(plugin_id):
if plugin_id.startswith('starlark:'):
return _load_starlark_config_partial(plugin_id[len('starlark:'):])
# Resolve and validate all plugin paths against the plugins base directory
_plugins_base = Path(pages_v3.plugin_manager.plugins_dir).resolve()
_plugin_dir = (_plugins_base / plugin_id).resolve()
try:
_plugin_dir.relative_to(_plugins_base)
except ValueError:
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
# Try to get plugin info first
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
@@ -385,7 +353,7 @@ def _load_plugin_config_partial(plugin_id):
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
if not plugin_info:
return '<div class="text-red-500 p-4">Plugin not found</div>', 404
return f'<div class="text-red-500 p-4">Plugin "{escape(plugin_id)}" not found</div>', 404
# Get plugin instance (may be None if not loaded)
plugin_instance = pages_v3.plugin_manager.get_plugin(plugin_id)
@@ -397,56 +365,59 @@ def _load_plugin_config_partial(plugin_id):
config = full_config.get(plugin_id, {})
# Load uploaded images from metadata file if images field exists in schema
schema_path_temp = _plugin_dir / "config_schema.json"
# This ensures uploaded images appear even if config hasn't been saved yet
schema_path_temp = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "config_schema.json"
if schema_path_temp.exists():
try:
with open(schema_path_temp, 'r', encoding='utf-8') as f:
temp_schema = json.load(f)
# Check if schema has an images field with x-widget: file-upload
if (temp_schema.get('properties', {}).get('images', {}).get('x-widget') == 'file-upload' or
temp_schema.get('properties', {}).get('images', {}).get('x_widget') == 'file-upload'):
_assets_base = (Path(__file__).parent.parent.parent / 'assets' / 'plugins').resolve()
metadata_file = (_assets_base / plugin_id / 'uploads' / '.metadata.json').resolve()
try:
metadata_file.relative_to(_assets_base)
except ValueError:
metadata_file = None
if metadata_file and metadata_file.exists():
# Load metadata file
# Get PROJECT_ROOT relative to this file
project_root = Path(__file__).parent.parent.parent
metadata_file = project_root / 'assets' / 'plugins' / plugin_id / 'uploads' / '.metadata.json'
if metadata_file.exists():
try:
with open(metadata_file, 'r', encoding='utf-8') as mf:
metadata = json.load(mf)
# Convert metadata dict to list of image objects
images_from_metadata = list(metadata.values())
# Only use metadata images if config doesn't have images or config images is empty
if not config.get('images') or len(config.get('images', [])) == 0:
config['images'] = images_from_metadata
else:
# Merge: add metadata images that aren't already in config
config_image_ids = {img.get('id') for img in config.get('images', []) if img.get('id')}
new_images = [img for img in images_from_metadata if img.get('id') not in config_image_ids]
if new_images:
config['images'] = config.get('images', []) + new_images
except Exception as e:
logger.warning("Could not load plugin upload metadata: %s", e)
print(f"Warning: Could not load metadata for {plugin_id}: {e}")
except Exception as e: # nosec B110 - metadata pre-load is optional; schema loads fully below
logger.debug("Metadata pre-load skipped for plugin %s: %s", plugin_id, e)
# Get plugin schema
schema = {}
schema_path = _plugin_dir / "config_schema.json"
schema_path = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "config_schema.json"
if schema_path.exists():
try:
with open(schema_path, 'r', encoding='utf-8') as f:
schema = json.load(f)
except Exception as e:
logger.warning("Could not load schema for plugin: %s", e)
print(f"Warning: Could not load schema for {plugin_id}: {e}")
# Get web UI actions from plugin manifest
web_ui_actions = []
manifest_path = _plugin_dir / "manifest.json"
manifest_path = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "manifest.json"
if manifest_path.exists():
try:
with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f)
web_ui_actions = manifest.get('web_ui_actions', [])
except Exception as e:
logger.warning("Could not load manifest for plugin: %s", e)
print(f"Warning: Could not load manifest for {plugin_id}: {e}")
# Mask secret fields before rendering template (fail closed — never leak secrets)
schema_properties = schema.get('properties') if isinstance(schema, dict) else None
@@ -482,24 +453,20 @@ def _load_plugin_config_partial(plugin_id):
)
except Exception as e:
logger.error("Error loading plugin config partial for %s", plugin_id, exc_info=True)
return '<div class="text-red-500 p-4">Error loading plugin config; see logs for details</div>', 500
import traceback
traceback.print_exc()
return f'<div class="text-red-500 p-4">Error loading plugin config: {escape(str(e))}</div>', 500
def _load_starlark_config_partial(app_id):
"""Load configuration partial for a Starlark app."""
# Sanitize with basename (CodeQL-recognized sanitizer) then regex-validate format
app_id = os.path.basename(app_id or '')
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_\-]*$', app_id):
return '<div class="text-red-500 p-4">Invalid app ID</div>', 400
try:
starlark_plugin = pages_v3.plugin_manager.get_plugin('starlark-apps') if pages_v3.plugin_manager else None
if starlark_plugin and hasattr(starlark_plugin, 'apps'):
app = starlark_plugin.apps.get(app_id)
if not app:
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
return render_template(
'v3/partials/starlark_config.html',
app_id=app_id,
@@ -515,45 +482,36 @@ def _load_starlark_config_partial(app_id):
)
# Standalone: read from manifest file
starlark_base = (Path(__file__).resolve().parent.parent.parent / 'starlark-apps').resolve()
manifest_file = starlark_base / 'manifest.json'
manifest_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / 'manifest.json'
if not manifest_file.exists():
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
with open(manifest_file, 'r') as f:
manifest = json.load(f)
app_data = manifest.get('apps', {}).get(app_id)
if not app_data:
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
# Load schema from schema.json if it exists — validate path stays within starlark_base
# Load schema from schema.json if it exists
schema = None
schema_file = (starlark_base / app_id / 'schema.json').resolve()
try:
schema_file.relative_to(starlark_base)
except ValueError:
schema_file = None
if schema_file and schema_file.exists():
schema_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'schema.json'
if schema_file.exists():
try:
with open(schema_file, 'r') as f:
schema = json.load(f)
except (OSError, json.JSONDecodeError) as e:
logger.warning("Could not load starlark schema for app: %s", e)
logger.warning(f"[Pages V3] Could not load schema for {app_id}: {e}", exc_info=True)
# Load config from config.json if it exists — validate path stays within starlark_base
# Load config from config.json if it exists
config = {}
config_file = (starlark_base / app_id / 'config.json').resolve()
try:
config_file.relative_to(starlark_base)
except ValueError:
config_file = None
if config_file and config_file.exists():
config_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'config.json'
if config_file.exists():
try:
with open(config_file, 'r') as f:
config = json.load(f)
except (OSError, json.JSONDecodeError) as e:
logger.warning("Could not load starlark config for app: %s", e)
logger.warning(f"[Pages V3] Could not load config for {app_id}: {e}", exc_info=True)
return render_template(
'v3/partials/starlark_config.html',
@@ -570,5 +528,5 @@ def _load_starlark_config_partial(app_id):
)
except Exception as e:
logger.error("[Pages V3] Error loading starlark config for app", exc_info=True)
return '<div class="text-red-500 p-4">Error loading starlark config; see logs for details</div>', 500
logger.exception(f"[Pages V3] Error loading starlark config for {app_id}")
return f'<div class="text-red-500 p-4">Error loading starlark config: {str(e)}</div>', 500

View File

@@ -51,10 +51,8 @@
sanitizeValue(value) {
// Base implementation - widgets should override for specific needs
if (typeof value === 'string') {
// Strip all HTML tags via the DOM parser to prevent XSS
const div = document.createElement('div');
div.textContent = value;
return div.textContent;
// Basic XSS prevention
return value.replace(/<script\b[^<]*(?:(?!<\/script>)<[^<]*)*<\/script>/gi, '');
}
return value;
}

View File

@@ -1442,14 +1442,9 @@ function renderInstalledPlugins(plugins) {
return;
}
// Helper function to escape values for use in HTML attributes
// Helper function to escape attributes for use in HTML
const escapeAttr = (text) => {
return (text || '')
.replace(/&/g, '&amp;')
.replace(/"/g, '&quot;')
.replace(/'/g, '&#39;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;');
return (text || '').replace(/'/g, "\\'").replace(/"/g, '&quot;');
};
// Helper function to escape for JavaScript strings (use JSON.stringify for proper escaping)
@@ -4512,8 +4507,6 @@ function syncFormToJson() {
// Deep merge with existing config to preserve nested structures
function deepMerge(target, source) {
for (const key in source) {
if (key === '__proto__' || key === 'constructor' || key === 'prototype') continue;
if (!Object.prototype.hasOwnProperty.call(source, key)) continue;
if (source[key] && typeof source[key] === 'object' && !Array.isArray(source[key])) {
if (!target[key] || typeof target[key] !== 'object' || Array.isArray(target[key])) {
target[key] = {};
@@ -7480,28 +7473,17 @@ setTimeout(function() {
console.log('installed-plugins-grid not found yet, will retry via event listeners');
}
// Also try to attach install button handler after a delay (fallback).
// Only run if the install button element is already in the DOM (i.e. the
// plugins partial has been loaded); otherwise the htmx:afterSettle listener
// below handles it when the tab is first visited.
// Also try to attach install button handler after a delay (fallback)
setTimeout(() => {
if (typeof window.attachInstallButtonHandler === 'function' &&
document.getElementById('install-plugin-from-url')) {
if (typeof window.attachInstallButtonHandler === 'function') {
console.log('[FALLBACK] Attempting to attach install button handler...');
window.attachInstallButtonHandler();
} else {
console.warn('[FALLBACK] attachInstallButtonHandler not available on window');
}
}, 500);
}, 200);
// Re-run install button wiring after HTMX settles the plugins tab content.
// Guard with element check so it only fires when the plugins partial is in the DOM,
// preventing spurious warnings on other tab loads.
document.addEventListener('htmx:afterSettle', function() {
if (document.getElementById('install-plugin-from-url') &&
typeof window.attachInstallButtonHandler === 'function') {
window.attachInstallButtonHandler();
}
});
// ─── Starlark Apps Integration ──────────────────────────────────────────────
(function() {

View File

@@ -136,7 +136,6 @@
setTimeout(function() {
if (typeof htmx !== 'undefined') {
console.log('HTMX loaded from fallback');
window.dispatchEvent(new Event('htmx:ready'));
// Load extensions after core loads
loadScript(sseSrc, isAPMode ? 'https://unpkg.com/htmx.org/dist/ext/sse.js' : '/static/v3/js/htmx-sse.js');
loadScript(jsonEncSrc, isAPMode ? 'https://unpkg.com/htmx.org/dist/ext/json-enc.js' : '/static/v3/js/htmx-json-enc.js');
@@ -153,7 +152,6 @@
}
} else {
console.log('HTMX loaded successfully');
window.dispatchEvent(new Event('htmx:ready'));
// Load extensions after core loads
loadScript(sseSrc, isAPMode ? 'https://unpkg.com/htmx.org/dist/ext/sse.js' : '/static/v3/js/htmx-sse.js');
loadScript(jsonEncSrc, isAPMode ? 'https://unpkg.com/htmx.org/dist/ext/json-enc.js' : '/static/v3/js/htmx-json-enc.js');
@@ -351,20 +349,6 @@
}
}
});
// Set data-loaded on tab containers after HTMX settles their content,
// preventing repeated re-fetches on every tab switch.
// Scoped to elements with hx-trigger="revealed" (tab containers only) so
// modals and plugin config panels that legitimately reload are unaffected.
document.body.addEventListener('htmx:afterSettle', function(event) {
if (event.detail && event.detail.target) {
var target = event.detail.target;
var trigger = target.getAttribute('hx-trigger') || '';
if (trigger.includes('revealed')) {
target.setAttribute('data-loaded', 'true');
}
}
});
} else {
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', setupScriptExecution);
@@ -427,9 +411,6 @@
.then(html => {
clearTimeout(timeout);
content.innerHTML = html;
if (typeof htmx !== 'undefined') {
htmx.process(content);
}
// Trigger full initialization chain
if (window.pluginManager) {
window.pluginManager.initialized = false;
@@ -449,7 +430,7 @@
}
// Fallback if HTMX doesn't load within 5 seconds
var _pluginsFallbackTimer = setTimeout(() => {
setTimeout(() => {
if (typeof htmx === 'undefined') {
console.warn('HTMX not loaded after 5 seconds, using direct fetch for plugins');
// Load plugins tab content directly regardless of active tab,
@@ -457,7 +438,6 @@
loadPluginsDirect();
}
}, 5000);
window.addEventListener('htmx:ready', function() { clearTimeout(_pluginsFallbackTimer); }, { once: true });
</script>
<!-- Alpine.js app function - defined early so it's available when Alpine initializes -->
<script>
@@ -1050,9 +1030,6 @@
.then(html => {
overviewContent.innerHTML = html;
overviewContent.setAttribute('data-loaded', 'true');
if (typeof htmx !== 'undefined') {
htmx.process(overviewContent);
}
// Re-initialize Alpine.js for the new content
if (window.Alpine) {
window.Alpine.initTree(overviewContent);
@@ -1081,7 +1058,7 @@
});
// Also try direct load if HTMX doesn't load within 5 seconds
var _overviewFallbackTimer = setTimeout(() => {
setTimeout(() => {
if (typeof htmx === 'undefined') {
console.warn('HTMX not loaded after 5 seconds, using direct fetch for content');
const appElement = document.querySelector('[x-data="app()"]');
@@ -1093,7 +1070,6 @@
}
}
}, 5000);
window.addEventListener('htmx:ready', function() { clearTimeout(_overviewFallbackTimer); }, { once: true });
</script>
<!-- General tab -->
@@ -1840,18 +1816,13 @@
htmx.trigger(contentEl, 'revealed');
}
} else {
// HTMX is still loading asynchronously — retry when it signals ready,
// or fall back to direct fetch if it fails to load entirely.
const self = this;
function onReady() { window.removeEventListener('htmx-load-failed', onFailed); self.loadTabContent(tab); }
function onFailed() {
window.removeEventListener('htmx:ready', onReady);
if (tab === 'overview' && typeof loadOverviewDirect === 'function') loadOverviewDirect();
else if (tab === 'wifi' && typeof loadWifiDirect === 'function') loadWifiDirect();
else if (tab === 'plugins' && typeof loadPluginsDirect === 'function') loadPluginsDirect();
// HTMX not available, use direct fetch
console.warn('HTMX not available, using direct fetch for tab:', tab);
if (tab === 'overview' && typeof loadOverviewDirect === 'function') {
loadOverviewDirect();
} else if (tab === 'wifi' && typeof loadWifiDirect === 'function') {
loadWifiDirect();
}
window.addEventListener('htmx:ready', onReady, { once: true });
window.addEventListener('htmx-load-failed', onFailed, { once: true });
}
},

View File

@@ -73,7 +73,7 @@
<button hx-post="/api/v3/system/action"
hx-vals='{"action": "start_display"}'
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='Display started',s='success'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'Display started', event.detail.xhr.responseJSON.status || 'success'); }"
class="inline-flex items-center px-4 py-2 border border-transparent text-sm font-medium rounded-md text-white bg-green-600 hover:bg-green-700">
<i class="fas fa-play mr-2"></i>
Start Display
@@ -82,7 +82,7 @@
<button hx-post="/api/v3/system/action"
hx-vals='{"action": "stop_display"}'
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='Display stopped',s='success'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'Display stopped', event.detail.xhr.responseJSON.status || 'success'); }"
class="inline-flex items-center px-4 py-2 border border-transparent text-sm font-medium rounded-md text-white bg-red-600 hover:bg-red-700">
<i class="fas fa-stop mr-2"></i>
Stop Display
@@ -91,7 +91,7 @@
<button hx-post="/api/v3/system/action"
hx-vals='{"action": "git_pull"}'
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='Code update completed',s='info'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'Code update completed', event.detail.xhr.responseJSON.status || 'info'); }"
class="inline-flex items-center px-4 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-download mr-2"></i>
Update Code
@@ -101,7 +101,7 @@
hx-vals='{"action": "reboot_system"}'
hx-confirm="Are you sure you want to reboot the system?"
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='System rebooting...',s='info'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'System rebooting...', event.detail.xhr.responseJSON.status || 'info'); }"
class="inline-flex items-center px-4 py-2 border border-transparent text-sm font-medium rounded-md text-white bg-yellow-600 hover:bg-yellow-700">
<i class="fas fa-power-off mr-2"></i>
Reboot System

View File

@@ -843,14 +843,6 @@ async function updateFontPreview() {
return;
}
// BDF bitmap fonts cannot be rendered server-side — skip the API call
if (family.toLowerCase().endsWith('.bdf')) {
previewImage.style.display = 'none';
loadingText.style.display = 'block';
loadingText.textContent = 'Preview not available for BDF bitmap fonts';
return;
}
// Show loading state
loadingText.textContent = 'Loading preview...';
loadingText.style.display = 'block';

View File

@@ -1,66 +1,3 @@
<!-- Reconciliation warning banner: shown when startup reconciliation found stale plugin config entries -->
<div id="reconciliation-banner" class="bg-yellow-50 border border-yellow-300 rounded-lg p-4 mb-4 flex items-start" style="display:none !important" role="alert">
<div class="flex-shrink-0 mr-3 mt-0.5">
<i class="fas fa-exclamation-triangle text-yellow-500"></i>
</div>
<div class="flex-1">
<p class="text-sm font-medium text-yellow-800">Plugin Config Warning</p>
<p class="text-sm text-yellow-700 mt-1" id="reconciliation-banner-text"></p>
</div>
<button type="button" onclick="window.dismissReconciliationBanner()" class="ml-4 flex-shrink-0 text-yellow-500 hover:text-yellow-700" aria-label="Dismiss">
<i class="fas fa-times"></i>
</button>
</div>
<script>
(function () {
var DISMISS_KEY = 'ledmatrix-recon-dismissed';
var _recon_timer = null;
function checkReconciliation() {
fetch('/api/v3/plugins/reconciliation-status')
.then(function (r) { return r.json(); })
.then(function (resp) {
var d = resp.data || {};
if (!d.done) {
// Reconciliation still running — poll again shortly
_recon_timer = setTimeout(checkReconciliation, 2000);
return;
}
_recon_timer = null;
if (!d.unresolved || d.unresolved.length === 0) return;
var key = d.unresolved.map(function (i) { return i.plugin_id; }).sort().join(',');
if (sessionStorage.getItem(DISMISS_KEY) === key) return;
var ids = d.unresolved.map(function (i) { return i.plugin_id; }).join(', ');
document.getElementById('reconciliation-banner-text').textContent =
'Stale plugin config entries found: ' + ids +
'. Remove them from config.json or reinstall via the Plugin Store.';
var banner = document.getElementById('reconciliation-banner');
banner.dataset.dismissKey = key;
banner.style.setProperty('display', 'flex', 'important');
})
.catch(function () {});
}
checkReconciliation();
window.dismissReconciliationBanner = function () {
var banner = document.getElementById('reconciliation-banner');
banner.style.setProperty('display', 'none', 'important');
if (_recon_timer !== null) {
clearTimeout(_recon_timer);
_recon_timer = null;
}
// Persist dismissal immediately so the banner won't reappear on reload
// even if the background sync fetch below fails.
var key = banner.dataset.dismissKey;
if (key) {
try { sessionStorage.setItem(DISMISS_KEY, key); } catch (e) {}
}
// Background sync only — do not rely on this for DISMISS_KEY or hiding.
fetch('/api/v3/plugins/reconciliation-status').catch(function () {});
};
}());
</script>
<div class="bg-white rounded-lg shadow p-6">
<div class="border-b border-gray-200 pb-4 mb-6">
<h2 class="text-lg font-semibold text-gray-900">System Overview</h2>
@@ -151,7 +88,7 @@
<button hx-post="/api/v3/system/action"
hx-vals='{"action": "start_display"}'
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='Display started',s='success'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'Display started', event.detail.xhr.responseJSON.status || 'success'); }"
class="inline-flex items-center px-4 py-2 border border-transparent text-base font-semibold rounded-md text-white bg-green-600 hover:bg-green-700">
<i class="fas fa-play mr-2"></i>
Start Display
@@ -160,7 +97,7 @@
<button hx-post="/api/v3/system/action"
hx-vals='{"action": "stop_display"}'
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='Display stopped',s='success'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'Display stopped', event.detail.xhr.responseJSON.status || 'success'); }"
class="inline-flex items-center px-4 py-2 border border-transparent text-base font-semibold rounded-md text-white bg-red-600 hover:bg-red-700">
<i class="fas fa-stop mr-2"></i>
Stop Display
@@ -170,7 +107,7 @@
hx-vals='{"action": "git_pull"}'
hx-confirm="This will stash any local changes and update the code. Continue?"
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='Code update completed',s='info'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'Code update completed', event.detail.xhr.responseJSON.status || 'info'); }"
class="inline-flex items-center px-4 py-2 border border-gray-300 text-base font-semibold rounded-md text-gray-900 bg-white hover:bg-gray-50">
<i class="fas fa-download mr-2"></i>
Update Code
@@ -180,7 +117,7 @@
hx-vals='{"action": "reboot_system"}'
hx-confirm="Are you sure you want to reboot the system?"
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='System rebooting...',s='info'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'System rebooting...', event.detail.xhr.responseJSON.status || 'info'); }"
class="inline-flex items-center px-4 py-2 border border-transparent text-base font-semibold rounded-md text-white bg-yellow-600 hover:bg-yellow-700">
<i class="fas fa-power-off mr-2"></i>
Reboot System
@@ -190,7 +127,7 @@
hx-vals='{"action": "shutdown_system"}'
hx-confirm="Are you sure you want to shut down the system? This will power off the Raspberry Pi."
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='System shutting down...',s='info'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'System shutting down...', event.detail.xhr.responseJSON.status || 'info'); }"
class="inline-flex items-center px-4 py-2 border border-transparent text-base font-semibold rounded-md text-white bg-red-800 hover:bg-red-900">
<i class="fas fa-power-off mr-2"></i>
Shutdown System
@@ -199,7 +136,7 @@
<button hx-post="/api/v3/system/action"
hx-vals='{"action": "restart_display_service"}'
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='Display service restarted',s='success'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'Display service restarted', event.detail.xhr.responseJSON.status || 'success'); }"
class="inline-flex items-center px-4 py-2 border border-gray-300 text-base font-semibold rounded-md text-gray-900 bg-white hover:bg-gray-50">
<i class="fas fa-redo mr-2"></i>
Restart Display Service
@@ -208,7 +145,7 @@
<button hx-post="/api/v3/system/action"
hx-vals='{"action": "restart_web_service"}'
hx-swap="none"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined') { var m='Web service restarted',s='success'; try { var d=JSON.parse(event.detail.xhr.responseText); m=d.message||m; s=d.status||s; } catch(e) { s=(event.detail.xhr&&event.detail.xhr.status>=400?'error':s); } showNotification(m,s); }"
hx-on:htmx:after-request="if (typeof showNotification !== 'undefined' && event.detail.xhr && event.detail.xhr.responseJSON) { showNotification(event.detail.xhr.responseJSON.message || 'Web service restarted', event.detail.xhr.responseJSON.status || 'success'); }"
class="inline-flex items-center px-4 py-2 border border-gray-300 text-base font-semibold rounded-md text-gray-900 bg-white hover:bg-gray-50">
<i class="fas fa-redo mr-2"></i>
Restart Web Service

View File

@@ -9,8 +9,7 @@
{% set field_id = (plugin_id ~ '-' ~ full_key)|replace('.', '-')|replace('_', '-') %}
{% set label = prop.title if prop.title else key|replace('_', ' ')|title %}
{% set description = prop.description if prop.description else '' %}
{% set _pt = prop.get('type') %}
{% set field_type = _pt if (_pt is string) else ((_pt | first) if (_pt and _pt is iterable and _pt is not string) else 'string') %}
{% set field_type = prop.type if prop.type is string else (prop.type[0] if prop.type is iterable else 'string') %}
{# Handle nested objects - check for widget first #}
{% if field_type == 'object' %}