13 Commits

Author SHA1 Message Date
Chuck
909db0993f Fix three remaining CodeQL path-injection and info-exposure alerts
- plugin_loader.py: resolve plugin_dir with strict=True and validate
  marker_path with relative_to() before any filesystem writes, giving
  CodeQL the positive sanitization pattern it requires (py/path-injection)
- api_v3.py _safe_backup_path: replace substring negative checks with a
  strict positive regex (^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$) that
  CodeQL recognises as sanitising the user-supplied filename
  (py/path-injection)
- api_v3.py backup_validate: whitelist known-safe manifest fields before
  returning JSON, preventing any exception strings captured inside
  validate_backup() from reaching the HTTP response (py/stack-trace-exposure)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 22:08:52 -04:00
Chuck
1d2303e620 Fix remaining GitHub CodeQL security alerts
- py/stack-trace-exposure: Remove str(e) and traceback.format_exc() from
  all HTTP responses across api_v3.py, pages_v3.py, and app.py; replace
  with generic messages and logger.error(exc_info=True)
- py/reflective-xss: Escape partial_name via markupsafe.escape in the
  load_partial 404 response
- py/path-injection: Add regex validation of plugin_id before filesystem
  use in _load_plugin_config_partial
- py/incomplete-url-substring-sanitization: Replace 'github.com' in
  substring checks with urlparse hostname comparison in store_manager.py
- py/clear-text-logging-sensitive-data: Remove football-scoreboard debug
  prints and sensitive request-body prints from update endpoint
- js/bad-tag-filter: Replace script-only regex in BaseWidget.sanitizeValue
  with DOM-based textContent stripping that removes all HTML
- js/incomplete-sanitization: Fix escapeAttr to properly encode &, ", ',
  <, > using HTML entities instead of backslash escaping
- js/prototype-pollution-utility: Add __proto__/constructor/prototype
  key guards to deepMerge function in plugins_manager.js
- app.py error handlers: Always return generic messages; remove debug-mode
  branches that could expose tracebacks in production

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 19:01:06 -04:00
Chuck
8652aacf37 Fix pre-existing information exposure in version and action endpoints
- get_system_version (alert #218): replaced str(e) with generic message;
  exception still logged via logger.error(exc_info=True)
- execute_system_action (alert #216): removed str(e) and full
  traceback.format_exc() from the HTTP response — the full stack trace
  was being sent directly to clients; replaced with generic message and
  proper logger.error call

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 16:50:24 -04:00
Chuck
76507014ce Resolve CodeQL security findings in backup API
Path traversal (CWE-22):
- backup_download: switch from send_file(user-tainted-path) to
  send_from_directory(_BACKUP_EXPORT_DIR, filename); Flask uses
  werkzeug safe_join internally which CodeQL recognises as a sanitizer
- backup_delete: enumerate the export directory and match by name so
  entry.unlink() operates on a filesystem-derived Path rather than one
  constructed from user input; _safe_backup_path still guards first

Information exposure through exceptions (CWE-209):
- backup_validate: err_msg from validate_backup() can embed exception
  strings containing temp-file paths; log the detail, return a generic
  'Invalid or corrupted backup file' to the client
- Other backup endpoints: already fixed (str(e) -> generic message);
  CodeQL alerts will clear on next scan

plugin_loader.py:185 (path traversal): false positive — requirements_file
is constructed from plugin_dir returned by find_plugin_directory() (a
filesystem scan), not from raw HTTP request input; no change needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 16:44:33 -04:00
Chuck
53806da8c5 Fix uninstall-no-record-file detection condition
The previous check used a string replacement that left 'error:' in the
remaining text, causing the condition to always evaluate false. Simplify
to a direct substring check: if 'uninstall-no-record-file' appears in pip
stderr the affected package is installed at the system level and we write
the marker, suppressing the repeated warning on every restart.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 16:33:24 -04:00
Chuck
3d4de89fd5 Treat system-managed pip packages as satisfied for dependency marker
When a plugin's requirements.txt includes a package installed via the
system package manager (dnf/apt), pip fails with 'uninstall-no-record-file'
because it can't replace the system-tracked copy. The package is present
and functional, but the missing marker caused the install to be retried
on every service restart.

Detect this specific error pattern: if the only pip failure is
uninstall-no-record-file, write the .dependencies_installed marker and
log a warning instead of returning False, suppressing the repeated warning.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 16:32:04 -04:00
Chuck
505fed70e3 Address review feedback: error leaks, ok:false, htmx:ready coverage
- Backup endpoints: replace raw str(e) in user-facing responses with a
  generic message; full exception still logged via exc_info=True
- hardware/status: change ok:null to ok:false for PermissionError and
  json.JSONDecodeError so the UI's hw.ok===false check triggers correctly
- base.html: dispatch htmx:ready from the fallback load path so any
  deferred listeners fire on CDN-fallback loads too
- loadTabContent: also listen for htmx-load-failed so overview/wifi/plugins
  fall back to direct fetch when HTMX is completely unavailable

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 16:30:24 -04:00
Chuck
c8d2eaeb85 Cancel HTMX fallback timers when htmx:ready fires
The 5-second setTimeout fallbacks for plugins and overview were firing
before the htmx:ready event arrived, logging spurious warnings. Each
timer now self-cancels via htmx:ready so the fallback only triggers
when HTMX genuinely fails to load.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 16:18:37 -04:00
Chuck
745ba8101e Fix backup API 404s, hardware status 500, and HTMX loading race
- Add all backup API routes to api_v3.py: preview, list, export,
  validate, restore (with plugin reinstall), download, delete
- Fix PermissionError on /hardware/status: return graceful 200 instead
  of 500 when the status file is owned by a different user; also fix
  root cause by writing the file world-readable (0o644) in display_manager
- Fix HTMX race: dispatch htmx:ready window event from HTMX onload
  callback; loadTabContent now waits for that event instead of
  immediately falling back to direct fetch (eliminating the
  "HTMX not available" console warning on initial load)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 16:16:31 -04:00
Chuck
ddc53ff1e0 fix(web-ui): guard setTimeout fallback for attachInstallButtonHandler
The 500ms fallback setTimeout was calling attachInstallButtonHandler()
unconditionally even when the plugins partial wasn't in the DOM, causing
a spurious console.warn on every page load. Add the same element-existence
check already present on the htmx:afterSettle listener.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 12:29:34 -04:00
Chuck
2cd3dbabe5 fix(web-ui): show error toast on non-JSON 4xx/5xx quick-action responses
In the catch block of all 11 hx-on:htmx:after-request handlers, check
xhr.status >= 400 and downgrade s to 'error' so a failed action that
returns an HTML error page (or other non-JSON body) surfaces as an error
toast instead of the optimistic 'success'/'info' default.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 12:20:54 -04:00
Chuck
f4e7fea7bb fix(web-ui): ensure quick-action toasts always fire even on xhr/parse failure
Replace silent catch(e){} in all 11 hx-on:htmx:after-request handlers with a
pattern that sets default message/status before the try block and calls
showNotification(m,s) unconditionally after it, so a fallback toast is shown
whenever xhr is absent or responseText is not valid JSON.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 10:32:56 -04:00
Chuck
a5c7ef20ec fix(web-ui): fix quick actions not firing, add toast feedback, suppress install handler warning
- base.html: add htmx:afterSettle listener to set data-loaded on tab
  containers after HTMX swaps their content, preventing the overview
  partial from being re-fetched (and handlers lost) on every tab switch
- base.html: call htmx.process() in loadOverviewDirect/loadPluginsDirect
  fallbacks so buttons get HTMX handlers even if HTMX finished its
  initial body scan before the fallback fetch completed
- overview.html + index.html (11 buttons): replace event.detail.xhr.responseJSON
  (undefined in HTMX 1.9.x) with JSON.parse(event.detail.xhr.responseText)
  so quick action toast notifications actually fire
- plugins_manager.js: add guarded htmx:afterSettle listener that only calls
  attachInstallButtonHandler when #install-plugin-from-url is in the DOM,
  eliminating the spurious console warning on non-plugin tab loads

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 10:21:20 -04:00
15 changed files with 289 additions and 2838 deletions

View File

@@ -67,9 +67,8 @@ def main():
print(" 📍 Will run on: http://0.0.0.0:5000") print(" 📍 Will run on: http://0.0.0.0:5000")
print(" ⏹️ Press Ctrl+C to stop") print(" ⏹️ Press Ctrl+C to stop")
# Run the app (debug mode controlled by env var to satisfy security scanners) # Run the app (this should start the server)
_debug = os.environ.get('LEDMATRIX_FLASK_DEBUG', '0') == '1' app.run(host='0.0.0.0', port=5000, debug=True)
app.run(host='0.0.0.0', port=5000, debug=_debug)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n ⏹️ Server stopped by user") print("\n ⏹️ Server stopped by user")

View File

@@ -410,8 +410,8 @@ def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
try: try:
manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8") manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8")
manifest = json.loads(manifest_raw) manifest = json.loads(manifest_raw)
except (OSError, UnicodeDecodeError, json.JSONDecodeError): except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
return False, "Invalid manifest.json", {} return False, f"Invalid manifest.json: {e}", {}
if not isinstance(manifest, dict) or "schema_version" not in manifest: if not isinstance(manifest, dict) or "schema_version" not in manifest:
return False, "Invalid manifest structure", {} return False, "Invalid manifest structure", {}
@@ -456,8 +456,8 @@ def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
return True, "", result_manifest return True, "", result_manifest
except zipfile.BadZipFile: except zipfile.BadZipFile:
return False, "File is not a valid ZIP archive", {} return False, "File is not a valid ZIP archive", {}
except OSError: except OSError as e:
return False, "Could not read backup", {} return False, f"Could not read backup: {e}", {}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -8,7 +8,6 @@ Extracted from PluginManager to improve separation of concerns.
import json import json
import importlib import importlib
import importlib.util import importlib.util
import os
import sys import sys
import subprocess import subprocess
import threading import threading
@@ -69,11 +68,6 @@ class PluginLoader:
Returns: Returns:
Path to plugin directory or None if not found Path to plugin directory or None if not found
""" """
# Sanitize plugin_id — os.path.basename is a CodeQL-recognized path sanitizer
plugin_id = os.path.basename(plugin_id or '')
if not plugin_id:
return None
# Strategy 1: Use mapping from discovery # Strategy 1: Use mapping from discovery
if plugin_directories and plugin_id in plugin_directories: if plugin_directories and plugin_id in plugin_directories:
plugin_dir = plugin_directories[plugin_id] plugin_dir = plugin_directories[plugin_id]
@@ -81,16 +75,14 @@ class PluginLoader:
self.logger.debug("Using plugin directory from discovery mapping: %s", plugin_dir) self.logger.debug("Using plugin directory from discovery mapping: %s", plugin_dir)
return plugin_dir return plugin_dir
# Strategy 2: Direct paths — resolve and validate they stay within plugins_dir # Strategy 2: Direct paths
plugins_dir_resolved = plugins_dir.resolve() plugin_dir = plugins_dir / plugin_id
for _candidate_name in (plugin_id, f"ledmatrix-{plugin_id}"): if plugin_dir.exists():
_candidate = (plugins_dir_resolved / _candidate_name).resolve() return plugin_dir
try:
_candidate.relative_to(plugins_dir_resolved) plugin_dir = plugins_dir / f"ledmatrix-{plugin_id}"
except ValueError: if plugin_dir.exists():
continue return plugin_dir
if _candidate.exists():
return _candidate
# Strategy 3: Case-insensitive search # Strategy 3: Case-insensitive search
normalized_id = plugin_id.lower() normalized_id = plugin_id.lower()
@@ -151,19 +143,21 @@ class PluginLoader:
Returns: Returns:
True if dependencies installed or not needed, False on error True if dependencies installed or not needed, False on error
""" """
plugin_id = os.path.basename(plugin_id or '') requirements_file = plugin_dir / "requirements.txt"
if not plugin_id: if not requirements_file.exists():
return False return True # No dependencies needed
# Resolve and validate plugin_dir before constructing any derived paths
# Resolve and validate plugin_dir before constructing derived paths from it
try: try:
plugin_dir_resolved = plugin_dir.resolve(strict=True) plugin_dir_resolved = plugin_dir.resolve(strict=True)
except OSError: except OSError:
self.logger.error("Plugin directory does not exist: %s", plugin_dir) self.logger.error("Plugin directory does not exist: %s", plugin_dir)
return False return False
requirements_file = plugin_dir_resolved / "requirements.txt"
if not requirements_file.exists():
return True # No dependencies needed
marker_path = plugin_dir_resolved / ".dependencies_installed" marker_path = plugin_dir_resolved / ".dependencies_installed"
try:
marker_path.relative_to(plugin_dir_resolved)
except ValueError:
return False
# Check if already installed # Check if already installed
if marker_path.exists(): if marker_path.exists():
@@ -380,20 +374,9 @@ class PluginLoader:
Returns: Returns:
Loaded module or None on error Loaded module or None on error
""" """
plugin_id = os.path.basename(plugin_id or '') entry_file = plugin_dir / entry_point
if not plugin_id:
raise PluginError("Invalid plugin ID")
try:
plugin_dir_resolved = plugin_dir.resolve(strict=True)
except OSError:
raise PluginError("Plugin directory not found", plugin_id=plugin_id)
entry_file = (plugin_dir_resolved / entry_point).resolve()
try:
entry_file.relative_to(plugin_dir_resolved)
except ValueError:
raise PluginError("Invalid entry point path", plugin_id=plugin_id)
if not entry_file.exists(): if not entry_file.exists():
error_msg = f"Entry point file not found for plugin {plugin_id}" error_msg = f"Entry point file not found: {entry_file} for plugin {plugin_id}"
self.logger.error(error_msg) self.logger.error(error_msg)
raise PluginError(error_msg, plugin_id=plugin_id, context={'entry_file': str(entry_file)}) raise PluginError(error_msg, plugin_id=plugin_id, context={'entry_file': str(entry_file)})

View File

@@ -151,18 +151,6 @@ class WiFiManager:
f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, " f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, "
f"interface: {self._wifi_interface}, trixie: {self._is_trixie}") f"interface: {self._wifi_interface}, trixie: {self._is_trixie}")
# Once per process: remove a stale force-AP flag left by a prior crash.
# Guard with a class-level flag so the nmcli AP-state check only runs
# once even though WiFiManager is instantiated per-request.
if not WiFiManager._startup_cleanup_done:
WiFiManager._startup_cleanup_done = True
if self._FORCE_AP_FLAG_PATH.exists() and not self._is_ap_mode_active():
try:
self._FORCE_AP_FLAG_PATH.unlink(missing_ok=True)
logger.debug("Removed stale force-AP flag on startup (AP not active)")
except OSError as exc:
logger.warning(f"Could not remove stale force-AP flag: {exc}")
def _show_led_message(self, message: str, duration: int = 5): def _show_led_message(self, message: str, duration: int = 5):
""" """
Show a WiFi status message on the LED display. Show a WiFi status message on the LED display.
@@ -486,10 +474,7 @@ class WiFiManager:
if result.returncode == 0: if result.returncode == 0:
for line in result.stdout.strip().split('\n'): for line in result.stdout.strip().split('\n'):
if '/' in line: if '/' in line:
# nmcli -t output is "IP4.ADDRESS[1]:x.x.x.x/prefix"; ip_address = line.split('/')[0].strip()
# bare "x.x.x.x/prefix" is also accepted defensively.
_, sep, rest = line.partition(':')
ip_address = (rest if sep else line).split('/')[0].strip()
break break
# Final fallback: Get signal strength by matching SSID in WiFi list # Final fallback: Get signal strength by matching SSID in WiFi list
@@ -515,13 +500,6 @@ class WiFiManager:
# Check if AP mode is active # Check if AP mode is active
ap_active = self._is_ap_mode_active() ap_active = self._is_ap_mode_active()
# wlan0 shows as "connected" in AP mode; clear client-station fields so
# callers don't mistake the AP for an outbound WiFi connection.
if ap_active and wifi_connected:
wifi_connected = False
ssid = None
ip_address = None
logger.debug(f"{wlan_device} is in AP mode — overriding wifi_connected to False")
return WiFiStatus( return WiFiStatus(
connected=wifi_connected, connected=wifi_connected,
@@ -712,10 +690,6 @@ class WiFiManager:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved") # nosec B108 - process-specific named file; device is single-user RPi _IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved") # nosec B108 - process-specific named file; device is single-user RPi
# Written when AP mode is manually force-enabled; prevents daemon auto-disable
_FORCE_AP_FLAG_PATH = Path("/tmp/ledmatrix_force_ap_active") # nosec B108 - process-specific named file; device is single-user RPi
# Ensures the startup stale-flag cleanup runs once per process, not per instantiation
_startup_cleanup_done: bool = False
def _validate_ap_config(self) -> Tuple[str, int]: def _validate_ap_config(self) -> Tuple[str, int]:
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults.""" """Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
@@ -1393,7 +1367,7 @@ class WiFiManager:
logger.error(f"Failed to restore original connection: {original_ssid}") logger.error(f"Failed to restore original connection: {original_ssid}")
# Trigger AP mode as last resort # Trigger AP mode as last resort
self._show_led_message("Enabling AP mode...", duration=5) self._show_led_message("Enabling AP mode...", duration=5)
ap_success, ap_msg = self.enable_ap_mode(force=True) ap_success, ap_msg = self.enable_ap_mode()
if ap_success: if ap_success:
logger.info("AP mode enabled as failsafe") logger.info("AP mode enabled as failsafe")
return False, "Connection failed and restoration failed. AP mode enabled." return False, "Connection failed and restoration failed. AP mode enabled."
@@ -1405,7 +1379,7 @@ class WiFiManager:
elif not success: elif not success:
logger.warning(f"Connection to {ssid} failed and no original connection to restore") logger.warning(f"Connection to {ssid} failed and no original connection to restore")
self._show_led_message("Enabling AP mode...", duration=5) self._show_led_message("Enabling AP mode...", duration=5)
ap_success, ap_msg = self.enable_ap_mode(force=True) ap_success, ap_msg = self.enable_ap_mode()
if ap_success: if ap_success:
logger.info("AP mode enabled as failsafe") logger.info("AP mode enabled as failsafe")
return False, "Connection failed. AP mode enabled." return False, "Connection failed. AP mode enabled."
@@ -1426,7 +1400,7 @@ class WiFiManager:
logger.error(f"Failed to restore after exception: {restore_error}") logger.error(f"Failed to restore after exception: {restore_error}")
# Last resort: enable AP mode # Last resort: enable AP mode
try: try:
self.enable_ap_mode(force=True) self.enable_ap_mode()
except Exception as ap_error: # nosec B110 - last-resort; do not re-raise, but log for debugging except Exception as ap_error: # nosec B110 - last-resort; do not re-raise, but log for debugging
logger.error("Last-resort AP mode enable failed in recovery path: %s", ap_error, exc_info=True) logger.error("Last-resort AP mode enable failed in recovery path: %s", ap_error, exc_info=True)
return False, str(e) return False, str(e)
@@ -1490,29 +1464,26 @@ class WiFiManager:
# Show LED message # Show LED message
self._show_led_message(f"Connecting to {ssid}...", duration=10) self._show_led_message(f"Connecting to {ssid}...", duration=10)
# Find existing NM connection for this SSID. # First, check if connection already exists and try to activate it
# 802-11-wireless.ssid is not a valid column in 'nmcli connection show', # NetworkManager connection names might not match SSID exactly, so search by SSID
# so list all wifi connections then query each one's SSID individually. check_result = subprocess.run(
list_result = subprocess.run( # nosec B603 B607 - fixed args, no user input ["nmcli", "-t", "-f", "NAME,802-11-wireless.ssid", "connection", "show"],
["nmcli", "-t", "-f", "NAME,TYPE", "connection", "show"], capture_output=True,
capture_output=True, text=True, timeout=5 text=True,
timeout=5
) )
existing_conn_name = None existing_conn_name = None
if list_result.returncode == 0: if check_result.returncode == 0:
for line in list_result.stdout.strip().split('\n'): for line in check_result.stdout.strip().split('\n'):
if ':' not in line: if ':' in line:
continue parts = line.split(':')
parts = line.split(':') if len(parts) >= 2:
if len(parts) < 2 or parts[1].strip() != '802-11-wireless': conn_name = parts[0].strip()
continue conn_ssid = parts[1].strip() if len(parts) > 1 else ""
conn_name = parts[0].strip() if conn_ssid == ssid:
ssid_r = subprocess.run( # nosec B603 B607 - conn_name from nmcli output, not user input existing_conn_name = conn_name
["nmcli", "-g", "802-11-wireless.ssid", "connection", "show", conn_name], break
capture_output=True, text=True, timeout=5
)
if ssid_r.returncode == 0 and ssid_r.stdout.strip() == ssid:
existing_conn_name = conn_name
break
# Also try direct lookup by SSID (in case connection name matches SSID) # Also try direct lookup by SSID (in case connection name matches SSID)
if not existing_conn_name: if not existing_conn_name:
@@ -1884,7 +1855,7 @@ class WiFiManager:
logger.warning(f"Failed to enable WiFi radio after {max_retries} attempts") logger.warning(f"Failed to enable WiFi radio after {max_retries} attempts")
return False return False
def enable_ap_mode(self, force: bool = False) -> Tuple[bool, str]: def enable_ap_mode(self) -> Tuple[bool, str]:
""" """
Enable access point mode Enable access point mode
@@ -1906,29 +1877,20 @@ class WiFiManager:
if not self._ensure_wifi_radio_enabled(): if not self._ensure_wifi_radio_enabled():
return False, "WiFi radio is disabled and could not be enabled" return False, "WiFi radio is disabled and could not be enabled"
# Check if WiFi is connected (skip when force=True) # Check if WiFi is connected
status = self.get_wifi_status() status = self.get_wifi_status()
if not force and status.connected: if status.connected:
return False, "Cannot enable AP mode while WiFi is connected" return False, "Cannot enable AP mode while WiFi is connected"
# Check if Ethernet is connected (skip when force=True) # Check if Ethernet is connected
if not force and self._is_ethernet_connected(): if self._is_ethernet_connected():
return False, "Cannot enable AP mode while Ethernet is connected" return False, "Cannot enable AP mode while Ethernet is connected"
if force:
logger.debug(f"enable_ap_mode: force=True — WiFi/Ethernet guards bypassed; will create {self._FORCE_AP_FLAG_PATH}")
# Try hostapd/dnsmasq first (captive portal mode) # Try hostapd/dnsmasq first (captive portal mode)
if self.has_hostapd and self.has_dnsmasq: if self.has_hostapd and self.has_dnsmasq:
result = self._enable_ap_mode_hostapd() result = self._enable_ap_mode_hostapd()
if result[0]: if result[0]:
self._ap_enabled_at = time.time() self._ap_enabled_at = time.time()
if force:
try:
self._FORCE_AP_FLAG_PATH.touch()
logger.debug(f"Force-AP flag created: {self._FORCE_AP_FLAG_PATH}")
except OSError as exc:
logger.warning(f"Failed to create force-AP flag {self._FORCE_AP_FLAG_PATH}: {exc}")
return result return result
# Fallback to nmcli hotspot (simpler, no captive portal) # Fallback to nmcli hotspot (simpler, no captive portal)
@@ -1938,12 +1900,6 @@ class WiFiManager:
result = self._enable_ap_mode_nmcli_hotspot() result = self._enable_ap_mode_nmcli_hotspot()
if result[0]: if result[0]:
self._ap_enabled_at = time.time() self._ap_enabled_at = time.time()
if force:
try:
self._FORCE_AP_FLAG_PATH.touch()
logger.debug(f"Force-AP flag created: {self._FORCE_AP_FLAG_PATH}")
except OSError as exc:
logger.warning(f"Failed to create force-AP flag {self._FORCE_AP_FLAG_PATH}: {exc}")
return result return result
return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)" return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)"
@@ -2135,14 +2091,8 @@ class WiFiManager:
self._clear_led_message() self._clear_led_message()
return False, "AP started but captive-portal redirect setup failed" return False, "AP started but captive-portal redirect setup failed"
# Verify the AP is actually running (retry up to 5x with 2s delay for NM async activation) # Verify the AP is actually running
status = {} status = self._get_ap_status_nmcli()
for _attempt in range(5):
status = self._get_ap_status_nmcli()
if status.get('active'):
break
logger.debug(f"AP verification attempt {_attempt + 1}/5 not yet active, waiting 2s")
time.sleep(2)
if status.get('active'): if status.get('active'):
ip = status.get('ip', '192.168.4.1') ip = status.get('ip', '192.168.4.1')
logger.info(f"AP mode confirmed active at {ip} (open network, no password)") logger.info(f"AP mode confirmed active at {ip} (open network, no password)")
@@ -2340,7 +2290,6 @@ class WiFiManager:
logger.warning("WiFi radio may be disabled after nmcli AP cleanup") logger.warning("WiFi radio may be disabled after nmcli AP cleanup")
self._ap_enabled_at = None self._ap_enabled_at = None
self._FORCE_AP_FLAG_PATH.unlink(missing_ok=True)
logger.info("AP mode disabled successfully") logger.info("AP mode disabled successfully")
return True, "AP mode disabled" return True, "AP mode disabled"
except Exception as e: except Exception as e:
@@ -2529,29 +2478,22 @@ address=/detectportal.firefox.com/192.168.4.1
else: else:
logger.warning(f"Failed to enable AP mode: {message}") logger.warning(f"Failed to enable AP mode: {message}")
elif not should_have_ap and ap_active: elif not should_have_ap and ap_active:
# Should not have AP but do - check if it was manually force-enabled # Should not have AP but do - disable AP mode
force_active = self._FORCE_AP_FLAG_PATH.exists() # Always disable if WiFi or Ethernet connects, regardless of auto_enable setting
if status.connected: if status.connected or ethernet_connected:
# WiFi connected: always disable AP (user successfully configured WiFi)
success, message = self.disable_ap_mode() success, message = self.disable_ap_mode()
if success: if success:
logger.info("Auto-disabled AP mode (WiFi connected)") if status.connected:
self._disconnected_checks = 0 logger.info("Auto-disabled AP mode (WiFi connected)")
elif ethernet_connected:
logger.info("Auto-disabled AP mode (Ethernet connected)")
self._disconnected_checks = 0 # Reset counter
return True return True
else: else:
logger.warning(f"Failed to auto-disable AP mode: {message}") logger.warning(f"Failed to auto-disable AP mode: {message}")
elif ethernet_connected and not force_active:
# Ethernet connected, AP not manually forced: auto-disable
success, message = self.disable_ap_mode()
if success:
logger.info("Auto-disabled AP mode (Ethernet connected)")
self._disconnected_checks = 0
return True
else:
logger.warning(f"Failed to auto-disable AP mode: {message}")
elif ethernet_connected and force_active:
logger.debug("AP mode is force-active; Ethernet connected but auto-disable suppressed")
elif not auto_enable: elif not auto_enable:
# AP is active but auto_enable is disabled - this means it was manually enabled
# Don't disable it automatically, let it stay active
logger.debug("AP mode is active (manually enabled), keeping active") logger.debug("AP mode is active (manually enabled), keeping active")
# Idle-timeout check: disable AP if no client has connected within the window. # Idle-timeout check: disable AP if no client has connected within the window.

View File

@@ -1,342 +0,0 @@
"""
Tests for src/base_classes/api_extractors.py
Covers ESPNFootballExtractor, ESPNBaseballExtractor, ESPNHockeyExtractor,
SoccerAPIExtractor, and the shared _extract_common_details logic.
"""
import logging
import pytest
from src.base_classes.api_extractors import (
ESPNFootballExtractor,
ESPNBaseballExtractor,
ESPNHockeyExtractor,
SoccerAPIExtractor,
)
# ---------------------------------------------------------------------------
# Shared test data factories
# ---------------------------------------------------------------------------
def _make_espn_event(state: str = "in", home_abbr: str = "KC", away_abbr: str = "BUF",
home_score: str = "14", away_score: str = "7",
date_str: str = "2024-01-15T20:00:00Z",
include_situation: bool = False,
situation: dict | None = None,
status_detail: str = "2nd Qtr 8:42",
period: int = 2) -> dict:
"""Build a minimal ESPN-style game event dict."""
comp_status = {
"type": {
"state": state,
"shortDetail": status_detail,
"detail": status_detail,
"name": "STATUS_IN_PROGRESS",
},
"period": period,
"displayClock": "8:42",
}
comp = {
"status": comp_status,
"competitors": [
{
"homeAway": "home",
"team": {"abbreviation": home_abbr, "displayName": f"{home_abbr} Team"},
"score": home_score,
},
{
"homeAway": "away",
"team": {"abbreviation": away_abbr, "displayName": f"{away_abbr} Team"},
"score": away_score,
},
],
}
if include_situation:
comp["situation"] = situation or {}
return {
"id": "test-game-1",
"date": date_str,
"competitions": [comp],
}
def _make_logger() -> logging.Logger:
return logging.getLogger("test_extractor")
# ---------------------------------------------------------------------------
# ESPNFootballExtractor
# ---------------------------------------------------------------------------
class TestESPNFootballExtractor:
def setup_method(self):
self.extractor = ESPNFootballExtractor(_make_logger())
def test_extract_live_game_basic_fields(self):
event = _make_espn_event(state="in", home_score="14", away_score="7")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "KC"
assert result["away_abbr"] == "BUF"
assert result["home_score"] == "14"
assert result["away_score"] == "7"
assert result["is_live"] is True
assert result["is_final"] is False
assert result["is_upcoming"] is False
def test_extract_final_game(self):
event = _make_espn_event(state="post")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_final"] is True
assert result["is_live"] is False
def test_extract_upcoming_game(self):
event = _make_espn_event(state="pre")
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_upcoming"] is True
def test_sport_specific_fields_default_when_pregame(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "down" in fields
assert "distance" in fields
assert "possession" in fields
assert "is_redzone" in fields
assert fields["is_redzone"] is False
def test_sport_specific_fields_live_with_situation(self):
situation = {
"down": 3,
"distance": 7,
"possession": "KC",
"isRedZone": True,
"homeTimeouts": 2,
"awayTimeouts": 1,
}
event = _make_espn_event(state="in", include_situation=True, situation=situation)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["down"] == 3
assert fields["distance"] == 7
assert fields["is_redzone"] is True
assert fields["home_timeouts"] == 2
assert fields["away_timeouts"] == 1
def test_scoring_event_detected(self):
# situation must be non-empty (truthy) for the live block to execute
situation = {"down": 1, "distance": 10}
event = _make_espn_event(
state="in",
include_situation=True,
situation=situation,
status_detail="touchdown scored",
)
fields = self.extractor.get_sport_specific_fields(event)
assert "touchdown" in fields.get("scoring_event", "").lower()
def test_returns_none_on_empty_event(self):
assert self.extractor.extract_game_details({}) is None
def test_returns_none_when_teams_missing(self):
event = {
"id": "x",
"date": "2024-01-15T20:00:00Z",
"competitions": [
{
"status": {"type": {"state": "in", "shortDetail": "", "detail": "", "name": ""}},
"competitors": [], # no competitors
}
],
}
assert self.extractor.extract_game_details(event) is None
def test_date_z_suffix_parsed(self):
event = _make_espn_event(date_str="2024-01-15T20:00:00Z")
result = self.extractor.extract_game_details(event)
# Should not raise and should return a result
assert result is not None
def test_id_propagated(self):
event = _make_espn_event()
result = self.extractor.extract_game_details(event)
assert result["id"] == "test-game-1"
# ---------------------------------------------------------------------------
# ESPNBaseballExtractor
# ---------------------------------------------------------------------------
class TestESPNBaseballExtractor:
def setup_method(self):
self.extractor = ESPNBaseballExtractor(_make_logger())
def test_extract_live_game(self):
event = _make_espn_event(
state="in", home_abbr="NYY", away_abbr="BOS",
home_score="3", away_score="2"
)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "NYY"
assert result["is_live"] is True
def test_baseball_sport_fields_defaults(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "inning" in fields
assert "outs" in fields
assert "bases" in fields
assert "strikes" in fields
assert "balls" in fields
def test_baseball_sport_fields_live(self):
situation = {
"inning": 7,
"outs": 2,
"bases": "110",
"strikes": 2,
"balls": 3,
"pitcher": "Smith",
"batter": "Jones",
}
event = _make_espn_event(state="in", include_situation=True, situation=situation)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["inning"] == 7
assert fields["outs"] == 2
assert fields["strikes"] == 2
assert fields["pitcher"] == "Smith"
def test_returns_none_on_empty(self):
assert self.extractor.extract_game_details({}) is None
# ---------------------------------------------------------------------------
# ESPNHockeyExtractor
# ---------------------------------------------------------------------------
class TestESPNHockeyExtractor:
def setup_method(self):
self.extractor = ESPNHockeyExtractor(_make_logger())
def test_extract_live_game(self):
event = _make_espn_event(
state="in", home_abbr="BOS", away_abbr="TOR",
home_score="2", away_score="1"
)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["is_live"] is True
def test_hockey_period_text_p1(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=1
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P1"
def test_hockey_period_text_p2(self):
situation = {"isPowerPlay": False} # non-empty so the live block executes
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=2
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P2"
def test_hockey_period_text_p3(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=3
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "P3"
def test_hockey_period_text_ot(self):
situation = {"isPowerPlay": False}
event = _make_espn_event(
state="in", include_situation=True, situation=situation, period=4
)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["period_text"] == "OT1"
def test_hockey_power_play(self):
situation = {"isPowerPlay": True, "homeShots": 12, "awayShots": 8}
event = _make_espn_event(state="in", include_situation=True, situation=situation, period=2)
fields = self.extractor.get_sport_specific_fields(event)
assert fields["power_play"] is True
assert fields["shots_on_goal"]["home"] == 12
assert fields["shots_on_goal"]["away"] == 8
def test_hockey_fields_defaults_pregame(self):
event = _make_espn_event(state="pre")
fields = self.extractor.get_sport_specific_fields(event)
assert "period" in fields
assert "power_play" in fields
assert fields["power_play"] is False
def test_returns_none_on_empty(self):
assert self.extractor.extract_game_details({}) is None
# ---------------------------------------------------------------------------
# SoccerAPIExtractor
# ---------------------------------------------------------------------------
class TestSoccerAPIExtractor:
def setup_method(self):
self.extractor = SoccerAPIExtractor(_make_logger())
def _make_soccer_event(self, is_live: bool = True) -> dict:
return {
"id": "soccer-1",
"home_team": {"abbreviation": "ARS", "name": "Arsenal"},
"away_team": {"abbreviation": "CHE", "name": "Chelsea"},
"home_score": "2",
"away_score": "1",
"status": "LIVE",
"is_live": is_live,
"is_final": not is_live,
"is_upcoming": False,
"half": "1",
"stoppage_time": "2",
"home_yellow_cards": 1,
"away_yellow_cards": 2,
"home_red_cards": 0,
"away_red_cards": 0,
"home_possession": 55,
"away_possession": 45,
}
def test_extract_live_game(self):
event = self._make_soccer_event(is_live=True)
result = self.extractor.extract_game_details(event)
assert result is not None
assert result["home_abbr"] == "ARS"
assert result["away_abbr"] == "CHE"
assert result["is_live"] is True
def test_sport_specific_cards(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["cards"]["home_yellow"] == 1
assert fields["cards"]["away_yellow"] == 2
assert fields["cards"]["home_red"] == 0
def test_sport_specific_possession(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["possession"]["home"] == 55
assert fields["possession"]["away"] == 45
def test_sport_specific_half(self):
event = self._make_soccer_event()
fields = self.extractor.get_sport_specific_fields(event)
assert fields["half"] == "1"
def test_scores_as_strings(self):
event = self._make_soccer_event()
result = self.extractor.extract_game_details(event)
assert result["home_score"] == "2"
assert result["away_score"] == "1"

View File

@@ -1,299 +0,0 @@
"""
Tests for src/background_data_service.py
Covers BackgroundDataService: submit_fetch_request, get_result,
is_request_complete, get_request_status, cancel_request, get_statistics,
_cleanup_completed_requests, shutdown, and get_background_service singleton.
"""
import time
import pytest
from unittest.mock import MagicMock, patch, Mock
from concurrent.futures import Future
from src.background_data_service import (
BackgroundDataService,
FetchStatus,
FetchResult,
FetchRequest,
get_background_service,
shutdown_background_service,
)
import src.background_data_service as bds_module
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_global_service():
"""Ensure each test starts with no global singleton."""
shutdown_background_service()
yield
shutdown_background_service()
@pytest.fixture
def mock_cache_manager():
m = MagicMock()
m.get.return_value = None
m.set.return_value = None
m.generate_sport_cache_key.return_value = "test_key"
return m
@pytest.fixture
def service(mock_cache_manager):
svc = BackgroundDataService(mock_cache_manager, max_workers=2, request_timeout=5)
yield svc
svc.shutdown(wait=False)
# ---------------------------------------------------------------------------
# Initialisation
# ---------------------------------------------------------------------------
class TestInitialisation:
def test_stats_zeroed(self, service):
stats = service.get_statistics()
assert stats["total_requests"] == 0
assert stats["completed_requests"] == 0
assert stats["failed_requests"] == 0
def test_no_active_requests(self, service):
assert len(service.active_requests) == 0
def test_not_shutdown(self, service):
assert service._shutdown is False
# ---------------------------------------------------------------------------
# Cache hit path
# ---------------------------------------------------------------------------
class TestCacheHit:
def test_cache_hit_returns_request_id(self, service, mock_cache_manager):
mock_cache_manager.get.return_value = {"events": [{"id": "1"}]}
req_id = service.submit_fetch_request(
sport="nfl", year=2024,
url="https://example.com/nfl",
cache_key="nfl_key",
)
assert req_id is not None
# Request should be immediately complete due to cache hit
result = service.get_result(req_id)
assert result is not None
assert result.success is True
assert result.cached is True
def test_cache_hit_increments_stat(self, service, mock_cache_manager):
mock_cache_manager.get.return_value = {"events": []}
service.submit_fetch_request(sport="nba", year=2024, url="https://x.com", cache_key="k")
stats = service.get_statistics()
assert stats["cached_hits"] == 1
# ---------------------------------------------------------------------------
# Actual fetch path (mocked HTTP)
# ---------------------------------------------------------------------------
class TestFetchPath:
def _valid_payload(self) -> dict:
return {"events": [{"id": "g1"}, {"id": "g2"}]}
def test_successful_fetch_completes(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024,
url="https://example.com/nfl",
cache_key="nfl_test",
)
# Wait for the background thread
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
result = service.get_result(req_id)
assert result is not None
assert result.success is True
assert result.data == self._valid_payload()
def test_failed_fetch_records_error(self, service, mock_cache_manager):
with patch.object(service.session, "get", side_effect=Exception("network error")):
req_id = service.submit_fetch_request(
sport="nba", year=2024,
url="https://example.com/nba",
cache_key="nba_test",
max_retries=0,
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
result = service.get_result(req_id)
assert result is not None
assert result.success is False
assert result.error is not None
def test_cache_miss_increments_stat(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="new_key",
)
stats = service.get_statistics()
assert stats["cache_misses"] == 1
def test_callback_called_on_success(self, service, mock_cache_manager):
callback = Mock()
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com",
cache_key="cb_key", callback=callback, max_retries=0,
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
callback.assert_called_once()
call_arg = callback.call_args[0][0]
assert isinstance(call_arg, FetchResult)
def test_data_cached_after_successful_fetch(self, service, mock_cache_manager):
mock_resp = Mock()
mock_resp.json.return_value = self._valid_payload()
mock_resp.raise_for_status.return_value = None
with patch.object(service.session, "get", return_value=mock_resp):
req_id = service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="cache_after_key",
)
deadline = time.time() + 5
while not service.is_request_complete(req_id) and time.time() < deadline:
time.sleep(0.05)
mock_cache_manager.set.assert_called()
# ---------------------------------------------------------------------------
# Request status / cancel
# ---------------------------------------------------------------------------
class TestRequestStatusAndCancel:
def test_unknown_request_status_is_none(self, service):
assert service.get_request_status("nonexistent") is None
def test_cancel_active_request(self, service, mock_cache_manager):
# Manually insert an active request
req = FetchRequest(
id="r1", sport="nfl", year=2024,
cache_key="k", url="https://x.com",
)
req.status = FetchStatus.PENDING
service.active_requests["r1"] = req
result = service.cancel_request("r1")
assert result is True
assert "r1" not in service.active_requests
def test_cancel_nonexistent_request(self, service):
assert service.cancel_request("does-not-exist") is False
def test_is_request_complete_false_for_active(self, service, mock_cache_manager):
req = FetchRequest(
id="r2", sport="mlb", year=2024,
cache_key="k2", url="https://x.com",
)
service.active_requests["r2"] = req
assert service.is_request_complete("r2") is False
def test_is_request_complete_true_for_done(self, service):
result = FetchResult(request_id="r3", success=True)
service.completed_requests["r3"] = result
assert service.is_request_complete("r3") is True
def test_get_result_returns_none_for_unknown(self, service):
assert service.get_result("unknown") is None
# ---------------------------------------------------------------------------
# Shutdown
# ---------------------------------------------------------------------------
class TestShutdown:
def test_shutdown_sets_flag(self, service):
service.shutdown(wait=False)
assert service._shutdown is True
def test_submit_after_shutdown_raises(self, service, mock_cache_manager):
service.shutdown(wait=False)
with pytest.raises(RuntimeError, match="shutting down"):
service.submit_fetch_request(
sport="nfl", year=2024, url="https://x.com", cache_key="k"
)
# ---------------------------------------------------------------------------
# Cleanup
# ---------------------------------------------------------------------------
class TestCleanup:
def test_cleanup_removes_old_requests(self, service):
old_result = FetchResult(request_id="old", success=True)
old_result.completed_at = time.time() - 7200 # 2 hours ago
service.completed_requests["old"] = old_result
service._last_completed_requests_cleanup = 0 # force cleanup
removed = service._cleanup_completed_requests(force=True)
assert removed >= 1
assert "old" not in service.completed_requests
def test_cleanup_respects_interval(self, service):
old_result = FetchResult(request_id="r", success=True)
old_result.completed_at = time.time() - 7200
service.completed_requests["r"] = old_result
# Cleanup interval not passed, should skip
service._last_completed_requests_cleanup = time.time()
removed = service._cleanup_completed_requests(force=False)
assert removed == 0
def test_size_limit_enforcement(self, service):
service._max_completed_requests = 3
for i in range(5):
result = FetchResult(request_id=str(i), success=True)
result.completed_at = time.time() - (5 - i) * 100 # oldest first
service.completed_requests[str(i)] = result
service._last_completed_requests_cleanup = 0
service._cleanup_completed_requests(force=True)
assert len(service.completed_requests) <= 3
# ---------------------------------------------------------------------------
# Singleton get_background_service
# ---------------------------------------------------------------------------
class TestGetBackgroundService:
def test_first_call_requires_cache_manager(self):
with pytest.raises(ValueError, match="cache_manager is required"):
get_background_service()
def test_creates_singleton(self, mock_cache_manager):
svc1 = get_background_service(mock_cache_manager)
svc2 = get_background_service()
assert svc1 is svc2
def test_shutdown_clears_singleton(self, mock_cache_manager):
get_background_service(mock_cache_manager)
shutdown_background_service()
with pytest.raises(ValueError):
get_background_service()

View File

@@ -1,209 +0,0 @@
"""
Tests for src/base_classes/data_sources.py
Covers ESPNDataSource, MLBAPIDataSource, SoccerAPIDataSource.
All HTTP calls are mocked to avoid network access.
"""
import logging
from datetime import datetime, date
from unittest.mock import MagicMock, patch, Mock
import pytest
import requests
from src.base_classes.data_sources import ESPNDataSource, MLBAPIDataSource, SoccerAPIDataSource
def _make_logger() -> logging.Logger:
return logging.getLogger("test_data_sources")
def _mock_response(json_data: dict, status_code: int = 200):
resp = Mock(spec=requests.Response)
resp.status_code = status_code
resp.json.return_value = json_data
resp.raise_for_status = Mock()
if status_code >= 400:
resp.raise_for_status.side_effect = requests.HTTPError(response=resp)
return resp
# ---------------------------------------------------------------------------
# ESPNDataSource
# ---------------------------------------------------------------------------
class TestESPNDataSource:
def setup_method(self):
self.source = ESPNDataSource(_make_logger())
def test_get_headers(self):
headers = self.source.get_headers()
assert headers["Accept"] == "application/json"
assert "LEDMatrix" in headers["User-Agent"]
def test_fetch_live_games_returns_live_events(self):
live_event = {
"competitions": [{"status": {"type": {"state": "in"}}}]
}
non_live_event = {
"competitions": [{"status": {"type": {"state": "pre"}}}]
}
payload = {"events": [live_event, non_live_event]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("football", "nfl")
assert len(result) == 1
assert result[0] is live_event
def test_fetch_live_games_empty_when_none_live(self):
payload = {"events": [
{"competitions": [{"status": {"type": {"state": "post"}}}]}
]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("football", "nfl")
assert result == []
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("network failure")):
result = self.source.fetch_live_games("football", "nfl")
assert result == []
def test_fetch_schedule_returns_all_events(self):
events = [{"id": "1"}, {"id": "2"}]
payload = {"events": events}
start = datetime(2024, 1, 1)
end = datetime(2024, 1, 7)
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("football", "nfl", (start, end))
assert len(result) == 2
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("timeout")):
result = self.source.fetch_schedule("football", "nfl", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"standings": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("football", "nfl")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("error")):
result = self.source.fetch_standings("football", "nfl")
assert result == {}
def test_base_url_set_correctly(self):
assert "espn.com" in self.source.base_url
# ---------------------------------------------------------------------------
# MLBAPIDataSource
# ---------------------------------------------------------------------------
class TestMLBAPIDataSource:
def setup_method(self):
self.source = MLBAPIDataSource(_make_logger())
def test_fetch_live_games_filters_live(self):
live_game = {"status": {"abstractGameState": "Live"}}
final_game = {"status": {"abstractGameState": "Final"}}
payload = {"dates": [{"games": [live_game, final_game]}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("baseball", "mlb")
assert len(result) == 1
assert result[0] is live_game
def test_fetch_live_games_empty_dates(self):
payload = {"dates": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("baseball", "mlb")
assert result == []
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_live_games("baseball", "mlb")
assert result == []
def test_fetch_schedule_aggregates_all_dates(self):
payload = {
"dates": [
{"games": [{"id": "1"}, {"id": "2"}]},
{"games": [{"id": "3"}]},
]
}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("baseball", "mlb", (datetime.now(), datetime.now()))
assert len(result) == 3
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_schedule("baseball", "mlb", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"records": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("baseball", "mlb")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_standings("baseball", "mlb")
assert result == {}
# ---------------------------------------------------------------------------
# SoccerAPIDataSource
# ---------------------------------------------------------------------------
class TestSoccerAPIDataSource:
def setup_method(self):
self.source = SoccerAPIDataSource(_make_logger(), api_key="test-key-123")
def test_headers_include_api_key(self):
headers = self.source.get_headers()
assert headers["X-Auth-Token"] == "test-key-123"
def test_headers_without_api_key(self):
source = SoccerAPIDataSource(_make_logger())
headers = source.get_headers()
assert "X-Auth-Token" not in headers
def test_fetch_live_games_success(self):
payload = {"matches": [{"id": "m1"}, {"id": "m2"}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_live_games("soccer", "eng.1")
assert len(result) == 2
def test_fetch_live_games_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_live_games("soccer", "eng.1")
assert result == []
def test_fetch_schedule_success(self):
payload = {"matches": [{"id": "m1"}]}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_schedule("soccer", "eng.1", (datetime.now(), datetime.now()))
assert len(result) == 1
def test_fetch_schedule_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_schedule("soccer", "eng.1", (datetime.now(), datetime.now()))
assert result == []
def test_fetch_standings_success(self):
payload = {"standings": []}
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
result = self.source.fetch_standings("soccer", "PL")
assert result == payload
def test_fetch_standings_returns_empty_on_error(self):
with patch.object(self.source.session, "get", side_effect=Exception("err")):
result = self.source.fetch_standings("soccer", "PL")
assert result == {}

View File

@@ -1,317 +0,0 @@
"""
Tests for src/common/game_helper.py
Covers GameHelper: extract_game_details, filter_*, sort_games_by_time,
process_games, get_game_summary, and all private helpers.
"""
import logging
import pytest
from datetime import datetime, timezone, timedelta
from src.common.game_helper import GameHelper
def _make_logger() -> logging.Logger:
return logging.getLogger("test_game_helper")
def _make_espn_event(
state: str = "in",
home_abbr: str = "LAL",
away_abbr: str = "BOS",
home_score: str = "105",
away_score: str = "98",
date_str: str = "2024-01-15T20:00:00Z",
period: int = 4,
status_name: str = "STATUS_IN_PROGRESS",
home_record: str = "30-10",
away_record: str = "25-15",
event_id: str = "game-1",
) -> dict:
return {
"id": event_id,
"date": date_str,
"competitions": [
{
"status": {
"type": {
"state": state,
"shortDetail": "Q4 2:30",
"name": status_name,
},
"period": period,
"displayClock": "2:30",
},
"competitors": [
{
"homeAway": "home",
"id": "h1",
"team": {"abbreviation": home_abbr, "displayName": f"{home_abbr} Team"},
"score": home_score,
"records": [{"summary": home_record}],
},
{
"homeAway": "away",
"id": "a1",
"team": {"abbreviation": away_abbr, "displayName": f"{away_abbr} Team"},
"score": away_score,
"records": [{"summary": away_record}],
},
],
}
],
}
@pytest.fixture
def helper():
return GameHelper(timezone_str="UTC", logger=_make_logger())
# ---------------------------------------------------------------------------
# extract_game_details
# ---------------------------------------------------------------------------
class TestExtractGameDetails:
def test_live_game(self, helper):
event = _make_espn_event(state="in")
result = helper.extract_game_details(event)
assert result is not None
assert result["is_live"] is True
assert result["is_final"] is False
assert result["is_upcoming"] is False
def test_final_game(self, helper):
event = _make_espn_event(state="post")
result = helper.extract_game_details(event)
assert result["is_final"] is True
def test_upcoming_game(self, helper):
event = _make_espn_event(state="pre")
result = helper.extract_game_details(event)
assert result["is_upcoming"] is True
def test_halftime_detection(self, helper):
event = _make_espn_event(state="halftime", status_name="STATUS_HALFTIME")
result = helper.extract_game_details(event)
assert result["is_halftime"] is True
def test_basic_fields_present(self, helper):
event = _make_espn_event()
result = helper.extract_game_details(event)
for key in ("id", "home_abbr", "away_abbr", "home_score", "away_score",
"home_record", "away_record", "start_time_utc"):
assert key in result
def test_team_abbreviations(self, helper):
event = _make_espn_event(home_abbr="MIA", away_abbr="PHX")
result = helper.extract_game_details(event)
assert result["home_abbr"] == "MIA"
assert result["away_abbr"] == "PHX"
def test_scores_as_strings(self, helper):
event = _make_espn_event(home_score="110", away_score="99")
result = helper.extract_game_details(event)
assert result["home_score"] == "110"
assert result["away_score"] == "99"
def test_returns_none_on_empty(self, helper):
assert helper.extract_game_details({}) is None
assert helper.extract_game_details(None) is None
def test_returns_none_when_no_competitors(self, helper):
event = _make_espn_event()
event["competitions"][0]["competitors"] = []
assert helper.extract_game_details(event) is None
def test_date_z_suffix_parsed(self, helper):
event = _make_espn_event(date_str="2024-06-01T19:30:00Z")
result = helper.extract_game_details(event)
assert result["start_time_utc"] is not None
assert result["start_time_utc"].tzinfo is not None
def test_zero_zero_record_suppressed(self, helper):
event = _make_espn_event(home_record="0-0", away_record="0-0-0")
result = helper.extract_game_details(event)
assert result["home_record"] == ""
assert result["away_record"] == ""
def test_basketball_sport_fields(self, helper):
event = _make_espn_event(period=3)
result = helper.extract_game_details(event, sport="basketball")
assert result["period_text"] == "Q3"
assert "clock" in result
def test_basketball_overtime_period(self, helper):
event = _make_espn_event(period=5)
result = helper.extract_game_details(event, sport="basketball")
assert result["period_text"] == "OT1"
def test_football_sport_fields(self, helper):
event = _make_espn_event(period=2)
result = helper.extract_game_details(event, sport="football")
assert result["period_text"] == "Q2"
def test_hockey_sport_fields_period_1(self, helper):
event = _make_espn_event(period=1)
result = helper.extract_game_details(event, sport="hockey")
assert result["period_text"] == "P1"
def test_hockey_sport_fields_ot(self, helper):
event = _make_espn_event(period=4)
result = helper.extract_game_details(event, sport="hockey")
assert result["period_text"] == "OT1"
def test_baseball_sport_fields(self, helper):
event = _make_espn_event(period=7)
result = helper.extract_game_details(event, sport="baseball")
assert result["period_text"] == "INN 7"
# ---------------------------------------------------------------------------
# Filter methods
# ---------------------------------------------------------------------------
class TestFilterMethods:
def _make_games(self):
now = datetime.now(timezone.utc)
return [
{"is_live": True, "is_final": False, "is_upcoming": False, "home_abbr": "LAL", "away_abbr": "BOS", "start_time_utc": now},
{"is_live": False, "is_final": True, "is_upcoming": False, "home_abbr": "MIA", "away_abbr": "PHX", "start_time_utc": now - timedelta(hours=3)},
{"is_live": False, "is_final": False, "is_upcoming": True, "home_abbr": "DAL", "away_abbr": "CHI", "start_time_utc": now + timedelta(hours=2)},
]
def test_filter_live_games(self, helper):
games = self._make_games()
result = helper.filter_live_games(games)
assert len(result) == 1
assert result[0]["home_abbr"] == "LAL"
def test_filter_final_games(self, helper):
games = self._make_games()
result = helper.filter_final_games(games)
assert len(result) == 1
assert result[0]["home_abbr"] == "MIA"
def test_filter_upcoming_games(self, helper):
games = self._make_games()
result = helper.filter_upcoming_games(games)
assert len(result) == 1
assert result[0]["home_abbr"] == "DAL"
def test_filter_favorite_teams_match(self, helper):
games = self._make_games()
result = helper.filter_favorite_teams(games, ["LAL"])
assert len(result) == 1
assert result[0]["home_abbr"] == "LAL"
def test_filter_favorite_teams_empty_list_returns_all(self, helper):
games = self._make_games()
result = helper.filter_favorite_teams(games, [])
assert len(result) == 3
def test_filter_favorite_teams_away_match(self, helper):
games = self._make_games()
result = helper.filter_favorite_teams(games, ["BOS"])
assert len(result) == 1
def test_filter_recent_games_within_window(self, helper):
now = datetime.now(timezone.utc)
games = [
{"start_time_utc": now - timedelta(days=2), "is_final": True},
{"start_time_utc": now - timedelta(days=10), "is_final": True},
]
result = helper.filter_recent_games(games, days_back=7)
assert len(result) == 1
def test_filter_recent_games_all_within(self, helper):
now = datetime.now(timezone.utc)
games = [
{"start_time_utc": now - timedelta(days=1)},
{"start_time_utc": now - timedelta(days=3)},
]
result = helper.filter_recent_games(games, days_back=7)
assert len(result) == 2
def test_sort_games_ascending(self, helper):
now = datetime.now(timezone.utc)
games = [
{"start_time_utc": now + timedelta(hours=2), "id": "late"},
{"start_time_utc": now + timedelta(hours=1), "id": "early"},
]
result = helper.sort_games_by_time(games)
assert result[0]["id"] == "early"
def test_sort_games_descending(self, helper):
now = datetime.now(timezone.utc)
games = [
{"start_time_utc": now + timedelta(hours=1), "id": "early"},
{"start_time_utc": now + timedelta(hours=2), "id": "late"},
]
result = helper.sort_games_by_time(games, reverse=True)
assert result[0]["id"] == "late"
# ---------------------------------------------------------------------------
# process_games
# ---------------------------------------------------------------------------
class TestProcessGames:
def test_processes_valid_events(self, helper):
events = [
_make_espn_event(event_id="1"),
_make_espn_event(event_id="2"),
]
result = helper.process_games(events)
assert len(result) == 2
def test_skips_invalid_events(self, helper):
events = [
_make_espn_event(event_id="1"),
{}, # invalid
]
result = helper.process_games(events)
assert len(result) == 1
def test_empty_events(self, helper):
assert helper.process_games([]) == []
# ---------------------------------------------------------------------------
# get_game_summary
# ---------------------------------------------------------------------------
class TestGetGameSummary:
def test_live_summary(self, helper):
game = {
"home_abbr": "LAL", "away_abbr": "BOS",
"home_score": "105", "away_score": "98",
"status_text": "Q4 2:30",
"is_live": True, "is_final": False,
}
summary = helper.get_game_summary(game)
assert "BOS" in summary
assert "LAL" in summary
assert "98" in summary
assert "105" in summary
def test_final_summary(self, helper):
game = {
"home_abbr": "LAL", "away_abbr": "BOS",
"home_score": "110", "away_score": "102",
"status_text": "Final",
"is_live": False, "is_final": True,
}
summary = helper.get_game_summary(game)
assert "Final" in summary
def test_upcoming_summary(self, helper):
game = {
"home_abbr": "LAL", "away_abbr": "BOS",
"home_score": "0", "away_score": "0",
"status_text": "7:30 PM",
"is_live": False, "is_final": False,
}
summary = helper.get_game_summary(game)
assert "7:30 PM" in summary

View File

@@ -1,307 +0,0 @@
"""
Tests for src/plugin_system/health_monitor.py
Covers PluginHealthMonitor: get_plugin_health_status, get_plugin_health_metrics,
get_all_plugin_health, _get_recovery_suggestions, start/stop_monitoring,
register_health_check.
"""
import pytest
from unittest.mock import MagicMock, patch
from datetime import datetime
from src.plugin_system.health_monitor import (
PluginHealthMonitor,
HealthStatus,
HealthMetrics,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_health_tracker(
summary: dict | None = None,
all_summaries: dict | None = None,
):
"""Return a mock PluginHealthTracker."""
tracker = MagicMock()
tracker.get_health_summary.return_value = summary
tracker.get_all_health_summaries.return_value = all_summaries or {}
return tracker
def _healthy_summary() -> dict:
return {
"success_rate": 100.0,
"circuit_state": "closed",
"consecutive_failures": 0,
"total_failures": 0,
"total_successes": 50,
"last_success_time": datetime.now().isoformat(),
"last_error": None,
}
def _degraded_summary() -> dict:
return {
"success_rate": 40.0, # 60% error rate
"circuit_state": "closed",
"consecutive_failures": 3,
"total_failures": 6,
"total_successes": 4,
"last_success_time": None,
"last_error": "timeout occurred",
}
def _unhealthy_summary() -> dict:
return {
"success_rate": 10.0, # 90% error rate
"circuit_state": "open",
"consecutive_failures": 10,
"total_failures": 9,
"total_successes": 1,
"last_success_time": None,
"last_error": "ImportError: missing module",
}
@pytest.fixture
def monitor():
tracker = _make_health_tracker(_healthy_summary())
return PluginHealthMonitor(health_tracker=tracker)
# ---------------------------------------------------------------------------
# get_plugin_health_status
# ---------------------------------------------------------------------------
class TestGetPluginHealthStatus:
def test_healthy_status(self):
tracker = _make_health_tracker(_healthy_summary())
monitor = PluginHealthMonitor(tracker)
status = monitor.get_plugin_health_status("plugin_a")
assert status == HealthStatus.HEALTHY
def test_degraded_status(self):
tracker = _make_health_tracker(_degraded_summary())
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
status = monitor.get_plugin_health_status("plugin_b")
assert status == HealthStatus.DEGRADED
def test_unhealthy_status(self):
tracker = _make_health_tracker(_unhealthy_summary())
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
status = monitor.get_plugin_health_status("plugin_c")
assert status == HealthStatus.UNHEALTHY
def test_open_circuit_breaker_is_unhealthy(self):
summary = _healthy_summary()
summary["circuit_state"] = "open"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker)
status = monitor.get_plugin_health_status("plugin_d")
assert status == HealthStatus.UNHEALTHY
def test_unknown_when_no_tracker(self):
monitor = PluginHealthMonitor(health_tracker=None)
status = monitor.get_plugin_health_status("plugin_e")
assert status == HealthStatus.UNKNOWN
def test_unknown_when_no_summary(self):
tracker = _make_health_tracker(None)
monitor = PluginHealthMonitor(tracker)
status = monitor.get_plugin_health_status("plugin_f")
assert status == HealthStatus.UNKNOWN
# ---------------------------------------------------------------------------
# get_plugin_health_metrics
# ---------------------------------------------------------------------------
class TestGetPluginHealthMetrics:
def test_healthy_metrics(self):
tracker = _make_health_tracker(_healthy_summary())
monitor = PluginHealthMonitor(tracker)
metrics = monitor.get_plugin_health_metrics("plugin_a")
assert isinstance(metrics, HealthMetrics)
assert metrics.status == HealthStatus.HEALTHY
assert metrics.success_rate == pytest.approx(1.0)
assert metrics.error_rate == pytest.approx(0.0)
def test_degraded_metrics(self):
tracker = _make_health_tracker(_degraded_summary())
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
metrics = monitor.get_plugin_health_metrics("plugin_b")
assert metrics.status == HealthStatus.DEGRADED
assert metrics.consecutive_failures == 3
def test_unhealthy_metrics(self):
tracker = _make_health_tracker(_unhealthy_summary())
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
metrics = monitor.get_plugin_health_metrics("plugin_c")
assert metrics.status == HealthStatus.UNHEALTHY
assert metrics.circuit_breaker_state == "open"
assert metrics.last_error is not None
def test_metrics_without_tracker(self):
monitor = PluginHealthMonitor(health_tracker=None)
metrics = monitor.get_plugin_health_metrics("plugin_d")
assert metrics.status == HealthStatus.UNKNOWN
assert metrics.plugin_id == "plugin_d"
def test_metrics_without_summary(self):
tracker = _make_health_tracker(None)
monitor = PluginHealthMonitor(tracker)
metrics = monitor.get_plugin_health_metrics("plugin_e")
assert metrics.status == HealthStatus.UNKNOWN
def test_last_successful_update_parsed(self):
summary = _healthy_summary()
summary["last_success_time"] = "2024-06-01T12:00:00"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker)
metrics = monitor.get_plugin_health_metrics("plugin_a")
assert metrics.last_successful_update is not None
assert isinstance(metrics.last_successful_update, datetime)
def test_invalid_last_success_time_handled(self):
summary = _healthy_summary()
summary["last_success_time"] = "not-a-date"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker)
# Should not raise
metrics = monitor.get_plugin_health_metrics("plugin_a")
assert metrics.last_successful_update is None
def test_total_successes_failures(self):
tracker = _make_health_tracker(_degraded_summary())
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
metrics = monitor.get_plugin_health_metrics("plugin_b")
assert metrics.total_failures == 6
assert metrics.total_successes == 4
# ---------------------------------------------------------------------------
# get_all_plugin_health
# ---------------------------------------------------------------------------
class TestGetAllPluginHealth:
def test_returns_empty_without_tracker(self):
monitor = PluginHealthMonitor(health_tracker=None)
result = monitor.get_all_plugin_health()
assert result == {}
def test_returns_metrics_for_each_plugin(self):
all_summaries = {
"plugin_a": _healthy_summary(),
"plugin_b": _degraded_summary(),
}
tracker = MagicMock()
tracker.get_all_health_summaries.return_value = all_summaries
tracker.get_health_summary.side_effect = lambda pid: all_summaries.get(pid)
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
result = monitor.get_all_plugin_health()
assert "plugin_a" in result
assert "plugin_b" in result
assert isinstance(result["plugin_a"], HealthMetrics)
def test_returns_empty_when_no_summaries(self):
tracker = _make_health_tracker(all_summaries={})
monitor = PluginHealthMonitor(tracker)
result = monitor.get_all_plugin_health()
assert result == {}
# ---------------------------------------------------------------------------
# _get_recovery_suggestions
# ---------------------------------------------------------------------------
class TestGetRecoverySuggestions:
def test_healthy_plugin_suggestion(self):
tracker = _make_health_tracker(_healthy_summary())
monitor = PluginHealthMonitor(tracker)
suggestions = monitor._get_recovery_suggestions("p", _healthy_summary(), HealthStatus.HEALTHY)
assert any("healthy" in s.lower() for s in suggestions)
def test_unhealthy_suggestions(self):
tracker = _make_health_tracker(_unhealthy_summary())
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", _unhealthy_summary(), HealthStatus.UNHEALTHY)
assert len(suggestions) > 0
assert any("unhealthy" in s.lower() for s in suggestions)
def test_open_circuit_breaker_suggestion(self):
summary = _unhealthy_summary()
summary["circuit_state"] = "open"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
assert any("circuit" in s.lower() for s in suggestions)
def test_timeout_error_suggestion(self):
summary = _degraded_summary()
summary["last_error"] = "connection timeout occurred"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.DEGRADED)
assert any("timeout" in s.lower() for s in suggestions)
def test_import_error_suggestion(self):
summary = _unhealthy_summary()
summary["last_error"] = "ImportError: missing module"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
assert any("dependencies" in s.lower() or "import" in s.lower() or "missing" in s.lower()
for s in suggestions)
def test_permission_error_suggestion(self):
summary = _unhealthy_summary()
summary["last_error"] = "permission denied to access resource"
tracker = _make_health_tracker(summary)
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
assert any("permission" in s.lower() for s in suggestions)
def test_degraded_suggestions_include_error_rate(self):
tracker = _make_health_tracker(_degraded_summary())
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
suggestions = monitor._get_recovery_suggestions("p", _degraded_summary(), HealthStatus.DEGRADED)
assert any("%" in s for s in suggestions)
# ---------------------------------------------------------------------------
# start / stop monitoring
# ---------------------------------------------------------------------------
class TestMonitorLifecycle:
def test_start_monitoring(self, monitor):
monitor.start_monitoring()
try:
assert monitor._monitor_thread is not None
assert monitor._monitor_thread.is_alive()
finally:
monitor.stop_monitoring()
def test_stop_monitoring(self, monitor):
monitor.start_monitoring()
monitor.stop_monitoring()
# Thread should no longer be alive
assert not monitor._monitor_thread.is_alive()
def test_double_start_no_duplicate_threads(self, monitor):
monitor.start_monitoring()
try:
thread1 = monitor._monitor_thread
monitor.start_monitoring() # should be idempotent
assert monitor._monitor_thread is thread1
finally:
monitor.stop_monitoring()
def test_register_health_check(self, monitor):
callback = MagicMock()
monitor.register_health_check(callback)
assert callback in monitor._health_check_callbacks

View File

@@ -1,129 +0,0 @@
"""
Tests for src/logo_downloader.py
Focuses on the pure/static methods that don't require network calls:
normalize_abbreviation, get_logo_filename_variations, get_logo_directory,
ensure_logo_directory, and the download_missing_logo function path
(with HTTP mocked).
"""
import os
import pytest
from pathlib import Path
from unittest.mock import patch, Mock, MagicMock
from src.logo_downloader import LogoDownloader
# ---------------------------------------------------------------------------
# normalize_abbreviation
# ---------------------------------------------------------------------------
class TestNormalizeAbbreviation:
def test_basic_lowercase(self):
result = LogoDownloader.normalize_abbreviation("lal")
assert result == "LAL"
def test_uppercases(self):
result = LogoDownloader.normalize_abbreviation("bos")
assert result == "BOS"
def test_ampersand_replaced(self):
result = LogoDownloader.normalize_abbreviation("TA&M")
assert "&" not in result
assert "AND" in result
def test_forward_slash_replaced(self):
result = LogoDownloader.normalize_abbreviation("A/B")
assert "/" not in result
def test_empty_returns_empty(self):
result = LogoDownloader.normalize_abbreviation("")
assert result == ""
# ---------------------------------------------------------------------------
# get_logo_filename_variations
# ---------------------------------------------------------------------------
class TestGetLogoFilenameVariations:
def test_returns_list(self):
result = LogoDownloader.get_logo_filename_variations("LAL")
assert isinstance(result, list)
assert len(result) > 0
def test_includes_png(self):
result = LogoDownloader.get_logo_filename_variations("KC")
filenames = " ".join(result)
assert ".png" in filenames
def test_includes_original(self):
result = LogoDownloader.get_logo_filename_variations("LAL")
assert any("LAL" in f for f in result)
def test_ampersand_variation(self):
result = LogoDownloader.get_logo_filename_variations("TA&M")
# Should produce at least the normalized version
assert len(result) > 0
def test_empty_string_no_crash(self):
result = LogoDownloader.get_logo_filename_variations("")
assert isinstance(result, list)
# ---------------------------------------------------------------------------
# get_logo_directory
# ---------------------------------------------------------------------------
class TestGetLogoDirectory:
def test_known_sport_returns_string(self):
downloader = LogoDownloader()
result = downloader.get_logo_directory("nfl")
assert isinstance(result, str)
assert len(result) > 0
def test_known_sport_nba(self):
downloader = LogoDownloader()
result = downloader.get_logo_directory("nba")
assert "nba" in result.lower() or "sports" in result.lower()
def test_unknown_sport_returns_string(self):
downloader = LogoDownloader()
result = downloader.get_logo_directory("unknown_sport_xyz")
assert isinstance(result, str)
# ---------------------------------------------------------------------------
# ensure_logo_directory
# ---------------------------------------------------------------------------
class TestEnsureLogoDirectory:
def test_creates_writable_directory(self, tmp_path):
downloader = LogoDownloader()
test_dir = str(tmp_path / "logos" / "nfl")
result = downloader.ensure_logo_directory(test_dir)
assert result is True
assert Path(test_dir).is_dir()
def test_existing_writable_directory(self, tmp_path):
downloader = LogoDownloader()
test_dir = str(tmp_path)
result = downloader.ensure_logo_directory(test_dir)
assert result is True
def test_returns_false_when_write_test_fails(self, tmp_path):
"""Simulate a directory that exists but raises PermissionError on write."""
downloader = LogoDownloader()
test_dir = str(tmp_path / "logos")
import builtins
original_open = builtins.open
def mock_open(path, *args, **kwargs):
if ".write_test" in str(path):
raise PermissionError("no write access")
return original_open(path, *args, **kwargs)
with patch("builtins.open", side_effect=mock_open):
result = downloader.ensure_logo_directory(test_dir)
assert result is False

View File

@@ -1,317 +0,0 @@
"""
Tests for src/common/scroll_helper.py
Covers ScrollHelper: create_scrolling_image, update_scroll_position,
get_visible_portion, calculate_dynamic_duration, set_* methods,
reset_scroll, clear_cache, get_scroll_info.
"""
import pytest
import time
from unittest.mock import patch
from PIL import Image
from src.common.scroll_helper import ScrollHelper
DISPLAY_W = 64
DISPLAY_H = 32
@pytest.fixture
def helper():
return ScrollHelper(display_width=DISPLAY_W, display_height=DISPLAY_H)
def _make_image(width: int = 64, height: int = 32, color=(255, 0, 0)) -> Image.Image:
img = Image.new("RGB", (width, height), color)
return img
# ---------------------------------------------------------------------------
# __init__ / initial state
# ---------------------------------------------------------------------------
class TestScrollHelperInit:
def test_initial_scroll_position(self, helper):
assert helper.scroll_position == 0.0
def test_initial_scroll_complete_false(self, helper):
assert helper.scroll_complete is False
def test_display_dimensions(self, helper):
assert helper.display_width == DISPLAY_W
assert helper.display_height == DISPLAY_H
# ---------------------------------------------------------------------------
# create_scrolling_image
# ---------------------------------------------------------------------------
class TestCreateScrollingImage:
def test_empty_content_returns_blank_image(self, helper):
result = helper.create_scrolling_image([])
assert isinstance(result, Image.Image)
assert helper.total_scroll_width == 0
def test_single_item_creates_image(self, helper):
img = _make_image(width=100)
result = helper.create_scrolling_image([img])
assert isinstance(result, Image.Image)
assert result.width > DISPLAY_W # includes leading gap
def test_multiple_items_wider_image(self, helper):
items = [_make_image(width=50), _make_image(width=50)]
result = helper.create_scrolling_image(items)
# Should be wider than two items alone
assert result.width > 100
def test_scroll_position_reset(self, helper):
helper.scroll_position = 500.0
helper.create_scrolling_image([_make_image()])
assert helper.scroll_position == 0.0
def test_cached_array_set(self, helper):
helper.create_scrolling_image([_make_image()])
assert helper.cached_array is not None
def test_scroll_complete_reset(self, helper):
helper.scroll_complete = True
helper.create_scrolling_image([_make_image()])
assert helper.scroll_complete is False
def test_total_scroll_width_matches_image(self, helper):
img = _make_image(width=200)
result = helper.create_scrolling_image([img])
assert helper.total_scroll_width == result.width
# ---------------------------------------------------------------------------
# set_scrolling_image
# ---------------------------------------------------------------------------
class TestSetScrollingImage:
def test_sets_cached_image(self, helper):
img = _make_image(width=200)
helper.set_scrolling_image(img)
assert helper.cached_image is img
def test_sets_cached_array(self, helper):
img = _make_image(width=200)
helper.set_scrolling_image(img)
assert helper.cached_array is not None
def test_scroll_width_matches_image(self, helper):
img = _make_image(width=300)
helper.set_scrolling_image(img)
assert helper.total_scroll_width == 300
def test_none_clears_cache(self, helper):
helper.set_scrolling_image(_make_image())
helper.set_scrolling_image(None)
assert helper.cached_image is None
# ---------------------------------------------------------------------------
# update_scroll_position (time-based mode)
# ---------------------------------------------------------------------------
class TestUpdateScrollPosition:
def test_position_advances_over_time(self, helper):
helper.create_scrolling_image([_make_image(width=200)])
helper.scroll_speed = 100.0 # 100 px/s
helper.last_update_time = time.time() - 0.1 # pretend 100ms elapsed
initial = helper.scroll_position
helper.update_scroll_position()
assert helper.scroll_position > initial
def test_no_advance_without_image(self, helper):
helper.update_scroll_position() # no image, should not crash
assert helper.scroll_position == 0.0
def test_zero_width_content_stays_zero(self, helper):
helper.create_scrolling_image([]) # empty → width 0
helper.update_scroll_position()
assert helper.scroll_position == 0.0
def test_scroll_complete_clamped(self, helper):
helper.create_scrolling_image([_make_image(width=100)])
# Force position past the end
helper.scroll_position = helper.total_scroll_width + 50
helper.total_distance_scrolled = helper.total_scroll_width + 50
helper.update_scroll_position()
assert helper.scroll_complete is True
assert helper.scroll_position <= helper.total_scroll_width
# ---------------------------------------------------------------------------
# get_visible_portion
# ---------------------------------------------------------------------------
class TestGetVisiblePortion:
def test_returns_none_without_image(self, helper):
assert helper.get_visible_portion() is None
def test_returns_image_sized_to_display(self, helper):
helper.create_scrolling_image([_make_image(width=200)])
visible = helper.get_visible_portion()
assert visible is not None
assert visible.width == DISPLAY_W
assert visible.height == DISPLAY_H
def test_different_positions_give_different_images(self, helper):
helper.create_scrolling_image([_make_image(width=300)])
img1 = helper.get_visible_portion()
helper.scroll_position = 50
img2 = helper.get_visible_portion()
# Images should differ (colour from scrolled content)
# Just verify both are valid PIL images with correct size
assert img1.width == img2.width == DISPLAY_W
# ---------------------------------------------------------------------------
# reset_scroll / clear_cache
# ---------------------------------------------------------------------------
class TestResetAndClear:
def test_reset_restores_position(self, helper):
helper.create_scrolling_image([_make_image(width=200)])
helper.scroll_position = 100.0
helper.reset_scroll()
assert helper.scroll_position == 0.0
def test_reset_clears_complete_flag(self, helper):
helper.scroll_complete = True
helper.reset_scroll()
assert helper.scroll_complete is False
def test_reset_alias(self, helper):
helper.scroll_position = 50.0
helper.reset()
assert helper.scroll_position == 0.0
def test_clear_cache(self, helper):
helper.create_scrolling_image([_make_image()])
helper.clear_cache()
assert helper.cached_image is None
assert helper.cached_array is None
assert helper.total_scroll_width == 0
# ---------------------------------------------------------------------------
# calculate_dynamic_duration
# ---------------------------------------------------------------------------
class TestCalculateDynamicDuration:
def test_returns_min_when_disabled(self, helper):
helper.dynamic_duration_enabled = False
helper.min_duration = 30
result = helper.calculate_dynamic_duration()
assert result == 30
def test_returns_min_when_no_content(self, helper):
helper.total_scroll_width = 0
helper.min_duration = 30
result = helper.calculate_dynamic_duration()
assert result == 30
def test_respects_min_duration(self, helper):
helper.create_scrolling_image([_make_image(width=50)])
helper.min_duration = 60
helper.max_duration = 300
helper.scroll_speed = 500.0 # very fast → very short time
result = helper.calculate_dynamic_duration()
assert result >= 60
def test_respects_max_duration(self, helper):
helper.create_scrolling_image([_make_image(width=5000)])
helper.min_duration = 10
helper.max_duration = 60
helper.scroll_speed = 1.0 # very slow → very long time
result = helper.calculate_dynamic_duration()
assert result <= 60
def test_time_based_calculation(self, helper):
helper.create_scrolling_image([_make_image(width=200)])
helper.scroll_speed = 100.0
helper.min_duration = 1
helper.max_duration = 600
helper.frame_based_scrolling = False
result = helper.calculate_dynamic_duration()
assert isinstance(result, int)
assert result > 0
# ---------------------------------------------------------------------------
# set_* configuration methods
# ---------------------------------------------------------------------------
class TestSetMethods:
def test_set_scroll_speed_time_based(self, helper):
helper.frame_based_scrolling = False
helper.set_scroll_speed(50.0)
assert helper.scroll_speed == 50.0
def test_set_scroll_speed_clamped_low(self, helper):
helper.frame_based_scrolling = False
helper.set_scroll_speed(0.0)
assert helper.scroll_speed >= 1.0
def test_set_scroll_speed_clamped_high(self, helper):
helper.frame_based_scrolling = False
helper.set_scroll_speed(10000.0)
assert helper.scroll_speed <= 500.0
def test_set_scroll_delay(self, helper):
helper.set_scroll_delay(0.05)
assert helper.scroll_delay == 0.05
def test_set_scroll_delay_clamped(self, helper):
helper.set_scroll_delay(0.0001)
assert helper.scroll_delay >= 0.001
def test_set_target_fps(self, helper):
helper.set_target_fps(60.0)
assert helper.target_fps == 60.0
def test_set_target_fps_clamped(self, helper):
helper.set_target_fps(1000.0)
assert helper.target_fps <= 200.0
def test_set_sub_pixel_scrolling(self, helper):
helper.set_sub_pixel_scrolling(True)
assert helper.sub_pixel_scrolling is True
helper.set_sub_pixel_scrolling(False)
assert helper.sub_pixel_scrolling is False
def test_set_frame_based_scrolling(self, helper):
helper.set_frame_based_scrolling(True)
assert helper.frame_based_scrolling is True
def test_set_dynamic_duration_settings(self, helper):
helper.set_dynamic_duration_settings(enabled=True, min_duration=20, max_duration=120, buffer=0.2)
assert helper.dynamic_duration_enabled is True
assert helper.min_duration == 20
assert helper.max_duration == 120
assert helper.duration_buffer == pytest.approx(0.2)
# ---------------------------------------------------------------------------
# get_scroll_info
# ---------------------------------------------------------------------------
class TestGetScrollInfo:
def test_returns_dict(self, helper):
info = helper.get_scroll_info()
assert isinstance(info, dict)
def test_required_keys(self, helper):
info = helper.get_scroll_info()
for key in ("scroll_position", "total_distance_scrolled", "scroll_speed",
"scroll_complete", "dynamic_duration"):
assert key in info
def test_scroll_position_reflected(self, helper):
helper.scroll_position = 42.0
info = helper.get_scroll_info()
assert info["scroll_position"] == 42.0

View File

@@ -1,329 +0,0 @@
"""
Tests for src/common/utils.py
Covers all pure utility functions: normalize_team_abbreviation, format_time,
format_date, get_timezone, validate_dimensions, parse_team_abbreviation,
format_score, format_period, is_live_game, is_final_game, is_upcoming_game,
sanitize_filename, truncate_text, parse_boolean.
"""
import pytest
from datetime import datetime, timezone
import pytz
from src.common.utils import (
normalize_team_abbreviation,
format_time,
format_date,
get_timezone,
validate_dimensions,
parse_team_abbreviation,
format_score,
format_period,
is_live_game,
is_final_game,
is_upcoming_game,
sanitize_filename,
truncate_text,
parse_boolean,
)
# ---------------------------------------------------------------------------
# normalize_team_abbreviation
# ---------------------------------------------------------------------------
class TestNormalizeTeamAbbreviation:
def test_basic_uppercase(self):
assert normalize_team_abbreviation("lal") == "LAL"
def test_strips_spaces(self):
assert normalize_team_abbreviation(" KC ") == "KC"
def test_replaces_ampersand(self):
assert normalize_team_abbreviation("TA&M") == "TAANDM"
def test_removes_internal_spaces(self):
assert normalize_team_abbreviation("A B") == "AB"
def test_removes_hyphens(self):
assert normalize_team_abbreviation("A-B") == "AB"
def test_empty_string_returns_empty(self):
assert normalize_team_abbreviation("") == ""
def test_none_returns_empty(self):
assert normalize_team_abbreviation(None) == ""
# ---------------------------------------------------------------------------
# format_time / format_date
# ---------------------------------------------------------------------------
class TestFormatTime:
def _utc_dt(self, hour=20, minute=30):
return datetime(2024, 1, 15, hour, minute, 0, tzinfo=timezone.utc)
def test_formats_utc_to_utc(self):
dt = self._utc_dt(20, 30)
result = format_time(dt, timezone_str="UTC")
# 20:30 UTC → "8:30PM" (leading zero stripped)
assert "8:30PM" in result or "8:30 PM" in result or result != ""
def test_naive_datetime_treated_as_utc(self):
dt = datetime(2024, 1, 15, 12, 0, 0) # naive
result = format_time(dt, timezone_str="UTC")
assert result != ""
def test_invalid_timezone_returns_empty(self):
dt = self._utc_dt()
result = format_time(dt, timezone_str="Invalid/TZ")
assert result == ""
def test_eastern_timezone(self):
dt = self._utc_dt(20, 0) # 8 PM UTC = 3 PM ET
result = format_time(dt, timezone_str="America/New_York")
assert result != ""
class TestFormatDate:
def test_formats_date(self):
dt = datetime(2024, 6, 15, 18, 0, 0, tzinfo=timezone.utc)
result = format_date(dt, timezone_str="UTC")
assert "June" in result or "15" in result
def test_naive_datetime(self):
dt = datetime(2024, 3, 10, 12, 0, 0)
result = format_date(dt, timezone_str="UTC")
assert result != ""
def test_invalid_timezone_returns_empty(self):
dt = datetime(2024, 6, 15, 18, 0, 0, tzinfo=timezone.utc)
result = format_date(dt, timezone_str="BadZone/Here")
assert result == ""
# ---------------------------------------------------------------------------
# get_timezone
# ---------------------------------------------------------------------------
class TestGetTimezone:
def test_valid_timezone(self):
tz = get_timezone("America/New_York")
assert tz is not None
def test_utc(self):
tz = get_timezone("UTC")
assert tz is pytz.utc or str(tz) == "UTC"
def test_invalid_returns_utc(self):
tz = get_timezone("Not/ATimezone")
assert tz is pytz.utc
# ---------------------------------------------------------------------------
# validate_dimensions
# ---------------------------------------------------------------------------
class TestValidateDimensions:
def test_valid(self):
assert validate_dimensions(64, 32) is True
def test_zero_width(self):
assert validate_dimensions(0, 32) is False
def test_zero_height(self):
assert validate_dimensions(64, 0) is False
def test_negative(self):
assert validate_dimensions(-1, 32) is False
def test_too_large(self):
assert validate_dimensions(1001, 32) is False
def test_max_valid(self):
assert validate_dimensions(1000, 1000) is True
def test_non_integer(self):
assert validate_dimensions("64", 32) is False # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# parse_team_abbreviation
# ---------------------------------------------------------------------------
class TestParseTeamAbbreviation:
def test_empty_string(self):
assert parse_team_abbreviation("") == ""
def test_none_returns_empty(self):
assert parse_team_abbreviation(None) == ""
def test_extracts_uppercase(self):
result = parse_team_abbreviation("LAL")
assert result == "LAL"
def test_fallback_first_three(self):
# text without recognisable 2-4 char uppercase block
result = parse_team_abbreviation("ab")
assert len(result) <= 3
# ---------------------------------------------------------------------------
# format_score
# ---------------------------------------------------------------------------
class TestFormatScore:
def test_format_score(self):
assert format_score(14, 7) == "7-14"
def test_format_score_strings(self):
assert format_score("21", "14") == "14-21"
def test_zero_zero(self):
assert format_score(0, 0) == "0-0"
# ---------------------------------------------------------------------------
# format_period
# ---------------------------------------------------------------------------
class TestFormatPeriod:
def test_basketball_q1(self):
assert format_period(1, "basketball") == "Q1"
def test_basketball_q4(self):
assert format_period(4, "basketball") == "Q4"
def test_basketball_ot1(self):
assert format_period(5, "basketball") == "OT1"
def test_basketball_ot2(self):
assert format_period(6, "basketball") == "OT2"
def test_football_q1(self):
assert format_period(1, "football") == "Q1"
def test_football_ot(self):
assert format_period(5, "football") == "OT1"
def test_hockey_p1(self):
assert format_period(1, "hockey") == "P1"
def test_hockey_p3(self):
assert format_period(3, "hockey") == "P3"
def test_hockey_ot(self):
assert format_period(4, "hockey") == "OT1"
def test_baseball_inning(self):
assert format_period(7, "baseball") == "INN 7"
def test_unknown_sport(self):
result = format_period(2, "unknown")
assert "2" in result
# ---------------------------------------------------------------------------
# is_live_game / is_final_game / is_upcoming_game
# ---------------------------------------------------------------------------
class TestGameStatusHelpers:
def test_is_live_game_true(self):
assert is_live_game("In Progress") is True
assert is_live_game("halftime") is True
assert is_live_game("overtime") is True
def test_is_live_game_false(self):
assert is_live_game("Final") is False
assert is_live_game("Scheduled") is False
def test_is_final_game_true(self):
assert is_final_game("Final") is True
assert is_final_game("COMPLETED") is True
def test_is_final_game_false(self):
assert is_final_game("In Progress") is False
def test_is_upcoming_game_true(self):
assert is_upcoming_game("Scheduled") is True
assert is_upcoming_game("upcoming") is True
def test_is_upcoming_game_false(self):
assert is_upcoming_game("Final") is False
assert is_upcoming_game("In Progress") is False
# ---------------------------------------------------------------------------
# sanitize_filename
# ---------------------------------------------------------------------------
class TestSanitizeFilename:
def test_removes_invalid_chars(self):
result = sanitize_filename('file<>:"/\\|?*.txt')
assert "<" not in result
assert ">" not in result
assert ":" not in result
def test_collapses_underscores(self):
result = sanitize_filename("file___name")
assert "__" not in result
def test_strips_leading_trailing(self):
result = sanitize_filename("_file_")
assert not result.startswith("_")
assert not result.endswith("_")
def test_normal_filename_unchanged(self):
result = sanitize_filename("my_logo")
assert result == "my_logo"
# ---------------------------------------------------------------------------
# truncate_text
# ---------------------------------------------------------------------------
class TestTruncateText:
def test_no_truncation_needed(self):
assert truncate_text("hello", 10) == "hello"
def test_truncation_adds_suffix(self):
result = truncate_text("hello world", 8)
assert result.endswith("...")
assert len(result) == 8
def test_exact_length(self):
assert truncate_text("hello", 5) == "hello"
def test_custom_suffix(self):
result = truncate_text("hello world", 8, suffix="~")
assert result.endswith("~")
# ---------------------------------------------------------------------------
# parse_boolean
# ---------------------------------------------------------------------------
class TestParseBoolean:
def test_true_bool(self):
assert parse_boolean(True) is True
def test_false_bool(self):
assert parse_boolean(False) is False
def test_int_1(self):
assert parse_boolean(1) is True
def test_int_0(self):
assert parse_boolean(0) is False
def test_string_true(self):
for val in ("true", "True", "TRUE", "1", "yes", "on", "enabled"):
assert parse_boolean(val) is True, f"Expected True for {val!r}"
def test_string_false(self):
for val in ("false", "False", "0", "no", "off", "disabled"):
assert parse_boolean(val) is False, f"Expected False for {val!r}"
def test_none_returns_false(self):
assert parse_boolean(None) is False # type: ignore[arg-type]

View File

@@ -1,310 +0,0 @@
"""
Tests for src/vegas_mode/config.py
Covers VegasModeConfig: from_config, to_dict, get_frame_interval,
is_plugin_included, get_ordered_plugins, validate, update.
"""
import pytest
from src.vegas_mode.config import VegasModeConfig
# ---------------------------------------------------------------------------
# Default construction
# ---------------------------------------------------------------------------
class TestVegasModeConfigDefaults:
def test_default_disabled(self):
cfg = VegasModeConfig()
assert cfg.enabled is False
def test_default_scroll_speed(self):
cfg = VegasModeConfig()
assert cfg.scroll_speed == 50.0
def test_default_separator_width(self):
cfg = VegasModeConfig()
assert cfg.separator_width == 32
def test_default_target_fps(self):
cfg = VegasModeConfig()
assert cfg.target_fps == 125
def test_default_plugin_order_empty(self):
cfg = VegasModeConfig()
assert cfg.plugin_order == []
def test_default_excluded_plugins_empty(self):
cfg = VegasModeConfig()
assert len(cfg.excluded_plugins) == 0
# ---------------------------------------------------------------------------
# from_config
# ---------------------------------------------------------------------------
class TestFromConfig:
def _cfg(self, **kwargs) -> dict:
return {"display": {"vegas_scroll": kwargs}}
def test_enabled_flag(self):
cfg = VegasModeConfig.from_config(self._cfg(enabled=True))
assert cfg.enabled is True
def test_scroll_speed(self):
cfg = VegasModeConfig.from_config(self._cfg(scroll_speed=80.0))
assert cfg.scroll_speed == 80.0
def test_separator_width(self):
cfg = VegasModeConfig.from_config(self._cfg(separator_width=16))
assert cfg.separator_width == 16
def test_plugin_order(self):
cfg = VegasModeConfig.from_config(self._cfg(plugin_order=["a", "b", "c"]))
assert cfg.plugin_order == ["a", "b", "c"]
def test_excluded_plugins(self):
cfg = VegasModeConfig.from_config(self._cfg(excluded_plugins=["x", "y"]))
assert "x" in cfg.excluded_plugins
assert "y" in cfg.excluded_plugins
def test_target_fps(self):
cfg = VegasModeConfig.from_config(self._cfg(target_fps=60))
assert cfg.target_fps == 60
def test_buffer_ahead(self):
cfg = VegasModeConfig.from_config(self._cfg(buffer_ahead=3))
assert cfg.buffer_ahead == 3
def test_min_max_cycle_duration(self):
cfg = VegasModeConfig.from_config(self._cfg(min_cycle_duration=30, max_cycle_duration=120))
assert cfg.min_cycle_duration == 30
assert cfg.max_cycle_duration == 120
def test_defaults_when_missing(self):
cfg = VegasModeConfig.from_config({})
assert cfg.enabled is False
assert cfg.scroll_speed == 50.0
def test_frame_based_scrolling(self):
cfg = VegasModeConfig.from_config(self._cfg(frame_based_scrolling=False))
assert cfg.frame_based_scrolling is False
# ---------------------------------------------------------------------------
# to_dict
# ---------------------------------------------------------------------------
class TestToDict:
def test_roundtrip(self):
original = VegasModeConfig(
enabled=True,
scroll_speed=75.0,
separator_width=24,
plugin_order=["a", "b"],
excluded_plugins={"z"},
target_fps=100,
)
d = original.to_dict()
assert d["enabled"] is True
assert d["scroll_speed"] == 75.0
assert d["separator_width"] == 24
assert d["plugin_order"] == ["a", "b"]
assert "z" in d["excluded_plugins"]
assert d["target_fps"] == 100
def test_excluded_plugins_is_list(self):
cfg = VegasModeConfig(excluded_plugins={"x"})
d = cfg.to_dict()
assert isinstance(d["excluded_plugins"], list)
def test_all_keys_present(self):
d = VegasModeConfig().to_dict()
for key in ("enabled", "scroll_speed", "separator_width", "plugin_order",
"excluded_plugins", "target_fps", "buffer_ahead",
"frame_based_scrolling", "scroll_delay",
"dynamic_duration_enabled", "min_cycle_duration", "max_cycle_duration"):
assert key in d
# ---------------------------------------------------------------------------
# get_frame_interval
# ---------------------------------------------------------------------------
class TestGetFrameInterval:
def test_125fps(self):
cfg = VegasModeConfig(target_fps=125)
assert abs(cfg.get_frame_interval() - 1.0 / 125) < 1e-9
def test_60fps(self):
cfg = VegasModeConfig(target_fps=60)
assert abs(cfg.get_frame_interval() - 1.0 / 60) < 1e-6
def test_zero_fps_guarded(self):
cfg = VegasModeConfig(target_fps=0)
# Should not raise ZeroDivisionError (max(1, fps) guard)
result = cfg.get_frame_interval()
assert result == 1.0
# ---------------------------------------------------------------------------
# is_plugin_included
# ---------------------------------------------------------------------------
class TestIsPluginIncluded:
def test_not_excluded_is_included(self):
cfg = VegasModeConfig(excluded_plugins={"bad_plugin"})
assert cfg.is_plugin_included("good_plugin") is True
def test_excluded_plugin_not_included(self):
cfg = VegasModeConfig(excluded_plugins={"bad_plugin"})
assert cfg.is_plugin_included("bad_plugin") is False
def test_empty_exclusions_all_included(self):
cfg = VegasModeConfig()
assert cfg.is_plugin_included("anything") is True
# ---------------------------------------------------------------------------
# get_ordered_plugins
# ---------------------------------------------------------------------------
class TestGetOrderedPlugins:
def test_natural_order_when_no_order_configured(self):
cfg = VegasModeConfig()
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert result == ["a", "b", "c"]
def test_explicit_order_followed(self):
cfg = VegasModeConfig(plugin_order=["c", "a", "b"])
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert result == ["c", "a", "b"]
def test_unavailable_plugins_skipped(self):
cfg = VegasModeConfig(plugin_order=["c", "x", "a"])
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert "x" not in result
assert result[:2] == ["c", "a"]
def test_excluded_plugins_removed(self):
cfg = VegasModeConfig(excluded_plugins={"b"})
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert "b" not in result
def test_unordered_available_appended(self):
cfg = VegasModeConfig(plugin_order=["a"])
available = ["a", "b", "c"]
result = cfg.get_ordered_plugins(available)
assert result[0] == "a"
assert "b" in result
assert "c" in result
def test_empty_available(self):
cfg = VegasModeConfig(plugin_order=["a"])
result = cfg.get_ordered_plugins([])
assert result == []
# ---------------------------------------------------------------------------
# validate
# ---------------------------------------------------------------------------
class TestValidate:
def test_valid_config_no_errors(self):
cfg = VegasModeConfig()
errors = cfg.validate()
assert errors == []
def test_scroll_speed_too_low(self):
cfg = VegasModeConfig(scroll_speed=0.5)
errors = cfg.validate()
assert any("scroll_speed" in e for e in errors)
def test_scroll_speed_too_high(self):
cfg = VegasModeConfig(scroll_speed=300.0)
errors = cfg.validate()
assert any("scroll_speed" in e for e in errors)
def test_separator_width_negative(self):
cfg = VegasModeConfig(separator_width=-1)
errors = cfg.validate()
assert any("separator_width" in e for e in errors)
def test_separator_width_too_large(self):
cfg = VegasModeConfig(separator_width=200)
errors = cfg.validate()
assert any("separator_width" in e for e in errors)
def test_target_fps_too_low(self):
cfg = VegasModeConfig(target_fps=10)
errors = cfg.validate()
assert any("target_fps" in e for e in errors)
def test_target_fps_too_high(self):
cfg = VegasModeConfig(target_fps=300)
errors = cfg.validate()
assert any("target_fps" in e for e in errors)
def test_buffer_ahead_too_low(self):
cfg = VegasModeConfig(buffer_ahead=0)
errors = cfg.validate()
assert any("buffer_ahead" in e for e in errors)
def test_buffer_ahead_too_high(self):
cfg = VegasModeConfig(buffer_ahead=10)
errors = cfg.validate()
assert any("buffer_ahead" in e for e in errors)
def test_multiple_errors_returned(self):
cfg = VegasModeConfig(scroll_speed=0.1, target_fps=5)
errors = cfg.validate()
assert len(errors) >= 2
# ---------------------------------------------------------------------------
# update
# ---------------------------------------------------------------------------
class TestUpdate:
def _wrap(self, **kwargs) -> dict:
return {"display": {"vegas_scroll": kwargs}}
def test_update_enabled(self):
cfg = VegasModeConfig(enabled=False)
cfg.update(self._wrap(enabled=True))
assert cfg.enabled is True
def test_update_scroll_speed(self):
cfg = VegasModeConfig(scroll_speed=50.0)
cfg.update(self._wrap(scroll_speed=90.0))
assert cfg.scroll_speed == 90.0
def test_update_separator_width(self):
cfg = VegasModeConfig(separator_width=32)
cfg.update(self._wrap(separator_width=8))
assert cfg.separator_width == 8
def test_update_plugin_order(self):
cfg = VegasModeConfig(plugin_order=[])
cfg.update(self._wrap(plugin_order=["x", "y"]))
assert cfg.plugin_order == ["x", "y"]
def test_update_excluded_plugins(self):
cfg = VegasModeConfig()
cfg.update(self._wrap(excluded_plugins=["skip_me"]))
assert "skip_me" in cfg.excluded_plugins
def test_update_ignores_missing_keys(self):
cfg = VegasModeConfig(scroll_speed=50.0)
cfg.update(self._wrap(target_fps=80)) # only fps, not speed
assert cfg.scroll_speed == 50.0
assert cfg.target_fps == 80
def test_empty_update_no_change(self):
cfg = VegasModeConfig(scroll_speed=50.0)
cfg.update({})
assert cfg.scroll_speed == 50.0

View File

@@ -402,7 +402,9 @@ def save_schedule_config():
return success_response(message='Schedule configuration saved successfully') return success_response(message='Schedule configuration saved successfully')
except Exception as e: except Exception as e:
import logging import logging
logger.error("Error saving schedule config", exc_info=True) import traceback
error_msg = f"Error saving schedule config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
return error_response( return error_response(
ErrorCode.CONFIG_SAVE_FAILED, ErrorCode.CONFIG_SAVE_FAILED,
"An error occurred; see logs for details", "An error occurred; see logs for details",
@@ -621,7 +623,9 @@ def save_dim_schedule_config():
return success_response(message='Dim schedule configuration saved successfully') return success_response(message='Dim schedule configuration saved successfully')
except Exception as e: except Exception as e:
import logging import logging
logger.error("Error saving dim schedule config", exc_info=True) import traceback
error_msg = f"Error saving dim schedule config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
return error_response( return error_response(
ErrorCode.CONFIG_SAVE_FAILED, ErrorCode.CONFIG_SAVE_FAILED,
"An error occurred; see logs for details", "An error occurred; see logs for details",
@@ -917,7 +921,7 @@ def save_main_config():
if 'properties' in schema: if 'properties' in schema:
secret_fields = find_secret_fields(schema['properties']) secret_fields = find_secret_fields(schema['properties'])
except Exception as e: except Exception as e:
logger.debug("Error reading schema for secret detection: %s", e) print(f"Error reading schema for secret detection: {e}")
# Separate secrets from regular config (same logic as save_plugin_config) # Separate secrets from regular config (same logic as save_plugin_config)
def separate_secrets(config, secrets_set, prefix=''): def separate_secrets(config, secrets_set, prefix=''):
@@ -955,7 +959,7 @@ def save_main_config():
if 'enabled' not in regular_config: if 'enabled' not in regular_config:
regular_config['enabled'] = True regular_config['enabled'] = True
except Exception as e: except Exception as e:
logger.debug("Error preserving enabled state: %s", e) print(f"Error preserving enabled state for {plugin_id}: {e}")
# Default to True on error to avoid disabling plugins # Default to True on error to avoid disabling plugins
regular_config['enabled'] = True regular_config['enabled'] = True
@@ -990,7 +994,7 @@ def save_main_config():
plugin_instance.on_config_change(plugin_full_config) plugin_instance.on_config_change(plugin_full_config)
except Exception as hook_err: except Exception as hook_err:
# Don't fail the save if hook fails # Don't fail the save if hook fails
logger.warning("on_config_change failed: %s", hook_err) print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}")
# Remove processed plugin keys from data (they're already in current_config) # Remove processed plugin keys from data (they're already in current_config)
for key in plugin_keys_to_remove: for key in plugin_keys_to_remove:
@@ -1039,10 +1043,14 @@ def save_main_config():
return success_response(message='Configuration saved successfully') return success_response(message='Configuration saved successfully')
except Exception as e: except Exception as e:
logger.error("Error saving config", exc_info=True) import logging
import traceback
error_msg = f"Error saving config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
return error_response( return error_response(
ErrorCode.CONFIG_SAVE_FAILED, ErrorCode.CONFIG_SAVE_FAILED,
"An error occurred; see logs for details", f"Error saving configuration: {e}",
status_code=500 status_code=500
) )
@@ -1079,8 +1087,13 @@ def save_raw_main_config():
logger.error('Invalid JSON', exc_info=True) logger.error('Invalid JSON', exc_info=True)
return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400 return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400
except Exception as e: except Exception as e:
import logging
import traceback
from src.exceptions import ConfigError from src.exceptions import ConfigError
logger.error("Error saving raw main config", exc_info=True)
# Log the full error for debugging
error_msg = f"Error saving raw main config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
# Extract more specific error message if it's a ConfigError # Extract more specific error message if it's a ConfigError
if isinstance(e, ConfigError): if isinstance(e, ConfigError):
@@ -1126,8 +1139,13 @@ def save_raw_secrets_config():
logger.error('Invalid JSON', exc_info=True) logger.error('Invalid JSON', exc_info=True)
return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400 return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400
except Exception as e: except Exception as e:
import logging
import traceback
from src.exceptions import ConfigError from src.exceptions import ConfigError
logger.error("Error saving raw secrets config", exc_info=True)
# Log the full error for debugging
error_msg = f"Error saving raw secrets config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
# Extract more specific error message if it's a ConfigError # Extract more specific error message if it's a ConfigError
if isinstance(e, ConfigError): if isinstance(e, ConfigError):
@@ -1347,9 +1365,7 @@ def get_git_version(project_dir=None):
) )
if result.returncode == 0: if result.returncode == 0:
version_str = result.stdout.strip() return result.stdout.strip()
if re.match(r'^[a-zA-Z0-9._\-]+$', version_str):
return version_str
# Fallback to short commit hash # Fallback to short commit hash
result = subprocess.run( result = subprocess.run(
@@ -1361,9 +1377,7 @@ def get_git_version(project_dir=None):
) )
if result.returncode == 0: if result.returncode == 0:
version_str = result.stdout.strip() return result.stdout.strip()
if re.match(r'^[a-zA-Z0-9._\-]+$', version_str):
return version_str
return 'Unknown' return 'Unknown'
except Exception: except Exception:
@@ -1458,10 +1472,12 @@ def execute_system_action():
# For now, just start the display service # For now, just start the display service
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'], result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
capture_output=True, text=True) capture_output=True, text=True)
logger.info("start_display (%s) returned code %d", mode, result.returncode)
return jsonify({ return jsonify({
'status': 'success' if result.returncode == 0 else 'error', 'status': 'success' if result.returncode == 0 else 'error',
'message': 'Display started' if result.returncode == 0 else 'Failed to start display', 'message': f'Started display in {mode} mode',
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}) })
else: else:
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'], result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
@@ -1522,12 +1538,13 @@ def execute_system_action():
cwd=project_dir cwd=project_dir
) )
if stash_result.returncode == 0: if stash_result.returncode == 0:
logger.debug("git stash: stashed local changes before pull") print(f"Stashed local changes: {stash_result.stdout}")
stash_info = " Local changes were stashed." stash_info = " Local changes were stashed."
else: else:
logger.warning("git stash failed before pull (returncode=%d)", stash_result.returncode) # If stash fails, log but continue with pull
print(f"Stash failed: {stash_result.stderr}")
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
logger.warning("git stash timed out, proceeding with pull") print("Stash operation timed out, proceeding with pull")
# Perform the git pull # Perform the git pull
result = subprocess.run( result = subprocess.run(
@@ -1546,12 +1563,14 @@ def execute_system_action():
if result.stdout and "Already up to date" not in result.stdout: if result.stdout and "Already up to date" not in result.stdout:
pull_message = f"Code updated successfully.{stash_info}" pull_message = f"Code updated successfully.{stash_info}"
else: else:
logger.warning("git pull failed (returncode=%d): %s", result.returncode, result.stderr) pull_message = f"Update failed: {result.stderr or 'Unknown error'}"
pull_message = "Update failed; check logs for details"
return jsonify({ return jsonify({
'status': 'success' if result.returncode == 0 else 'error', 'status': 'success' if result.returncode == 0 else 'error',
'message': pull_message, 'message': pull_message,
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}) })
elif action == 'restart_display_service': elif action == 'restart_display_service':
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'], result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'],
@@ -1561,12 +1580,14 @@ def execute_system_action():
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'], result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'],
capture_output=True, text=True) capture_output=True, text=True)
else: else:
return jsonify({'status': 'error', 'message': 'Unknown action'}), 400 return jsonify({'status': 'error', 'message': f'Unknown action: {action}'}), 400
logger.info("system action '%s' returncode=%d", action, result.returncode)
return jsonify({ return jsonify({
'status': 'success' if result.returncode == 0 else 'error', 'status': 'success' if result.returncode == 0 else 'error',
'message': 'Action completed' if result.returncode == 0 else 'Action failed; check logs for details', 'message': f'Action {action} completed',
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}) })
except Exception as e: except Exception as e:
@@ -1666,6 +1687,8 @@ def get_on_demand_status():
} }
}) })
except Exception as exc: except Exception as exc:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_on_demand_status', exc_info=True) logger.error('Error in get_on_demand_status', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1732,11 +1755,11 @@ def start_on_demand_display():
# Stop the display service first to ensure clean state when we will restart it # Stop the display service first to ensure clean state when we will restart it
if service_was_running and start_service: if service_was_running and start_service:
import time as time_module import time as time_module
logger.debug("Stopping display service before starting on-demand mode") print("Stopping display service before starting on-demand mode...")
_stop_display_service() _stop_display_service()
# Wait a brief moment for the service to fully stop # Wait a brief moment for the service to fully stop
time_module.sleep(1.5) time_module.sleep(1.5)
logger.debug("Display service stopped, now starting with on-demand request") print("Display service stopped, now starting with on-demand request...")
if not service_status.get('active') and not start_service: if not service_status.get('active') and not start_service:
return jsonify({ return jsonify({
@@ -1769,6 +1792,8 @@ def start_on_demand_display():
} }
return jsonify({'status': 'success', 'data': response_data}) return jsonify({'status': 'success', 'data': response_data})
except Exception as exc: except Exception as exc:
import traceback
error_details = traceback.format_exc()
logger.error('Error in start_on_demand_display', exc_info=True) logger.error('Error in start_on_demand_display', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1805,6 +1830,8 @@ def stop_on_demand_display():
} }
}) })
except Exception as exc: except Exception as exc:
import traceback
error_details = traceback.format_exc()
logger.error('Error in stop_on_demand_display', exc_info=True) logger.error('Error in stop_on_demand_display', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1934,6 +1961,8 @@ def get_installed_plugins():
return jsonify({'status': 'success', 'data': {'plugins': plugins}}) return jsonify({'status': 'success', 'data': {'plugins': plugins}})
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_installed_plugins', exc_info=True) logger.error('Error in get_installed_plugins', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1960,6 +1989,8 @@ def get_plugin_health():
'data': health_summaries 'data': health_summaries
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_health', exc_info=True) logger.error('Error in get_plugin_health', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1985,6 +2016,8 @@ def get_plugin_health_single(plugin_id):
'data': health_summary 'data': health_summary
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_health_single', exc_info=True) logger.error('Error in get_plugin_health_single', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2010,6 +2043,8 @@ def reset_plugin_health(plugin_id):
'message': f'Health state reset for plugin {plugin_id}' 'message': f'Health state reset for plugin {plugin_id}'
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in reset_plugin_health', exc_info=True) logger.error('Error in reset_plugin_health', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2036,6 +2071,8 @@ def get_plugin_metrics():
'data': metrics_summaries 'data': metrics_summaries
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_metrics', exc_info=True) logger.error('Error in get_plugin_metrics', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2061,6 +2098,8 @@ def get_plugin_metrics_single(plugin_id):
'data': metrics_summary 'data': metrics_summary
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_metrics_single', exc_info=True) logger.error('Error in get_plugin_metrics_single', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2086,6 +2125,8 @@ def reset_plugin_metrics(plugin_id):
'message': f'Metrics reset for plugin {plugin_id}' 'message': f'Metrics reset for plugin {plugin_id}'
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in reset_plugin_metrics', exc_info=True) logger.error('Error in reset_plugin_metrics', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2141,6 +2182,8 @@ def manage_plugin_limits(plugin_id):
'message': f'Resource limits updated for plugin {plugin_id}' 'message': f'Resource limits updated for plugin {plugin_id}'
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in manage_plugin_limits', exc_info=True) logger.error('Error in manage_plugin_limits', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2185,7 +2228,7 @@ def toggle_plugin():
# Check if plugin exists in manifests (discovered but may not be loaded) # Check if plugin exists in manifests (discovered but may not be loaded)
if plugin_id not in api_v3.plugin_manager.plugin_manifests: if plugin_id not in api_v3.plugin_manager.plugin_manifests:
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404 return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
# Update config (this is what the display controller reads) # Update config (this is what the display controller reads)
config = api_v3.config_manager.load_config() config = api_v3.config_manager.load_config()
@@ -2562,7 +2605,7 @@ def get_plugin_config():
categories_from_files[category_name]['data_file'] = f'of_the_day/{filename}' categories_from_files[category_name]['data_file'] = f'of_the_day/{filename}'
except Exception as e: except Exception as e:
logger.debug("Could not read json file: %s", e) print(f"Warning: Could not read {json_file}: {e}")
continue continue
# Update plugin_config with scanned files # Update plugin_config with scanned files
@@ -2665,7 +2708,7 @@ def update_plugin():
manifest = json.load(f) manifest = json.load(f)
current_last_updated = manifest.get('last_updated') current_last_updated = manifest.get('last_updated')
except Exception as e: except Exception as e:
logger.debug("Could not read local manifest for plugin: %s", e) print(f"Warning: Could not read local manifest for {plugin_id}: {e}")
if api_v3.plugin_store_manager: if api_v3.plugin_store_manager:
git_info_before = api_v3.plugin_store_manager._get_local_git_info(plugin_dir) git_info_before = api_v3.plugin_store_manager._get_local_git_info(plugin_dir)
@@ -2680,14 +2723,16 @@ def update_plugin():
git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir) git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir)
is_git_repo = git_info is not None is_git_repo = git_info is not None
if is_git_repo: if is_git_repo:
logger.debug("Plugin is a git repository, will update via git pull") print(f"[UPDATE] Plugin {plugin_id} is a git repository, will update via git pull")
remote_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id, fetch_latest_from_github=True) remote_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id, fetch_latest_from_github=True)
remote_commit = remote_info.get('last_commit_sha') if remote_info else None remote_commit = remote_info.get('last_commit_sha') if remote_info else None
remote_branch = remote_info.get('branch') if remote_info else None remote_branch = remote_info.get('branch') if remote_info else None
# Update the plugin # Update the plugin
print(f"[UPDATE] Attempting to update plugin {plugin_id}...")
success = api_v3.plugin_store_manager.update_plugin(plugin_id) success = api_v3.plugin_store_manager.update_plugin(plugin_id)
print(f"[UPDATE] Update result for {plugin_id}: {success}")
if success: if success:
updated_last_updated = current_last_updated updated_last_updated = current_last_updated
@@ -2698,7 +2743,7 @@ def update_plugin():
manifest = json.load(f) manifest = json.load(f)
updated_last_updated = manifest.get('last_updated', current_last_updated) updated_last_updated = manifest.get('last_updated', current_last_updated)
except Exception as e: except Exception as e:
logger.debug("Could not read updated manifest after update: %s", e) print(f"Warning: Could not read updated manifest for {plugin_id}: {e}")
updated_commit = None updated_commit = None
updated_branch = remote_branch or current_branch updated_branch = remote_branch or current_branch
@@ -2760,41 +2805,52 @@ def update_plugin():
message=message message=message
) )
else: else:
error_msg = f'Failed to update plugin {plugin_id}'
plugin_path_dir = Path(api_v3.plugin_store_manager.plugins_dir) / plugin_id plugin_path_dir = Path(api_v3.plugin_store_manager.plugins_dir) / plugin_id
if not plugin_path_dir.exists(): if not plugin_path_dir.exists():
client_msg = 'Plugin update failed: plugin not found' error_msg += ': Plugin not found'
else: else:
# Check if it's a git repo (could be installed from URL, not in registry)
git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir) git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir)
if not git_info: if git_info:
# It's a git repo, so update should have worked - provide generic error
error_msg += ': Update failed (check logs for details)'
else:
# Not a git repo, check if it's in registry
plugin_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id) plugin_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id)
if not plugin_info: if not plugin_info:
client_msg = 'Plugin update failed: not found in registry' error_msg += ': Plugin not found in registry and not a git repository'
else: else:
client_msg = 'Plugin update failed; check logs for details' error_msg += ': Update failed (check logs for details)'
else:
client_msg = 'Plugin update failed; check logs for details'
logger.error("update_plugin failed for plugin_id=%s: %s", plugin_id, client_msg)
if api_v3.operation_history: if api_v3.operation_history:
api_v3.operation_history.record_operation( api_v3.operation_history.record_operation(
"update", "update",
plugin_id=plugin_id, plugin_id=plugin_id,
status="failed", status="failed",
error=client_msg, error=error_msg,
details={ details={
"previous_commit": current_commit[:7] if current_commit else None, "previous_commit": current_commit[:7] if current_commit else None,
"branch": current_branch "branch": current_branch
} }
) )
import traceback
error_details = traceback.format_exc()
print(f"[UPDATE] Update failed for {plugin_id}: {error_msg}")
print(f"[UPDATE] Traceback: {error_details}")
return error_response( return error_response(
ErrorCode.PLUGIN_UPDATE_FAILED, ErrorCode.PLUGIN_UPDATE_FAILED,
client_msg, error_msg,
status_code=500 status_code=500
) )
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error("Unhandled exception in update endpoint", exc_info=True) logger.error("Unhandled exception in update endpoint", exc_info=True)
print(f"[UPDATE] Traceback: {error_details}")
from src.web_interface.errors import WebInterfaceError from src.web_interface.errors import WebInterfaceError
error = WebInterfaceError.from_exception(e, ErrorCode.PLUGIN_UPDATE_FAILED) error = WebInterfaceError.from_exception(e, ErrorCode.PLUGIN_UPDATE_FAILED)
@@ -2863,7 +2919,7 @@ def uninstall_plugin():
try: try:
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True) api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
except Exception as cleanup_err: except Exception as cleanup_err:
logger.warning("Failed to cleanup config after uninstall: %s", cleanup_err) print(f"Warning: Failed to cleanup config for {plugin_id}: {cleanup_err}")
# Remove from state manager # Remove from state manager
if api_v3.plugin_state_manager: if api_v3.plugin_state_manager:
@@ -2878,7 +2934,7 @@ def uninstall_plugin():
details={"preserve_config": preserve_config} details={"preserve_config": preserve_config}
) )
return {'success': True, 'message': 'Plugin uninstalled successfully'} return {'success': True, 'message': f'Plugin {plugin_id} uninstalled successfully'}
# Enqueue operation # Enqueue operation
operation_id = api_v3.operation_queue.enqueue_operation( operation_id = api_v3.operation_queue.enqueue_operation(
@@ -2889,7 +2945,7 @@ def uninstall_plugin():
return success_response( return success_response(
data={'operation_id': operation_id}, data={'operation_id': operation_id},
message='Plugin uninstallation queued' message=f'Plugin {plugin_id} uninstallation queued'
) )
else: else:
# Fallback to direct uninstall # Fallback to direct uninstall
@@ -2910,7 +2966,7 @@ def uninstall_plugin():
try: try:
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True) api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
except Exception as cleanup_err: except Exception as cleanup_err:
logger.warning("Failed to cleanup config after uninstall: %s", cleanup_err) print(f"Warning: Failed to cleanup config for {plugin_id}: {cleanup_err}")
# Remove from state manager # Remove from state manager
if api_v3.plugin_state_manager: if api_v3.plugin_state_manager:
@@ -2925,19 +2981,19 @@ def uninstall_plugin():
details={"preserve_config": preserve_config} details={"preserve_config": preserve_config}
) )
return success_response(message='Plugin uninstalled successfully') return success_response(message=f'Plugin {plugin_id} uninstalled successfully')
else: else:
if api_v3.operation_history: if api_v3.operation_history:
api_v3.operation_history.record_operation( api_v3.operation_history.record_operation(
"uninstall", "uninstall",
plugin_id=plugin_id, plugin_id=plugin_id,
status="failed", status="failed",
error='Plugin uninstall failed' error=f'Failed to uninstall plugin {plugin_id}'
) )
return error_response( return error_response(
ErrorCode.PLUGIN_UNINSTALL_FAILED, ErrorCode.PLUGIN_UNINSTALL_FAILED,
'Plugin uninstall failed', f'Failed to uninstall plugin {plugin_id}',
status_code=500 status_code=500
) )
@@ -2977,7 +3033,7 @@ def install_plugin():
# Log the plugins directory being used for debugging # Log the plugins directory being used for debugging
plugins_dir = api_v3.plugin_store_manager.plugins_dir plugins_dir = api_v3.plugin_store_manager.plugins_dir
branch_info = f" (branch: {branch})" if branch else "" branch_info = f" (branch: {branch})" if branch else ""
logger.info("Installing plugin to directory: %s", plugins_dir) print(f"Installing plugin {plugin_id}{branch_info} to directory: {plugins_dir}", flush=True)
# Use operation queue if available # Use operation queue if available
if api_v3.operation_queue: if api_v3.operation_queue:
@@ -3065,7 +3121,7 @@ def install_plugin():
) )
branch_msg = f" (branch: {branch})" if branch else "" branch_msg = f" (branch: {branch})" if branch else ""
return success_response(message=f'Plugin installed successfully{branch_msg}') return success_response(message=f'Plugin {plugin_id} installed successfully{branch_msg}')
else: else:
error_msg = f'Failed to install plugin {plugin_id}' error_msg = f'Failed to install plugin {plugin_id}'
if branch: if branch:
@@ -3090,6 +3146,8 @@ def install_plugin():
) )
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in install_plugin', exc_info=True) logger.error('Error in install_plugin', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3145,6 +3203,8 @@ def install_plugin_from_url():
}), 500 }), 500
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in install_plugin_from_url', exc_info=True) logger.error('Error in install_plugin_from_url', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3177,6 +3237,8 @@ def get_registry_from_url():
}), 400 }), 400
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_registry_from_url', exc_info=True) logger.error('Error in get_registry_from_url', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3190,6 +3252,8 @@ def get_saved_repositories():
repositories = api_v3.saved_repositories_manager.get_all() repositories = api_v3.saved_repositories_manager.get_all()
return jsonify({'status': 'success', 'data': {'repositories': repositories}}) return jsonify({'status': 'success', 'data': {'repositories': repositories}})
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_saved_repositories', exc_info=True) logger.error('Error in get_saved_repositories', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3221,6 +3285,8 @@ def add_saved_repository():
'message': 'Repository already exists or failed to save' 'message': 'Repository already exists or failed to save'
}), 400 }), 400
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in add_saved_repository', exc_info=True) logger.error('Error in add_saved_repository', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3251,6 +3317,8 @@ def remove_saved_repository():
'message': 'Repository not found' 'message': 'Repository not found'
}), 404 }), 404
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in remove_saved_repository', exc_info=True) logger.error('Error in remove_saved_repository', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3304,6 +3372,8 @@ def list_plugin_store():
return jsonify({'status': 'success', 'data': {'plugins': formatted_plugins}}) return jsonify({'status': 'success', 'data': {'plugins': formatted_plugins}})
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in list_plugin_store', exc_info=True) logger.error('Error in list_plugin_store', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3355,6 +3425,8 @@ def get_github_auth_status():
} }
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_github_auth_status', exc_info=True) logger.error('Error in get_github_auth_status', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3382,6 +3454,8 @@ def refresh_plugin_store():
'plugin_count': plugin_count 'plugin_count': plugin_count
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in refresh_plugin_store', exc_info=True) logger.error('Error in refresh_plugin_store', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -4309,7 +4383,7 @@ def save_plugin_config():
if 'enabled' not in plugin_config: if 'enabled' not in plugin_config:
plugin_config['enabled'] = True plugin_config['enabled'] = True
except Exception as e: except Exception as e:
logger.debug("Error preserving enabled state: %s", e) print(f"Error preserving enabled state: {e}")
# Default to True on error to avoid disabling plugins # Default to True on error to avoid disabling plugins
plugin_config['enabled'] = True plugin_config['enabled'] = True
@@ -4606,11 +4680,16 @@ def save_plugin_config():
# Also print to console for immediate visibility # Also print to console for immediate visibility
import json import json
logger.warning("Config validation failed for plugin (see debug logs)") print(f"[ERROR] Config validation failed for {plugin_id}")
print(f"[ERROR] Validation errors: {validation_errors}")
print(f"[ERROR] Config keys: {list(plugin_config.keys())}")
print(f"[ERROR] Schema property keys: {list(enhanced_schema.get('properties', {}).keys())}")
# Log raw form data if this was a form submission # Log raw form data if this was a form submission
if 'application/json' not in (request.content_type or ''): if 'application/json' not in (request.content_type or ''):
form_data = request.form.to_dict() form_data = request.form.to_dict()
print(f"[ERROR] Raw form data: {json.dumps({k: str(v)[:200] for k, v in form_data.items()}, indent=2)}")
print(f"[ERROR] Parsed config: {json.dumps(plugin_config, indent=2, default=str)}")
return error_response( return error_response(
ErrorCode.CONFIG_VALIDATION_FAILED, ErrorCode.CONFIG_VALIDATION_FAILED,
'Configuration validation failed', 'Configuration validation failed',
@@ -4749,7 +4828,7 @@ def save_plugin_config():
logging.warning(f"Lifecycle method error for {plugin_id}: {lifecycle_error}", exc_info=True) logging.warning(f"Lifecycle method error for {plugin_id}: {lifecycle_error}", exc_info=True)
except Exception as hook_err: except Exception as hook_err:
# Do not fail the save if hook fails; just log # Do not fail the save if hook fails; just log
logger.warning("on_config_change failed: %s", hook_err) print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}")
secret_count = len(secrets_config) secret_count = len(secrets_config)
message = f'Plugin {plugin_id} configuration saved successfully' message = f'Plugin {plugin_id} configuration saved successfully'
@@ -4817,6 +4896,8 @@ def get_plugin_schema():
return jsonify({'status': 'success', 'data': {'schema': default_schema}}) return jsonify({'status': 'success', 'data': {'schema': default_schema}})
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_schema', exc_info=True) logger.error('Error in get_plugin_schema', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -4920,7 +5001,7 @@ def reset_plugin_config():
if hasattr(plugin_instance, 'on_config_change'): if hasattr(plugin_instance, 'on_config_change'):
plugin_instance.on_config_change(plugin_full_config) plugin_instance.on_config_change(plugin_full_config)
except Exception as hook_err: except Exception as hook_err:
logger.warning("on_config_change failed: %s", hook_err) print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}")
return jsonify({ return jsonify({
'status': 'success', 'status': 'success',
@@ -4928,6 +5009,8 @@ def reset_plugin_config():
'data': {'config': defaults} 'data': {'config': defaults}
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in reset_plugin_config', exc_info=True) logger.error('Error in reset_plugin_config', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -4965,7 +5048,7 @@ def execute_plugin_action():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404 return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
# Load manifest to get action definition # Load manifest to get action definition
manifest_path = Path(plugin_dir) / 'manifest.json' manifest_path = Path(plugin_dir) / 'manifest.json'
@@ -5186,7 +5269,9 @@ sys.exit(proc.returncode)
'message': 'Could not generate authorization URL' 'message': 'Could not generate authorization URL'
}), 400 }), 400
except Exception as e: except Exception as e:
logger.error("Error executing action step 1", exc_info=True) import traceback
error_details = traceback.format_exc()
print(f"Error executing action step 1: {e}")
return jsonify({ return jsonify({
'status': 'error', 'status': 'error',
'message': 'An error occurred; see logs for details' 'message': 'An error occurred; see logs for details'
@@ -5238,6 +5323,8 @@ sys.exit(proc.returncode)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
return jsonify({'status': 'error', 'message': 'Action timed out'}), 408 return jsonify({'status': 'error', 'message': 'Action timed out'}), 408
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in execute_plugin_action', exc_info=True) logger.error('Error in execute_plugin_action', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -5256,7 +5343,7 @@ def authenticate_spotify():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404 return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
auth_script = Path(plugin_dir) / 'authenticate_spotify.py' auth_script = Path(plugin_dir) / 'authenticate_spotify.py'
if not auth_script.exists(): if not auth_script.exists():
@@ -5369,13 +5456,17 @@ sys.exit(proc.returncode)
'auth_url': auth_url 'auth_url': auth_url
}) })
except Exception as e: except Exception as e:
logger.error("Error getting Spotify auth URL", exc_info=True) import traceback
error_details = traceback.format_exc()
print(f"Error getting Spotify auth URL: {e}")
return jsonify({ return jsonify({
'status': 'error', 'status': 'error',
'message': 'An error occurred; see logs for details' 'message': 'An error occurred; see logs for details'
}), 500 }), 500
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in authenticate_spotify', exc_info=True) logger.error('Error in authenticate_spotify', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -5391,7 +5482,7 @@ def authenticate_ytm():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404 return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
auth_script = Path(plugin_dir) / 'authenticate_ytm.py' auth_script = Path(plugin_dir) / 'authenticate_ytm.py'
if not auth_script.exists(): if not auth_script.exists():
@@ -5426,6 +5517,8 @@ def authenticate_ytm():
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
return jsonify({'status': 'error', 'message': 'Authentication timed out'}), 408 return jsonify({'status': 'error', 'message': 'Authentication timed out'}), 408
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in authenticate_ytm', exc_info=True) logger.error('Error in authenticate_ytm', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6022,6 +6115,7 @@ def upload_plugin_asset():
}) })
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6044,7 +6138,7 @@ def upload_of_the_day_json():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404 return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
# Setup of_the_day directory # Setup of_the_day directory
data_dir = Path(plugin_dir) / 'of_the_day' data_dir = Path(plugin_dir) / 'of_the_day'
@@ -6147,7 +6241,7 @@ def upload_of_the_day_json():
from scripts.update_config import add_category_to_config from scripts.update_config import add_category_to_config
add_category_to_config(category_name, f'of_the_day/{safe_filename}', display_name) add_category_to_config(category_name, f'of_the_day/{safe_filename}', display_name)
except Exception as e: except Exception as e:
logger.warning("Could not update config: %s", e) print(f"Warning: Could not update config: {e}")
# Continue anyway - file is uploaded # Continue anyway - file is uploaded
# Generate file ID (use category name as ID for simplicity) # Generate file ID (use category name as ID for simplicity)
@@ -6172,6 +6266,7 @@ def upload_of_the_day_json():
}) })
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6193,7 +6288,7 @@ def delete_of_the_day_json():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404 return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
data_dir = Path(plugin_dir) / 'of_the_day' data_dir = Path(plugin_dir) / 'of_the_day'
filename = f"{file_id}.json" filename = f"{file_id}.json"
@@ -6211,7 +6306,7 @@ def delete_of_the_day_json():
from scripts.update_config import remove_category_from_config from scripts.update_config import remove_category_from_config
remove_category_from_config(file_id) remove_category_from_config(file_id)
except Exception as e: except Exception as e:
logger.warning("Could not update config: %s", e) print(f"Warning: Could not update config: {e}")
return jsonify({ return jsonify({
'status': 'success', 'status': 'success',
@@ -6219,6 +6314,7 @@ def delete_of_the_day_json():
}) })
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6233,7 +6329,7 @@ def serve_plugin_static(plugin_id, file_path):
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404 return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
# Resolve file path (prevent directory traversal) # Resolve file path (prevent directory traversal)
plugin_dir = Path(plugin_dir).resolve() plugin_dir = Path(plugin_dir).resolve()
@@ -6265,6 +6361,7 @@ def serve_plugin_static(plugin_id, file_path):
return Response(content, mimetype=content_type) return Response(content, mimetype=content_type)
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6323,7 +6420,7 @@ def upload_calendar_credentials():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404 return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
# Save file to plugin directory # Save file to plugin directory
credentials_path = Path(plugin_dir) / 'credentials.json' credentials_path = Path(plugin_dir) / 'credentials.json'
@@ -6347,6 +6444,8 @@ def upload_calendar_credentials():
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in upload_calendar_credentials', exc_info=True) logger.error('Error in upload_calendar_credentials', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6390,6 +6489,7 @@ def delete_plugin_asset():
return jsonify({'status': 'success', 'message': 'Image deleted successfully'}) return jsonify({'status': 'success', 'message': 'Image deleted successfully'})
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6418,6 +6518,7 @@ def list_plugin_assets():
return jsonify({'status': 'success', 'data': {'assets': assets}}) return jsonify({'status': 'success', 'data': {'assets': assets}})
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6542,7 +6643,7 @@ def scan_wifi_networks():
ap_was_active = wifi_manager._is_ap_mode_active() ap_was_active = wifi_manager._is_ap_mode_active()
# Perform the scan (this will handle AP mode disabling/enabling internally) # Perform the scan (this will handle AP mode disabling/enabling internally)
networks, _was_cached = wifi_manager.scan_networks() networks = wifi_manager.scan_networks()
# Convert to dict format # Convert to dict format
networks_data = [ networks_data = [
@@ -6641,7 +6742,10 @@ def connect_wifi():
'message': message or 'Failed to connect to network' 'message': message or 'Failed to connect to network'
}), 400 }), 400
except Exception as e: except Exception as e:
logger.error("Error connecting to WiFi", exc_info=True) import logging
import traceback
logger = logging.getLogger(__name__)
logger.error(f"Error connecting to WiFi: {e}\n{traceback.format_exc()}")
return jsonify({ return jsonify({
'status': 'error', 'status': 'error',
'message': 'An error occurred; see logs for details' 'message': 'An error occurred; see logs for details'
@@ -6667,7 +6771,10 @@ def disconnect_wifi():
'message': message or 'Failed to disconnect from network' 'message': message or 'Failed to disconnect from network'
}), 400 }), 400
except Exception as e: except Exception as e:
logger.error("Error disconnecting from WiFi", exc_info=True) import logging
import traceback
logger = logging.getLogger(__name__)
logger.error(f"Error disconnecting from WiFi: {e}\n{traceback.format_exc()}")
return jsonify({ return jsonify({
'status': 'error', 'status': 'error',
'message': 'An error occurred; see logs for details' 'message': 'An error occurred; see logs for details'
@@ -6680,9 +6787,7 @@ def enable_ap_mode():
from src.wifi_manager import WiFiManager from src.wifi_manager import WiFiManager
wifi_manager = WiFiManager() wifi_manager = WiFiManager()
_force_raw = (request.get_json(silent=True) or {}).get('force', False) success, message = wifi_manager.enable_ap_mode()
force = _force_raw is True or (isinstance(_force_raw, str) and _force_raw.lower() in ('true', '1'))
success, message = wifi_manager.enable_ap_mode(force=force)
if success: if success:
return jsonify({ return jsonify({
@@ -6799,6 +6904,8 @@ def list_cache_files():
} }
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in list_cache_files', exc_info=True) logger.error('Error in list_cache_files', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6825,6 +6932,8 @@ def delete_cache_file():
'message': f'Cache file for key "{cache_key}" deleted successfully' 'message': f'Cache file for key "{cache_key}" deleted successfully'
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in delete_cache_file', exc_info=True) logger.error('Error in delete_cache_file', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6866,7 +6975,7 @@ def get_plugin_errors(plugin_id):
try: try:
aggregator = get_error_aggregator() aggregator = get_error_aggregator()
health = aggregator.get_plugin_health(plugin_id) health = aggregator.get_plugin_health(plugin_id)
return success_response(data=health, message="Plugin health retrieved") return success_response(data=health, message=f"Plugin {plugin_id} health retrieved")
except Exception as e: except Exception as e:
logger.error(f"Error getting plugin health for {plugin_id}: {e}", exc_info=True) logger.error(f"Error getting plugin health for {plugin_id}: {e}", exc_info=True)
return error_response( return error_response(
@@ -6939,8 +7048,6 @@ _BACKUP_EXPORT_DIR = PROJECT_ROOT / "config" / "backups" / "exports"
def _safe_backup_path(filename: str) -> Path: def _safe_backup_path(filename: str) -> Path:
"""Resolve a filename to an absolute path inside the export dir, """Resolve a filename to an absolute path inside the export dir,
rejecting any traversal attempts. Returns None if unsafe.""" rejecting any traversal attempts. Returns None if unsafe."""
# Use basename first (CodeQL-recognized sanitizer) then validate format
filename = os.path.basename(filename or '')
if not filename or not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$', filename): if not filename or not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$', filename):
return None return None
path = (_BACKUP_EXPORT_DIR / filename).resolve() path = (_BACKUP_EXPORT_DIR / filename).resolve()

View File

@@ -2,8 +2,6 @@ from flask import Blueprint, render_template, flash
from markupsafe import escape from markupsafe import escape
import json import json
import logging import logging
import os
import re
from pathlib import Path from pathlib import Path
from src.web_interface.secret_helpers import mask_secret_fields from src.web_interface.secret_helpers import mask_secret_fields
@@ -86,7 +84,7 @@ def load_partial(partial_name):
elif partial_name == 'operation-history': elif partial_name == 'operation-history':
return _load_operation_history_partial() return _load_operation_history_partial()
else: else:
return "Partial not found", 404 return f"Partial '{escape(partial_name)}' not found", 404
except Exception as e: except Exception as e:
logger.error("Error loading partial %s", partial_name, exc_info=True) logger.error("Error loading partial %s", partial_name, exc_info=True)
@@ -219,7 +217,7 @@ def _load_plugins_partial():
plugin_info.update(fresh_manifest) plugin_info.update(fresh_manifest)
except Exception as e: except Exception as e:
# If we can't read the fresh manifest, use the cached one # If we can't read the fresh manifest, use the cached one
logger.warning("Could not read fresh manifest for plugin: %s", plugin_id) logger.warning("Could not read fresh manifest for {plugin_id}")
# Get enabled status from config (source of truth) # Get enabled status from config (source of truth)
# Read from config file first, fall back to plugin instance if config doesn't have the key # Read from config file first, fall back to plugin instance if config doesn't have the key
@@ -355,9 +353,9 @@ def _load_plugin_config_partial(plugin_id):
Load plugin configuration partial - server-side rendered form. Load plugin configuration partial - server-side rendered form.
This replaces the client-side generateConfigForm() JavaScript. This replaces the client-side generateConfigForm() JavaScript.
""" """
# Sanitize with basename (CodeQL-recognized sanitizer) then regex-validate format import re as _re
plugin_id = os.path.basename(plugin_id or '') # Reject plugin IDs containing path-traversal characters before any filesystem use
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._\-:]*$', plugin_id): if not _re.match(r'^[a-zA-Z0-9_\-.:]+$', plugin_id or ''):
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400 return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
try: try:
@@ -368,14 +366,6 @@ def _load_plugin_config_partial(plugin_id):
if plugin_id.startswith('starlark:'): if plugin_id.startswith('starlark:'):
return _load_starlark_config_partial(plugin_id[len('starlark:'):]) return _load_starlark_config_partial(plugin_id[len('starlark:'):])
# Resolve and validate all plugin paths against the plugins base directory
_plugins_base = Path(pages_v3.plugin_manager.plugins_dir).resolve()
_plugin_dir = (_plugins_base / plugin_id).resolve()
try:
_plugin_dir.relative_to(_plugins_base)
except ValueError:
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
# Try to get plugin info first # Try to get plugin info first
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id) plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
@@ -385,7 +375,7 @@ def _load_plugin_config_partial(plugin_id):
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id) plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
if not plugin_info: if not plugin_info:
return '<div class="text-red-500 p-4">Plugin not found</div>', 404 return f'<div class="text-red-500 p-4">Plugin "{escape(plugin_id)}" not found</div>', 404
# Get plugin instance (may be None if not loaded) # Get plugin instance (may be None if not loaded)
plugin_instance = pages_v3.plugin_manager.get_plugin(plugin_id) plugin_instance = pages_v3.plugin_manager.get_plugin(plugin_id)
@@ -397,56 +387,59 @@ def _load_plugin_config_partial(plugin_id):
config = full_config.get(plugin_id, {}) config = full_config.get(plugin_id, {})
# Load uploaded images from metadata file if images field exists in schema # Load uploaded images from metadata file if images field exists in schema
schema_path_temp = _plugin_dir / "config_schema.json" # This ensures uploaded images appear even if config hasn't been saved yet
schema_path_temp = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "config_schema.json"
if schema_path_temp.exists(): if schema_path_temp.exists():
try: try:
with open(schema_path_temp, 'r', encoding='utf-8') as f: with open(schema_path_temp, 'r', encoding='utf-8') as f:
temp_schema = json.load(f) temp_schema = json.load(f)
# Check if schema has an images field with x-widget: file-upload
if (temp_schema.get('properties', {}).get('images', {}).get('x-widget') == 'file-upload' or if (temp_schema.get('properties', {}).get('images', {}).get('x-widget') == 'file-upload' or
temp_schema.get('properties', {}).get('images', {}).get('x_widget') == 'file-upload'): temp_schema.get('properties', {}).get('images', {}).get('x_widget') == 'file-upload'):
_assets_base = (Path(__file__).parent.parent.parent / 'assets' / 'plugins').resolve() # Load metadata file
metadata_file = (_assets_base / plugin_id / 'uploads' / '.metadata.json').resolve() # Get PROJECT_ROOT relative to this file
try: project_root = Path(__file__).parent.parent.parent
metadata_file.relative_to(_assets_base) metadata_file = project_root / 'assets' / 'plugins' / plugin_id / 'uploads' / '.metadata.json'
except ValueError: if metadata_file.exists():
metadata_file = None
if metadata_file and metadata_file.exists():
try: try:
with open(metadata_file, 'r', encoding='utf-8') as mf: with open(metadata_file, 'r', encoding='utf-8') as mf:
metadata = json.load(mf) metadata = json.load(mf)
# Convert metadata dict to list of image objects
images_from_metadata = list(metadata.values()) images_from_metadata = list(metadata.values())
# Only use metadata images if config doesn't have images or config images is empty
if not config.get('images') or len(config.get('images', [])) == 0: if not config.get('images') or len(config.get('images', [])) == 0:
config['images'] = images_from_metadata config['images'] = images_from_metadata
else: else:
# Merge: add metadata images that aren't already in config
config_image_ids = {img.get('id') for img in config.get('images', []) if img.get('id')} config_image_ids = {img.get('id') for img in config.get('images', []) if img.get('id')}
new_images = [img for img in images_from_metadata if img.get('id') not in config_image_ids] new_images = [img for img in images_from_metadata if img.get('id') not in config_image_ids]
if new_images: if new_images:
config['images'] = config.get('images', []) + new_images config['images'] = config.get('images', []) + new_images
except Exception as e: except Exception as e:
logger.warning("Could not load plugin upload metadata: %s", e) logger.warning("Could not load metadata for {plugin_id}")
except Exception as e: # nosec B110 - metadata pre-load is optional; schema loads fully below except Exception as e: # nosec B110 - metadata pre-load is optional; schema loads fully below
logger.debug("Metadata pre-load skipped for plugin %s: %s", plugin_id, e) logger.debug("Metadata pre-load skipped for plugin %s: %s", plugin_id, e)
# Get plugin schema # Get plugin schema
schema = {} schema = {}
schema_path = _plugin_dir / "config_schema.json" schema_path = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "config_schema.json"
if schema_path.exists(): if schema_path.exists():
try: try:
with open(schema_path, 'r', encoding='utf-8') as f: with open(schema_path, 'r', encoding='utf-8') as f:
schema = json.load(f) schema = json.load(f)
except Exception as e: except Exception as e:
logger.warning("Could not load schema for plugin: %s", e) logger.warning("Could not load schema for {plugin_id}")
# Get web UI actions from plugin manifest # Get web UI actions from plugin manifest
web_ui_actions = [] web_ui_actions = []
manifest_path = _plugin_dir / "manifest.json" manifest_path = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "manifest.json"
if manifest_path.exists(): if manifest_path.exists():
try: try:
with open(manifest_path, 'r', encoding='utf-8') as f: with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f) manifest = json.load(f)
web_ui_actions = manifest.get('web_ui_actions', []) web_ui_actions = manifest.get('web_ui_actions', [])
except Exception as e: except Exception as e:
logger.warning("Could not load manifest for plugin: %s", e) logger.warning("Could not load manifest for {plugin_id}")
# Mask secret fields before rendering template (fail closed — never leak secrets) # Mask secret fields before rendering template (fail closed — never leak secrets)
schema_properties = schema.get('properties') if isinstance(schema, dict) else None schema_properties = schema.get('properties') if isinstance(schema, dict) else None
@@ -488,18 +481,13 @@ def _load_plugin_config_partial(plugin_id):
def _load_starlark_config_partial(app_id): def _load_starlark_config_partial(app_id):
"""Load configuration partial for a Starlark app.""" """Load configuration partial for a Starlark app."""
# Sanitize with basename (CodeQL-recognized sanitizer) then regex-validate format
app_id = os.path.basename(app_id or '')
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_\-]*$', app_id):
return '<div class="text-red-500 p-4">Invalid app ID</div>', 400
try: try:
starlark_plugin = pages_v3.plugin_manager.get_plugin('starlark-apps') if pages_v3.plugin_manager else None starlark_plugin = pages_v3.plugin_manager.get_plugin('starlark-apps') if pages_v3.plugin_manager else None
if starlark_plugin and hasattr(starlark_plugin, 'apps'): if starlark_plugin and hasattr(starlark_plugin, 'apps'):
app = starlark_plugin.apps.get(app_id) app = starlark_plugin.apps.get(app_id)
if not app: if not app:
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404 return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
return render_template( return render_template(
'v3/partials/starlark_config.html', 'v3/partials/starlark_config.html',
app_id=app_id, app_id=app_id,
@@ -515,45 +503,36 @@ def _load_starlark_config_partial(app_id):
) )
# Standalone: read from manifest file # Standalone: read from manifest file
starlark_base = (Path(__file__).resolve().parent.parent.parent / 'starlark-apps').resolve() manifest_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / 'manifest.json'
manifest_file = starlark_base / 'manifest.json'
if not manifest_file.exists(): if not manifest_file.exists():
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404 return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
with open(manifest_file, 'r') as f: with open(manifest_file, 'r') as f:
manifest = json.load(f) manifest = json.load(f)
app_data = manifest.get('apps', {}).get(app_id) app_data = manifest.get('apps', {}).get(app_id)
if not app_data: if not app_data:
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404 return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
# Load schema from schema.json if it exists — validate path stays within starlark_base # Load schema from schema.json if it exists
schema = None schema = None
schema_file = (starlark_base / app_id / 'schema.json').resolve() schema_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'schema.json'
try: if schema_file.exists():
schema_file.relative_to(starlark_base)
except ValueError:
schema_file = None
if schema_file and schema_file.exists():
try: try:
with open(schema_file, 'r') as f: with open(schema_file, 'r') as f:
schema = json.load(f) schema = json.load(f)
except (OSError, json.JSONDecodeError) as e: except (OSError, json.JSONDecodeError) as e:
logger.warning("Could not load starlark schema for app: %s", e) logger.warning(f"[Pages V3] Could not load schema for {app_id}: {e}", exc_info=True)
# Load config from config.json if it exists — validate path stays within starlark_base # Load config from config.json if it exists
config = {} config = {}
config_file = (starlark_base / app_id / 'config.json').resolve() config_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'config.json'
try: if config_file.exists():
config_file.relative_to(starlark_base)
except ValueError:
config_file = None
if config_file and config_file.exists():
try: try:
with open(config_file, 'r') as f: with open(config_file, 'r') as f:
config = json.load(f) config = json.load(f)
except (OSError, json.JSONDecodeError) as e: except (OSError, json.JSONDecodeError) as e:
logger.warning("Could not load starlark config for app: %s", e) logger.warning(f"[Pages V3] Could not load config for {app_id}: {e}", exc_info=True)
return render_template( return render_template(
'v3/partials/starlark_config.html', 'v3/partials/starlark_config.html',
@@ -570,5 +549,5 @@ def _load_starlark_config_partial(app_id):
) )
except Exception as e: except Exception as e:
logger.error("[Pages V3] Error loading starlark config for app", exc_info=True) logger.exception(f"[Pages V3] Error loading starlark config for {app_id}")
return '<div class="text-red-500 p-4">Error loading starlark config; see logs for details</div>', 500 return f'<div class="text-red-500 p-4">Error loading starlark config: {str(e)}</div>', 500