mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-06-20 11:38:37 +00:00
Compare commits
13 Commits
fix/wifi-a
...
909db0993f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
909db0993f | ||
|
|
1d2303e620 | ||
|
|
8652aacf37 | ||
|
|
76507014ce | ||
|
|
53806da8c5 | ||
|
|
3d4de89fd5 | ||
|
|
505fed70e3 | ||
|
|
c8d2eaeb85 | ||
|
|
745ba8101e | ||
|
|
ddc53ff1e0 | ||
|
|
2cd3dbabe5 | ||
|
|
f4e7fea7bb | ||
|
|
a5c7ef20ec |
@@ -67,9 +67,8 @@ def main():
|
|||||||
print(" 📍 Will run on: http://0.0.0.0:5000")
|
print(" 📍 Will run on: http://0.0.0.0:5000")
|
||||||
print(" ⏹️ Press Ctrl+C to stop")
|
print(" ⏹️ Press Ctrl+C to stop")
|
||||||
|
|
||||||
# Run the app (debug mode controlled by env var to satisfy security scanners)
|
# Run the app (this should start the server)
|
||||||
_debug = os.environ.get('LEDMATRIX_FLASK_DEBUG', '0') == '1'
|
app.run(host='0.0.0.0', port=5000, debug=True)
|
||||||
app.run(host='0.0.0.0', port=5000, debug=_debug)
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("\n ⏹️ Server stopped by user")
|
print("\n ⏹️ Server stopped by user")
|
||||||
|
|||||||
@@ -410,8 +410,8 @@ def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
|
|||||||
try:
|
try:
|
||||||
manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8")
|
manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8")
|
||||||
manifest = json.loads(manifest_raw)
|
manifest = json.loads(manifest_raw)
|
||||||
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
|
except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
|
||||||
return False, "Invalid manifest.json", {}
|
return False, f"Invalid manifest.json: {e}", {}
|
||||||
|
|
||||||
if not isinstance(manifest, dict) or "schema_version" not in manifest:
|
if not isinstance(manifest, dict) or "schema_version" not in manifest:
|
||||||
return False, "Invalid manifest structure", {}
|
return False, "Invalid manifest structure", {}
|
||||||
@@ -456,8 +456,8 @@ def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
|
|||||||
return True, "", result_manifest
|
return True, "", result_manifest
|
||||||
except zipfile.BadZipFile:
|
except zipfile.BadZipFile:
|
||||||
return False, "File is not a valid ZIP archive", {}
|
return False, "File is not a valid ZIP archive", {}
|
||||||
except OSError:
|
except OSError as e:
|
||||||
return False, "Could not read backup", {}
|
return False, f"Could not read backup: {e}", {}
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ Extracted from PluginManager to improve separation of concerns.
|
|||||||
import json
|
import json
|
||||||
import importlib
|
import importlib
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import os
|
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
import threading
|
import threading
|
||||||
@@ -69,11 +68,6 @@ class PluginLoader:
|
|||||||
Returns:
|
Returns:
|
||||||
Path to plugin directory or None if not found
|
Path to plugin directory or None if not found
|
||||||
"""
|
"""
|
||||||
# Sanitize plugin_id — os.path.basename is a CodeQL-recognized path sanitizer
|
|
||||||
plugin_id = os.path.basename(plugin_id or '')
|
|
||||||
if not plugin_id:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Strategy 1: Use mapping from discovery
|
# Strategy 1: Use mapping from discovery
|
||||||
if plugin_directories and plugin_id in plugin_directories:
|
if plugin_directories and plugin_id in plugin_directories:
|
||||||
plugin_dir = plugin_directories[plugin_id]
|
plugin_dir = plugin_directories[plugin_id]
|
||||||
@@ -81,16 +75,14 @@ class PluginLoader:
|
|||||||
self.logger.debug("Using plugin directory from discovery mapping: %s", plugin_dir)
|
self.logger.debug("Using plugin directory from discovery mapping: %s", plugin_dir)
|
||||||
return plugin_dir
|
return plugin_dir
|
||||||
|
|
||||||
# Strategy 2: Direct paths — resolve and validate they stay within plugins_dir
|
# Strategy 2: Direct paths
|
||||||
plugins_dir_resolved = plugins_dir.resolve()
|
plugin_dir = plugins_dir / plugin_id
|
||||||
for _candidate_name in (plugin_id, f"ledmatrix-{plugin_id}"):
|
if plugin_dir.exists():
|
||||||
_candidate = (plugins_dir_resolved / _candidate_name).resolve()
|
return plugin_dir
|
||||||
try:
|
|
||||||
_candidate.relative_to(plugins_dir_resolved)
|
plugin_dir = plugins_dir / f"ledmatrix-{plugin_id}"
|
||||||
except ValueError:
|
if plugin_dir.exists():
|
||||||
continue
|
return plugin_dir
|
||||||
if _candidate.exists():
|
|
||||||
return _candidate
|
|
||||||
|
|
||||||
# Strategy 3: Case-insensitive search
|
# Strategy 3: Case-insensitive search
|
||||||
normalized_id = plugin_id.lower()
|
normalized_id = plugin_id.lower()
|
||||||
@@ -151,19 +143,21 @@ class PluginLoader:
|
|||||||
Returns:
|
Returns:
|
||||||
True if dependencies installed or not needed, False on error
|
True if dependencies installed or not needed, False on error
|
||||||
"""
|
"""
|
||||||
plugin_id = os.path.basename(plugin_id or '')
|
requirements_file = plugin_dir / "requirements.txt"
|
||||||
if not plugin_id:
|
if not requirements_file.exists():
|
||||||
return False
|
return True # No dependencies needed
|
||||||
# Resolve and validate plugin_dir before constructing any derived paths
|
|
||||||
|
# Resolve and validate plugin_dir before constructing derived paths from it
|
||||||
try:
|
try:
|
||||||
plugin_dir_resolved = plugin_dir.resolve(strict=True)
|
plugin_dir_resolved = plugin_dir.resolve(strict=True)
|
||||||
except OSError:
|
except OSError:
|
||||||
self.logger.error("Plugin directory does not exist: %s", plugin_dir)
|
self.logger.error("Plugin directory does not exist: %s", plugin_dir)
|
||||||
return False
|
return False
|
||||||
requirements_file = plugin_dir_resolved / "requirements.txt"
|
|
||||||
if not requirements_file.exists():
|
|
||||||
return True # No dependencies needed
|
|
||||||
marker_path = plugin_dir_resolved / ".dependencies_installed"
|
marker_path = plugin_dir_resolved / ".dependencies_installed"
|
||||||
|
try:
|
||||||
|
marker_path.relative_to(plugin_dir_resolved)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
# Check if already installed
|
# Check if already installed
|
||||||
if marker_path.exists():
|
if marker_path.exists():
|
||||||
@@ -380,20 +374,9 @@ class PluginLoader:
|
|||||||
Returns:
|
Returns:
|
||||||
Loaded module or None on error
|
Loaded module or None on error
|
||||||
"""
|
"""
|
||||||
plugin_id = os.path.basename(plugin_id or '')
|
entry_file = plugin_dir / entry_point
|
||||||
if not plugin_id:
|
|
||||||
raise PluginError("Invalid plugin ID")
|
|
||||||
try:
|
|
||||||
plugin_dir_resolved = plugin_dir.resolve(strict=True)
|
|
||||||
except OSError:
|
|
||||||
raise PluginError("Plugin directory not found", plugin_id=plugin_id)
|
|
||||||
entry_file = (plugin_dir_resolved / entry_point).resolve()
|
|
||||||
try:
|
|
||||||
entry_file.relative_to(plugin_dir_resolved)
|
|
||||||
except ValueError:
|
|
||||||
raise PluginError("Invalid entry point path", plugin_id=plugin_id)
|
|
||||||
if not entry_file.exists():
|
if not entry_file.exists():
|
||||||
error_msg = f"Entry point file not found for plugin {plugin_id}"
|
error_msg = f"Entry point file not found: {entry_file} for plugin {plugin_id}"
|
||||||
self.logger.error(error_msg)
|
self.logger.error(error_msg)
|
||||||
raise PluginError(error_msg, plugin_id=plugin_id, context={'entry_file': str(entry_file)})
|
raise PluginError(error_msg, plugin_id=plugin_id, context={'entry_file': str(entry_file)})
|
||||||
|
|
||||||
|
|||||||
@@ -150,18 +150,6 @@ class WiFiManager:
|
|||||||
logger.info(f"WiFi Manager initialized - nmcli: {self.has_nmcli}, iwlist: {self.has_iwlist}, "
|
logger.info(f"WiFi Manager initialized - nmcli: {self.has_nmcli}, iwlist: {self.has_iwlist}, "
|
||||||
f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, "
|
f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, "
|
||||||
f"interface: {self._wifi_interface}, trixie: {self._is_trixie}")
|
f"interface: {self._wifi_interface}, trixie: {self._is_trixie}")
|
||||||
|
|
||||||
# Once per process: remove a stale force-AP flag left by a prior crash.
|
|
||||||
# Guard with a class-level flag so the nmcli AP-state check only runs
|
|
||||||
# once even though WiFiManager is instantiated per-request.
|
|
||||||
if not WiFiManager._startup_cleanup_done:
|
|
||||||
WiFiManager._startup_cleanup_done = True
|
|
||||||
if self._FORCE_AP_FLAG_PATH.exists() and not self._is_ap_mode_active():
|
|
||||||
try:
|
|
||||||
self._FORCE_AP_FLAG_PATH.unlink(missing_ok=True)
|
|
||||||
logger.debug("Removed stale force-AP flag on startup (AP not active)")
|
|
||||||
except OSError as exc:
|
|
||||||
logger.warning(f"Could not remove stale force-AP flag: {exc}")
|
|
||||||
|
|
||||||
def _show_led_message(self, message: str, duration: int = 5):
|
def _show_led_message(self, message: str, duration: int = 5):
|
||||||
"""
|
"""
|
||||||
@@ -486,10 +474,7 @@ class WiFiManager:
|
|||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
for line in result.stdout.strip().split('\n'):
|
for line in result.stdout.strip().split('\n'):
|
||||||
if '/' in line:
|
if '/' in line:
|
||||||
# nmcli -t output is "IP4.ADDRESS[1]:x.x.x.x/prefix";
|
ip_address = line.split('/')[0].strip()
|
||||||
# bare "x.x.x.x/prefix" is also accepted defensively.
|
|
||||||
_, sep, rest = line.partition(':')
|
|
||||||
ip_address = (rest if sep else line).split('/')[0].strip()
|
|
||||||
break
|
break
|
||||||
|
|
||||||
# Final fallback: Get signal strength by matching SSID in WiFi list
|
# Final fallback: Get signal strength by matching SSID in WiFi list
|
||||||
@@ -515,13 +500,6 @@ class WiFiManager:
|
|||||||
|
|
||||||
# Check if AP mode is active
|
# Check if AP mode is active
|
||||||
ap_active = self._is_ap_mode_active()
|
ap_active = self._is_ap_mode_active()
|
||||||
# wlan0 shows as "connected" in AP mode; clear client-station fields so
|
|
||||||
# callers don't mistake the AP for an outbound WiFi connection.
|
|
||||||
if ap_active and wifi_connected:
|
|
||||||
wifi_connected = False
|
|
||||||
ssid = None
|
|
||||||
ip_address = None
|
|
||||||
logger.debug(f"{wlan_device} is in AP mode — overriding wifi_connected to False")
|
|
||||||
|
|
||||||
return WiFiStatus(
|
return WiFiStatus(
|
||||||
connected=wifi_connected,
|
connected=wifi_connected,
|
||||||
@@ -712,10 +690,6 @@ class WiFiManager:
|
|||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved") # nosec B108 - process-specific named file; device is single-user RPi
|
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved") # nosec B108 - process-specific named file; device is single-user RPi
|
||||||
# Written when AP mode is manually force-enabled; prevents daemon auto-disable
|
|
||||||
_FORCE_AP_FLAG_PATH = Path("/tmp/ledmatrix_force_ap_active") # nosec B108 - process-specific named file; device is single-user RPi
|
|
||||||
# Ensures the startup stale-flag cleanup runs once per process, not per instantiation
|
|
||||||
_startup_cleanup_done: bool = False
|
|
||||||
|
|
||||||
def _validate_ap_config(self) -> Tuple[str, int]:
|
def _validate_ap_config(self) -> Tuple[str, int]:
|
||||||
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
|
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
|
||||||
@@ -1393,7 +1367,7 @@ class WiFiManager:
|
|||||||
logger.error(f"Failed to restore original connection: {original_ssid}")
|
logger.error(f"Failed to restore original connection: {original_ssid}")
|
||||||
# Trigger AP mode as last resort
|
# Trigger AP mode as last resort
|
||||||
self._show_led_message("Enabling AP mode...", duration=5)
|
self._show_led_message("Enabling AP mode...", duration=5)
|
||||||
ap_success, ap_msg = self.enable_ap_mode(force=True)
|
ap_success, ap_msg = self.enable_ap_mode()
|
||||||
if ap_success:
|
if ap_success:
|
||||||
logger.info("AP mode enabled as failsafe")
|
logger.info("AP mode enabled as failsafe")
|
||||||
return False, "Connection failed and restoration failed. AP mode enabled."
|
return False, "Connection failed and restoration failed. AP mode enabled."
|
||||||
@@ -1405,7 +1379,7 @@ class WiFiManager:
|
|||||||
elif not success:
|
elif not success:
|
||||||
logger.warning(f"Connection to {ssid} failed and no original connection to restore")
|
logger.warning(f"Connection to {ssid} failed and no original connection to restore")
|
||||||
self._show_led_message("Enabling AP mode...", duration=5)
|
self._show_led_message("Enabling AP mode...", duration=5)
|
||||||
ap_success, ap_msg = self.enable_ap_mode(force=True)
|
ap_success, ap_msg = self.enable_ap_mode()
|
||||||
if ap_success:
|
if ap_success:
|
||||||
logger.info("AP mode enabled as failsafe")
|
logger.info("AP mode enabled as failsafe")
|
||||||
return False, "Connection failed. AP mode enabled."
|
return False, "Connection failed. AP mode enabled."
|
||||||
@@ -1426,7 +1400,7 @@ class WiFiManager:
|
|||||||
logger.error(f"Failed to restore after exception: {restore_error}")
|
logger.error(f"Failed to restore after exception: {restore_error}")
|
||||||
# Last resort: enable AP mode
|
# Last resort: enable AP mode
|
||||||
try:
|
try:
|
||||||
self.enable_ap_mode(force=True)
|
self.enable_ap_mode()
|
||||||
except Exception as ap_error: # nosec B110 - last-resort; do not re-raise, but log for debugging
|
except Exception as ap_error: # nosec B110 - last-resort; do not re-raise, but log for debugging
|
||||||
logger.error("Last-resort AP mode enable failed in recovery path: %s", ap_error, exc_info=True)
|
logger.error("Last-resort AP mode enable failed in recovery path: %s", ap_error, exc_info=True)
|
||||||
return False, str(e)
|
return False, str(e)
|
||||||
@@ -1490,29 +1464,26 @@ class WiFiManager:
|
|||||||
# Show LED message
|
# Show LED message
|
||||||
self._show_led_message(f"Connecting to {ssid}...", duration=10)
|
self._show_led_message(f"Connecting to {ssid}...", duration=10)
|
||||||
|
|
||||||
# Find existing NM connection for this SSID.
|
# First, check if connection already exists and try to activate it
|
||||||
# 802-11-wireless.ssid is not a valid column in 'nmcli connection show',
|
# NetworkManager connection names might not match SSID exactly, so search by SSID
|
||||||
# so list all wifi connections then query each one's SSID individually.
|
check_result = subprocess.run(
|
||||||
list_result = subprocess.run( # nosec B603 B607 - fixed args, no user input
|
["nmcli", "-t", "-f", "NAME,802-11-wireless.ssid", "connection", "show"],
|
||||||
["nmcli", "-t", "-f", "NAME,TYPE", "connection", "show"],
|
capture_output=True,
|
||||||
capture_output=True, text=True, timeout=5
|
text=True,
|
||||||
|
timeout=5
|
||||||
)
|
)
|
||||||
|
|
||||||
existing_conn_name = None
|
existing_conn_name = None
|
||||||
if list_result.returncode == 0:
|
if check_result.returncode == 0:
|
||||||
for line in list_result.stdout.strip().split('\n'):
|
for line in check_result.stdout.strip().split('\n'):
|
||||||
if ':' not in line:
|
if ':' in line:
|
||||||
continue
|
parts = line.split(':')
|
||||||
parts = line.split(':')
|
if len(parts) >= 2:
|
||||||
if len(parts) < 2 or parts[1].strip() != '802-11-wireless':
|
conn_name = parts[0].strip()
|
||||||
continue
|
conn_ssid = parts[1].strip() if len(parts) > 1 else ""
|
||||||
conn_name = parts[0].strip()
|
if conn_ssid == ssid:
|
||||||
ssid_r = subprocess.run( # nosec B603 B607 - conn_name from nmcli output, not user input
|
existing_conn_name = conn_name
|
||||||
["nmcli", "-g", "802-11-wireless.ssid", "connection", "show", conn_name],
|
break
|
||||||
capture_output=True, text=True, timeout=5
|
|
||||||
)
|
|
||||||
if ssid_r.returncode == 0 and ssid_r.stdout.strip() == ssid:
|
|
||||||
existing_conn_name = conn_name
|
|
||||||
break
|
|
||||||
|
|
||||||
# Also try direct lookup by SSID (in case connection name matches SSID)
|
# Also try direct lookup by SSID (in case connection name matches SSID)
|
||||||
if not existing_conn_name:
|
if not existing_conn_name:
|
||||||
@@ -1884,7 +1855,7 @@ class WiFiManager:
|
|||||||
logger.warning(f"Failed to enable WiFi radio after {max_retries} attempts")
|
logger.warning(f"Failed to enable WiFi radio after {max_retries} attempts")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def enable_ap_mode(self, force: bool = False) -> Tuple[bool, str]:
|
def enable_ap_mode(self) -> Tuple[bool, str]:
|
||||||
"""
|
"""
|
||||||
Enable access point mode
|
Enable access point mode
|
||||||
|
|
||||||
@@ -1906,29 +1877,20 @@ class WiFiManager:
|
|||||||
if not self._ensure_wifi_radio_enabled():
|
if not self._ensure_wifi_radio_enabled():
|
||||||
return False, "WiFi radio is disabled and could not be enabled"
|
return False, "WiFi radio is disabled and could not be enabled"
|
||||||
|
|
||||||
# Check if WiFi is connected (skip when force=True)
|
# Check if WiFi is connected
|
||||||
status = self.get_wifi_status()
|
status = self.get_wifi_status()
|
||||||
if not force and status.connected:
|
if status.connected:
|
||||||
return False, "Cannot enable AP mode while WiFi is connected"
|
return False, "Cannot enable AP mode while WiFi is connected"
|
||||||
|
|
||||||
# Check if Ethernet is connected (skip when force=True)
|
# Check if Ethernet is connected
|
||||||
if not force and self._is_ethernet_connected():
|
if self._is_ethernet_connected():
|
||||||
return False, "Cannot enable AP mode while Ethernet is connected"
|
return False, "Cannot enable AP mode while Ethernet is connected"
|
||||||
|
|
||||||
if force:
|
|
||||||
logger.debug(f"enable_ap_mode: force=True — WiFi/Ethernet guards bypassed; will create {self._FORCE_AP_FLAG_PATH}")
|
|
||||||
|
|
||||||
# Try hostapd/dnsmasq first (captive portal mode)
|
# Try hostapd/dnsmasq first (captive portal mode)
|
||||||
if self.has_hostapd and self.has_dnsmasq:
|
if self.has_hostapd and self.has_dnsmasq:
|
||||||
result = self._enable_ap_mode_hostapd()
|
result = self._enable_ap_mode_hostapd()
|
||||||
if result[0]:
|
if result[0]:
|
||||||
self._ap_enabled_at = time.time()
|
self._ap_enabled_at = time.time()
|
||||||
if force:
|
|
||||||
try:
|
|
||||||
self._FORCE_AP_FLAG_PATH.touch()
|
|
||||||
logger.debug(f"Force-AP flag created: {self._FORCE_AP_FLAG_PATH}")
|
|
||||||
except OSError as exc:
|
|
||||||
logger.warning(f"Failed to create force-AP flag {self._FORCE_AP_FLAG_PATH}: {exc}")
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
# Fallback to nmcli hotspot (simpler, no captive portal)
|
# Fallback to nmcli hotspot (simpler, no captive portal)
|
||||||
@@ -1938,12 +1900,6 @@ class WiFiManager:
|
|||||||
result = self._enable_ap_mode_nmcli_hotspot()
|
result = self._enable_ap_mode_nmcli_hotspot()
|
||||||
if result[0]:
|
if result[0]:
|
||||||
self._ap_enabled_at = time.time()
|
self._ap_enabled_at = time.time()
|
||||||
if force:
|
|
||||||
try:
|
|
||||||
self._FORCE_AP_FLAG_PATH.touch()
|
|
||||||
logger.debug(f"Force-AP flag created: {self._FORCE_AP_FLAG_PATH}")
|
|
||||||
except OSError as exc:
|
|
||||||
logger.warning(f"Failed to create force-AP flag {self._FORCE_AP_FLAG_PATH}: {exc}")
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)"
|
return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)"
|
||||||
@@ -2135,14 +2091,8 @@ class WiFiManager:
|
|||||||
self._clear_led_message()
|
self._clear_led_message()
|
||||||
return False, "AP started but captive-portal redirect setup failed"
|
return False, "AP started but captive-portal redirect setup failed"
|
||||||
|
|
||||||
# Verify the AP is actually running (retry up to 5x with 2s delay for NM async activation)
|
# Verify the AP is actually running
|
||||||
status = {}
|
status = self._get_ap_status_nmcli()
|
||||||
for _attempt in range(5):
|
|
||||||
status = self._get_ap_status_nmcli()
|
|
||||||
if status.get('active'):
|
|
||||||
break
|
|
||||||
logger.debug(f"AP verification attempt {_attempt + 1}/5 not yet active, waiting 2s")
|
|
||||||
time.sleep(2)
|
|
||||||
if status.get('active'):
|
if status.get('active'):
|
||||||
ip = status.get('ip', '192.168.4.1')
|
ip = status.get('ip', '192.168.4.1')
|
||||||
logger.info(f"AP mode confirmed active at {ip} (open network, no password)")
|
logger.info(f"AP mode confirmed active at {ip} (open network, no password)")
|
||||||
@@ -2340,7 +2290,6 @@ class WiFiManager:
|
|||||||
logger.warning("WiFi radio may be disabled after nmcli AP cleanup")
|
logger.warning("WiFi radio may be disabled after nmcli AP cleanup")
|
||||||
|
|
||||||
self._ap_enabled_at = None
|
self._ap_enabled_at = None
|
||||||
self._FORCE_AP_FLAG_PATH.unlink(missing_ok=True)
|
|
||||||
logger.info("AP mode disabled successfully")
|
logger.info("AP mode disabled successfully")
|
||||||
return True, "AP mode disabled"
|
return True, "AP mode disabled"
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -2529,29 +2478,22 @@ address=/detectportal.firefox.com/192.168.4.1
|
|||||||
else:
|
else:
|
||||||
logger.warning(f"Failed to enable AP mode: {message}")
|
logger.warning(f"Failed to enable AP mode: {message}")
|
||||||
elif not should_have_ap and ap_active:
|
elif not should_have_ap and ap_active:
|
||||||
# Should not have AP but do - check if it was manually force-enabled
|
# Should not have AP but do - disable AP mode
|
||||||
force_active = self._FORCE_AP_FLAG_PATH.exists()
|
# Always disable if WiFi or Ethernet connects, regardless of auto_enable setting
|
||||||
if status.connected:
|
if status.connected or ethernet_connected:
|
||||||
# WiFi connected: always disable AP (user successfully configured WiFi)
|
|
||||||
success, message = self.disable_ap_mode()
|
success, message = self.disable_ap_mode()
|
||||||
if success:
|
if success:
|
||||||
logger.info("Auto-disabled AP mode (WiFi connected)")
|
if status.connected:
|
||||||
self._disconnected_checks = 0
|
logger.info("Auto-disabled AP mode (WiFi connected)")
|
||||||
|
elif ethernet_connected:
|
||||||
|
logger.info("Auto-disabled AP mode (Ethernet connected)")
|
||||||
|
self._disconnected_checks = 0 # Reset counter
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Failed to auto-disable AP mode: {message}")
|
logger.warning(f"Failed to auto-disable AP mode: {message}")
|
||||||
elif ethernet_connected and not force_active:
|
|
||||||
# Ethernet connected, AP not manually forced: auto-disable
|
|
||||||
success, message = self.disable_ap_mode()
|
|
||||||
if success:
|
|
||||||
logger.info("Auto-disabled AP mode (Ethernet connected)")
|
|
||||||
self._disconnected_checks = 0
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
logger.warning(f"Failed to auto-disable AP mode: {message}")
|
|
||||||
elif ethernet_connected and force_active:
|
|
||||||
logger.debug("AP mode is force-active; Ethernet connected but auto-disable suppressed")
|
|
||||||
elif not auto_enable:
|
elif not auto_enable:
|
||||||
|
# AP is active but auto_enable is disabled - this means it was manually enabled
|
||||||
|
# Don't disable it automatically, let it stay active
|
||||||
logger.debug("AP mode is active (manually enabled), keeping active")
|
logger.debug("AP mode is active (manually enabled), keeping active")
|
||||||
|
|
||||||
# Idle-timeout check: disable AP if no client has connected within the window.
|
# Idle-timeout check: disable AP if no client has connected within the window.
|
||||||
|
|||||||
@@ -1,342 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/base_classes/api_extractors.py
|
|
||||||
|
|
||||||
Covers ESPNFootballExtractor, ESPNBaseballExtractor, ESPNHockeyExtractor,
|
|
||||||
SoccerAPIExtractor, and the shared _extract_common_details logic.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import pytest
|
|
||||||
from src.base_classes.api_extractors import (
|
|
||||||
ESPNFootballExtractor,
|
|
||||||
ESPNBaseballExtractor,
|
|
||||||
ESPNHockeyExtractor,
|
|
||||||
SoccerAPIExtractor,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Shared test data factories
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _make_espn_event(state: str = "in", home_abbr: str = "KC", away_abbr: str = "BUF",
|
|
||||||
home_score: str = "14", away_score: str = "7",
|
|
||||||
date_str: str = "2024-01-15T20:00:00Z",
|
|
||||||
include_situation: bool = False,
|
|
||||||
situation: dict | None = None,
|
|
||||||
status_detail: str = "2nd Qtr 8:42",
|
|
||||||
period: int = 2) -> dict:
|
|
||||||
"""Build a minimal ESPN-style game event dict."""
|
|
||||||
comp_status = {
|
|
||||||
"type": {
|
|
||||||
"state": state,
|
|
||||||
"shortDetail": status_detail,
|
|
||||||
"detail": status_detail,
|
|
||||||
"name": "STATUS_IN_PROGRESS",
|
|
||||||
},
|
|
||||||
"period": period,
|
|
||||||
"displayClock": "8:42",
|
|
||||||
}
|
|
||||||
comp = {
|
|
||||||
"status": comp_status,
|
|
||||||
"competitors": [
|
|
||||||
{
|
|
||||||
"homeAway": "home",
|
|
||||||
"team": {"abbreviation": home_abbr, "displayName": f"{home_abbr} Team"},
|
|
||||||
"score": home_score,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"homeAway": "away",
|
|
||||||
"team": {"abbreviation": away_abbr, "displayName": f"{away_abbr} Team"},
|
|
||||||
"score": away_score,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
}
|
|
||||||
if include_situation:
|
|
||||||
comp["situation"] = situation or {}
|
|
||||||
return {
|
|
||||||
"id": "test-game-1",
|
|
||||||
"date": date_str,
|
|
||||||
"competitions": [comp],
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _make_logger() -> logging.Logger:
|
|
||||||
return logging.getLogger("test_extractor")
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# ESPNFootballExtractor
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestESPNFootballExtractor:
|
|
||||||
def setup_method(self):
|
|
||||||
self.extractor = ESPNFootballExtractor(_make_logger())
|
|
||||||
|
|
||||||
def test_extract_live_game_basic_fields(self):
|
|
||||||
event = _make_espn_event(state="in", home_score="14", away_score="7")
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
assert result is not None
|
|
||||||
assert result["home_abbr"] == "KC"
|
|
||||||
assert result["away_abbr"] == "BUF"
|
|
||||||
assert result["home_score"] == "14"
|
|
||||||
assert result["away_score"] == "7"
|
|
||||||
assert result["is_live"] is True
|
|
||||||
assert result["is_final"] is False
|
|
||||||
assert result["is_upcoming"] is False
|
|
||||||
|
|
||||||
def test_extract_final_game(self):
|
|
||||||
event = _make_espn_event(state="post")
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
assert result is not None
|
|
||||||
assert result["is_final"] is True
|
|
||||||
assert result["is_live"] is False
|
|
||||||
|
|
||||||
def test_extract_upcoming_game(self):
|
|
||||||
event = _make_espn_event(state="pre")
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
assert result is not None
|
|
||||||
assert result["is_upcoming"] is True
|
|
||||||
|
|
||||||
def test_sport_specific_fields_default_when_pregame(self):
|
|
||||||
event = _make_espn_event(state="pre")
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert "down" in fields
|
|
||||||
assert "distance" in fields
|
|
||||||
assert "possession" in fields
|
|
||||||
assert "is_redzone" in fields
|
|
||||||
assert fields["is_redzone"] is False
|
|
||||||
|
|
||||||
def test_sport_specific_fields_live_with_situation(self):
|
|
||||||
situation = {
|
|
||||||
"down": 3,
|
|
||||||
"distance": 7,
|
|
||||||
"possession": "KC",
|
|
||||||
"isRedZone": True,
|
|
||||||
"homeTimeouts": 2,
|
|
||||||
"awayTimeouts": 1,
|
|
||||||
}
|
|
||||||
event = _make_espn_event(state="in", include_situation=True, situation=situation)
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["down"] == 3
|
|
||||||
assert fields["distance"] == 7
|
|
||||||
assert fields["is_redzone"] is True
|
|
||||||
assert fields["home_timeouts"] == 2
|
|
||||||
assert fields["away_timeouts"] == 1
|
|
||||||
|
|
||||||
def test_scoring_event_detected(self):
|
|
||||||
# situation must be non-empty (truthy) for the live block to execute
|
|
||||||
situation = {"down": 1, "distance": 10}
|
|
||||||
event = _make_espn_event(
|
|
||||||
state="in",
|
|
||||||
include_situation=True,
|
|
||||||
situation=situation,
|
|
||||||
status_detail="touchdown scored",
|
|
||||||
)
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert "touchdown" in fields.get("scoring_event", "").lower()
|
|
||||||
|
|
||||||
def test_returns_none_on_empty_event(self):
|
|
||||||
assert self.extractor.extract_game_details({}) is None
|
|
||||||
|
|
||||||
def test_returns_none_when_teams_missing(self):
|
|
||||||
event = {
|
|
||||||
"id": "x",
|
|
||||||
"date": "2024-01-15T20:00:00Z",
|
|
||||||
"competitions": [
|
|
||||||
{
|
|
||||||
"status": {"type": {"state": "in", "shortDetail": "", "detail": "", "name": ""}},
|
|
||||||
"competitors": [], # no competitors
|
|
||||||
}
|
|
||||||
],
|
|
||||||
}
|
|
||||||
assert self.extractor.extract_game_details(event) is None
|
|
||||||
|
|
||||||
def test_date_z_suffix_parsed(self):
|
|
||||||
event = _make_espn_event(date_str="2024-01-15T20:00:00Z")
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
# Should not raise and should return a result
|
|
||||||
assert result is not None
|
|
||||||
|
|
||||||
def test_id_propagated(self):
|
|
||||||
event = _make_espn_event()
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
assert result["id"] == "test-game-1"
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# ESPNBaseballExtractor
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestESPNBaseballExtractor:
|
|
||||||
def setup_method(self):
|
|
||||||
self.extractor = ESPNBaseballExtractor(_make_logger())
|
|
||||||
|
|
||||||
def test_extract_live_game(self):
|
|
||||||
event = _make_espn_event(
|
|
||||||
state="in", home_abbr="NYY", away_abbr="BOS",
|
|
||||||
home_score="3", away_score="2"
|
|
||||||
)
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
assert result is not None
|
|
||||||
assert result["home_abbr"] == "NYY"
|
|
||||||
assert result["is_live"] is True
|
|
||||||
|
|
||||||
def test_baseball_sport_fields_defaults(self):
|
|
||||||
event = _make_espn_event(state="pre")
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert "inning" in fields
|
|
||||||
assert "outs" in fields
|
|
||||||
assert "bases" in fields
|
|
||||||
assert "strikes" in fields
|
|
||||||
assert "balls" in fields
|
|
||||||
|
|
||||||
def test_baseball_sport_fields_live(self):
|
|
||||||
situation = {
|
|
||||||
"inning": 7,
|
|
||||||
"outs": 2,
|
|
||||||
"bases": "110",
|
|
||||||
"strikes": 2,
|
|
||||||
"balls": 3,
|
|
||||||
"pitcher": "Smith",
|
|
||||||
"batter": "Jones",
|
|
||||||
}
|
|
||||||
event = _make_espn_event(state="in", include_situation=True, situation=situation)
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["inning"] == 7
|
|
||||||
assert fields["outs"] == 2
|
|
||||||
assert fields["strikes"] == 2
|
|
||||||
assert fields["pitcher"] == "Smith"
|
|
||||||
|
|
||||||
def test_returns_none_on_empty(self):
|
|
||||||
assert self.extractor.extract_game_details({}) is None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# ESPNHockeyExtractor
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestESPNHockeyExtractor:
|
|
||||||
def setup_method(self):
|
|
||||||
self.extractor = ESPNHockeyExtractor(_make_logger())
|
|
||||||
|
|
||||||
def test_extract_live_game(self):
|
|
||||||
event = _make_espn_event(
|
|
||||||
state="in", home_abbr="BOS", away_abbr="TOR",
|
|
||||||
home_score="2", away_score="1"
|
|
||||||
)
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
assert result is not None
|
|
||||||
assert result["is_live"] is True
|
|
||||||
|
|
||||||
def test_hockey_period_text_p1(self):
|
|
||||||
situation = {"isPowerPlay": False}
|
|
||||||
event = _make_espn_event(
|
|
||||||
state="in", include_situation=True, situation=situation, period=1
|
|
||||||
)
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["period_text"] == "P1"
|
|
||||||
|
|
||||||
def test_hockey_period_text_p2(self):
|
|
||||||
situation = {"isPowerPlay": False} # non-empty so the live block executes
|
|
||||||
event = _make_espn_event(
|
|
||||||
state="in", include_situation=True, situation=situation, period=2
|
|
||||||
)
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["period_text"] == "P2"
|
|
||||||
|
|
||||||
def test_hockey_period_text_p3(self):
|
|
||||||
situation = {"isPowerPlay": False}
|
|
||||||
event = _make_espn_event(
|
|
||||||
state="in", include_situation=True, situation=situation, period=3
|
|
||||||
)
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["period_text"] == "P3"
|
|
||||||
|
|
||||||
def test_hockey_period_text_ot(self):
|
|
||||||
situation = {"isPowerPlay": False}
|
|
||||||
event = _make_espn_event(
|
|
||||||
state="in", include_situation=True, situation=situation, period=4
|
|
||||||
)
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["period_text"] == "OT1"
|
|
||||||
|
|
||||||
def test_hockey_power_play(self):
|
|
||||||
situation = {"isPowerPlay": True, "homeShots": 12, "awayShots": 8}
|
|
||||||
event = _make_espn_event(state="in", include_situation=True, situation=situation, period=2)
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["power_play"] is True
|
|
||||||
assert fields["shots_on_goal"]["home"] == 12
|
|
||||||
assert fields["shots_on_goal"]["away"] == 8
|
|
||||||
|
|
||||||
def test_hockey_fields_defaults_pregame(self):
|
|
||||||
event = _make_espn_event(state="pre")
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert "period" in fields
|
|
||||||
assert "power_play" in fields
|
|
||||||
assert fields["power_play"] is False
|
|
||||||
|
|
||||||
def test_returns_none_on_empty(self):
|
|
||||||
assert self.extractor.extract_game_details({}) is None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# SoccerAPIExtractor
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestSoccerAPIExtractor:
|
|
||||||
def setup_method(self):
|
|
||||||
self.extractor = SoccerAPIExtractor(_make_logger())
|
|
||||||
|
|
||||||
def _make_soccer_event(self, is_live: bool = True) -> dict:
|
|
||||||
return {
|
|
||||||
"id": "soccer-1",
|
|
||||||
"home_team": {"abbreviation": "ARS", "name": "Arsenal"},
|
|
||||||
"away_team": {"abbreviation": "CHE", "name": "Chelsea"},
|
|
||||||
"home_score": "2",
|
|
||||||
"away_score": "1",
|
|
||||||
"status": "LIVE",
|
|
||||||
"is_live": is_live,
|
|
||||||
"is_final": not is_live,
|
|
||||||
"is_upcoming": False,
|
|
||||||
"half": "1",
|
|
||||||
"stoppage_time": "2",
|
|
||||||
"home_yellow_cards": 1,
|
|
||||||
"away_yellow_cards": 2,
|
|
||||||
"home_red_cards": 0,
|
|
||||||
"away_red_cards": 0,
|
|
||||||
"home_possession": 55,
|
|
||||||
"away_possession": 45,
|
|
||||||
}
|
|
||||||
|
|
||||||
def test_extract_live_game(self):
|
|
||||||
event = self._make_soccer_event(is_live=True)
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
assert result is not None
|
|
||||||
assert result["home_abbr"] == "ARS"
|
|
||||||
assert result["away_abbr"] == "CHE"
|
|
||||||
assert result["is_live"] is True
|
|
||||||
|
|
||||||
def test_sport_specific_cards(self):
|
|
||||||
event = self._make_soccer_event()
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["cards"]["home_yellow"] == 1
|
|
||||||
assert fields["cards"]["away_yellow"] == 2
|
|
||||||
assert fields["cards"]["home_red"] == 0
|
|
||||||
|
|
||||||
def test_sport_specific_possession(self):
|
|
||||||
event = self._make_soccer_event()
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["possession"]["home"] == 55
|
|
||||||
assert fields["possession"]["away"] == 45
|
|
||||||
|
|
||||||
def test_sport_specific_half(self):
|
|
||||||
event = self._make_soccer_event()
|
|
||||||
fields = self.extractor.get_sport_specific_fields(event)
|
|
||||||
assert fields["half"] == "1"
|
|
||||||
|
|
||||||
def test_scores_as_strings(self):
|
|
||||||
event = self._make_soccer_event()
|
|
||||||
result = self.extractor.extract_game_details(event)
|
|
||||||
assert result["home_score"] == "2"
|
|
||||||
assert result["away_score"] == "1"
|
|
||||||
@@ -1,299 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/background_data_service.py
|
|
||||||
|
|
||||||
Covers BackgroundDataService: submit_fetch_request, get_result,
|
|
||||||
is_request_complete, get_request_status, cancel_request, get_statistics,
|
|
||||||
_cleanup_completed_requests, shutdown, and get_background_service singleton.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import time
|
|
||||||
import pytest
|
|
||||||
from unittest.mock import MagicMock, patch, Mock
|
|
||||||
from concurrent.futures import Future
|
|
||||||
|
|
||||||
from src.background_data_service import (
|
|
||||||
BackgroundDataService,
|
|
||||||
FetchStatus,
|
|
||||||
FetchResult,
|
|
||||||
FetchRequest,
|
|
||||||
get_background_service,
|
|
||||||
shutdown_background_service,
|
|
||||||
)
|
|
||||||
import src.background_data_service as bds_module
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Fixtures
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def reset_global_service():
|
|
||||||
"""Ensure each test starts with no global singleton."""
|
|
||||||
shutdown_background_service()
|
|
||||||
yield
|
|
||||||
shutdown_background_service()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def mock_cache_manager():
|
|
||||||
m = MagicMock()
|
|
||||||
m.get.return_value = None
|
|
||||||
m.set.return_value = None
|
|
||||||
m.generate_sport_cache_key.return_value = "test_key"
|
|
||||||
return m
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def service(mock_cache_manager):
|
|
||||||
svc = BackgroundDataService(mock_cache_manager, max_workers=2, request_timeout=5)
|
|
||||||
yield svc
|
|
||||||
svc.shutdown(wait=False)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Initialisation
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestInitialisation:
|
|
||||||
def test_stats_zeroed(self, service):
|
|
||||||
stats = service.get_statistics()
|
|
||||||
assert stats["total_requests"] == 0
|
|
||||||
assert stats["completed_requests"] == 0
|
|
||||||
assert stats["failed_requests"] == 0
|
|
||||||
|
|
||||||
def test_no_active_requests(self, service):
|
|
||||||
assert len(service.active_requests) == 0
|
|
||||||
|
|
||||||
def test_not_shutdown(self, service):
|
|
||||||
assert service._shutdown is False
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Cache hit path
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestCacheHit:
|
|
||||||
def test_cache_hit_returns_request_id(self, service, mock_cache_manager):
|
|
||||||
mock_cache_manager.get.return_value = {"events": [{"id": "1"}]}
|
|
||||||
req_id = service.submit_fetch_request(
|
|
||||||
sport="nfl", year=2024,
|
|
||||||
url="https://example.com/nfl",
|
|
||||||
cache_key="nfl_key",
|
|
||||||
)
|
|
||||||
assert req_id is not None
|
|
||||||
# Request should be immediately complete due to cache hit
|
|
||||||
result = service.get_result(req_id)
|
|
||||||
assert result is not None
|
|
||||||
assert result.success is True
|
|
||||||
assert result.cached is True
|
|
||||||
|
|
||||||
def test_cache_hit_increments_stat(self, service, mock_cache_manager):
|
|
||||||
mock_cache_manager.get.return_value = {"events": []}
|
|
||||||
service.submit_fetch_request(sport="nba", year=2024, url="https://x.com", cache_key="k")
|
|
||||||
stats = service.get_statistics()
|
|
||||||
assert stats["cached_hits"] == 1
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Actual fetch path (mocked HTTP)
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestFetchPath:
|
|
||||||
def _valid_payload(self) -> dict:
|
|
||||||
return {"events": [{"id": "g1"}, {"id": "g2"}]}
|
|
||||||
|
|
||||||
def test_successful_fetch_completes(self, service, mock_cache_manager):
|
|
||||||
mock_resp = Mock()
|
|
||||||
mock_resp.json.return_value = self._valid_payload()
|
|
||||||
mock_resp.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
with patch.object(service.session, "get", return_value=mock_resp):
|
|
||||||
req_id = service.submit_fetch_request(
|
|
||||||
sport="nfl", year=2024,
|
|
||||||
url="https://example.com/nfl",
|
|
||||||
cache_key="nfl_test",
|
|
||||||
)
|
|
||||||
# Wait for the background thread
|
|
||||||
deadline = time.time() + 5
|
|
||||||
while not service.is_request_complete(req_id) and time.time() < deadline:
|
|
||||||
time.sleep(0.05)
|
|
||||||
|
|
||||||
result = service.get_result(req_id)
|
|
||||||
assert result is not None
|
|
||||||
assert result.success is True
|
|
||||||
assert result.data == self._valid_payload()
|
|
||||||
|
|
||||||
def test_failed_fetch_records_error(self, service, mock_cache_manager):
|
|
||||||
with patch.object(service.session, "get", side_effect=Exception("network error")):
|
|
||||||
req_id = service.submit_fetch_request(
|
|
||||||
sport="nba", year=2024,
|
|
||||||
url="https://example.com/nba",
|
|
||||||
cache_key="nba_test",
|
|
||||||
max_retries=0,
|
|
||||||
)
|
|
||||||
deadline = time.time() + 5
|
|
||||||
while not service.is_request_complete(req_id) and time.time() < deadline:
|
|
||||||
time.sleep(0.05)
|
|
||||||
|
|
||||||
result = service.get_result(req_id)
|
|
||||||
assert result is not None
|
|
||||||
assert result.success is False
|
|
||||||
assert result.error is not None
|
|
||||||
|
|
||||||
def test_cache_miss_increments_stat(self, service, mock_cache_manager):
|
|
||||||
mock_resp = Mock()
|
|
||||||
mock_resp.json.return_value = self._valid_payload()
|
|
||||||
mock_resp.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
with patch.object(service.session, "get", return_value=mock_resp):
|
|
||||||
service.submit_fetch_request(
|
|
||||||
sport="nfl", year=2024, url="https://x.com", cache_key="new_key",
|
|
||||||
)
|
|
||||||
stats = service.get_statistics()
|
|
||||||
assert stats["cache_misses"] == 1
|
|
||||||
|
|
||||||
def test_callback_called_on_success(self, service, mock_cache_manager):
|
|
||||||
callback = Mock()
|
|
||||||
mock_resp = Mock()
|
|
||||||
mock_resp.json.return_value = self._valid_payload()
|
|
||||||
mock_resp.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
with patch.object(service.session, "get", return_value=mock_resp):
|
|
||||||
req_id = service.submit_fetch_request(
|
|
||||||
sport="nfl", year=2024, url="https://x.com",
|
|
||||||
cache_key="cb_key", callback=callback, max_retries=0,
|
|
||||||
)
|
|
||||||
deadline = time.time() + 5
|
|
||||||
while not service.is_request_complete(req_id) and time.time() < deadline:
|
|
||||||
time.sleep(0.05)
|
|
||||||
|
|
||||||
callback.assert_called_once()
|
|
||||||
call_arg = callback.call_args[0][0]
|
|
||||||
assert isinstance(call_arg, FetchResult)
|
|
||||||
|
|
||||||
def test_data_cached_after_successful_fetch(self, service, mock_cache_manager):
|
|
||||||
mock_resp = Mock()
|
|
||||||
mock_resp.json.return_value = self._valid_payload()
|
|
||||||
mock_resp.raise_for_status.return_value = None
|
|
||||||
|
|
||||||
with patch.object(service.session, "get", return_value=mock_resp):
|
|
||||||
req_id = service.submit_fetch_request(
|
|
||||||
sport="nfl", year=2024, url="https://x.com", cache_key="cache_after_key",
|
|
||||||
)
|
|
||||||
deadline = time.time() + 5
|
|
||||||
while not service.is_request_complete(req_id) and time.time() < deadline:
|
|
||||||
time.sleep(0.05)
|
|
||||||
|
|
||||||
mock_cache_manager.set.assert_called()
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Request status / cancel
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestRequestStatusAndCancel:
|
|
||||||
def test_unknown_request_status_is_none(self, service):
|
|
||||||
assert service.get_request_status("nonexistent") is None
|
|
||||||
|
|
||||||
def test_cancel_active_request(self, service, mock_cache_manager):
|
|
||||||
# Manually insert an active request
|
|
||||||
req = FetchRequest(
|
|
||||||
id="r1", sport="nfl", year=2024,
|
|
||||||
cache_key="k", url="https://x.com",
|
|
||||||
)
|
|
||||||
req.status = FetchStatus.PENDING
|
|
||||||
service.active_requests["r1"] = req
|
|
||||||
result = service.cancel_request("r1")
|
|
||||||
assert result is True
|
|
||||||
assert "r1" not in service.active_requests
|
|
||||||
|
|
||||||
def test_cancel_nonexistent_request(self, service):
|
|
||||||
assert service.cancel_request("does-not-exist") is False
|
|
||||||
|
|
||||||
def test_is_request_complete_false_for_active(self, service, mock_cache_manager):
|
|
||||||
req = FetchRequest(
|
|
||||||
id="r2", sport="mlb", year=2024,
|
|
||||||
cache_key="k2", url="https://x.com",
|
|
||||||
)
|
|
||||||
service.active_requests["r2"] = req
|
|
||||||
assert service.is_request_complete("r2") is False
|
|
||||||
|
|
||||||
def test_is_request_complete_true_for_done(self, service):
|
|
||||||
result = FetchResult(request_id="r3", success=True)
|
|
||||||
service.completed_requests["r3"] = result
|
|
||||||
assert service.is_request_complete("r3") is True
|
|
||||||
|
|
||||||
def test_get_result_returns_none_for_unknown(self, service):
|
|
||||||
assert service.get_result("unknown") is None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Shutdown
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestShutdown:
|
|
||||||
def test_shutdown_sets_flag(self, service):
|
|
||||||
service.shutdown(wait=False)
|
|
||||||
assert service._shutdown is True
|
|
||||||
|
|
||||||
def test_submit_after_shutdown_raises(self, service, mock_cache_manager):
|
|
||||||
service.shutdown(wait=False)
|
|
||||||
with pytest.raises(RuntimeError, match="shutting down"):
|
|
||||||
service.submit_fetch_request(
|
|
||||||
sport="nfl", year=2024, url="https://x.com", cache_key="k"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Cleanup
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestCleanup:
|
|
||||||
def test_cleanup_removes_old_requests(self, service):
|
|
||||||
old_result = FetchResult(request_id="old", success=True)
|
|
||||||
old_result.completed_at = time.time() - 7200 # 2 hours ago
|
|
||||||
service.completed_requests["old"] = old_result
|
|
||||||
service._last_completed_requests_cleanup = 0 # force cleanup
|
|
||||||
removed = service._cleanup_completed_requests(force=True)
|
|
||||||
assert removed >= 1
|
|
||||||
assert "old" not in service.completed_requests
|
|
||||||
|
|
||||||
def test_cleanup_respects_interval(self, service):
|
|
||||||
old_result = FetchResult(request_id="r", success=True)
|
|
||||||
old_result.completed_at = time.time() - 7200
|
|
||||||
service.completed_requests["r"] = old_result
|
|
||||||
# Cleanup interval not passed, should skip
|
|
||||||
service._last_completed_requests_cleanup = time.time()
|
|
||||||
removed = service._cleanup_completed_requests(force=False)
|
|
||||||
assert removed == 0
|
|
||||||
|
|
||||||
def test_size_limit_enforcement(self, service):
|
|
||||||
service._max_completed_requests = 3
|
|
||||||
for i in range(5):
|
|
||||||
result = FetchResult(request_id=str(i), success=True)
|
|
||||||
result.completed_at = time.time() - (5 - i) * 100 # oldest first
|
|
||||||
service.completed_requests[str(i)] = result
|
|
||||||
service._last_completed_requests_cleanup = 0
|
|
||||||
service._cleanup_completed_requests(force=True)
|
|
||||||
assert len(service.completed_requests) <= 3
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Singleton get_background_service
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetBackgroundService:
|
|
||||||
def test_first_call_requires_cache_manager(self):
|
|
||||||
with pytest.raises(ValueError, match="cache_manager is required"):
|
|
||||||
get_background_service()
|
|
||||||
|
|
||||||
def test_creates_singleton(self, mock_cache_manager):
|
|
||||||
svc1 = get_background_service(mock_cache_manager)
|
|
||||||
svc2 = get_background_service()
|
|
||||||
assert svc1 is svc2
|
|
||||||
|
|
||||||
def test_shutdown_clears_singleton(self, mock_cache_manager):
|
|
||||||
get_background_service(mock_cache_manager)
|
|
||||||
shutdown_background_service()
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
get_background_service()
|
|
||||||
@@ -1,209 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/base_classes/data_sources.py
|
|
||||||
|
|
||||||
Covers ESPNDataSource, MLBAPIDataSource, SoccerAPIDataSource.
|
|
||||||
All HTTP calls are mocked to avoid network access.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
from datetime import datetime, date
|
|
||||||
from unittest.mock import MagicMock, patch, Mock
|
|
||||||
import pytest
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from src.base_classes.data_sources import ESPNDataSource, MLBAPIDataSource, SoccerAPIDataSource
|
|
||||||
|
|
||||||
|
|
||||||
def _make_logger() -> logging.Logger:
|
|
||||||
return logging.getLogger("test_data_sources")
|
|
||||||
|
|
||||||
|
|
||||||
def _mock_response(json_data: dict, status_code: int = 200):
|
|
||||||
resp = Mock(spec=requests.Response)
|
|
||||||
resp.status_code = status_code
|
|
||||||
resp.json.return_value = json_data
|
|
||||||
resp.raise_for_status = Mock()
|
|
||||||
if status_code >= 400:
|
|
||||||
resp.raise_for_status.side_effect = requests.HTTPError(response=resp)
|
|
||||||
return resp
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# ESPNDataSource
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestESPNDataSource:
|
|
||||||
def setup_method(self):
|
|
||||||
self.source = ESPNDataSource(_make_logger())
|
|
||||||
|
|
||||||
def test_get_headers(self):
|
|
||||||
headers = self.source.get_headers()
|
|
||||||
assert headers["Accept"] == "application/json"
|
|
||||||
assert "LEDMatrix" in headers["User-Agent"]
|
|
||||||
|
|
||||||
def test_fetch_live_games_returns_live_events(self):
|
|
||||||
live_event = {
|
|
||||||
"competitions": [{"status": {"type": {"state": "in"}}}]
|
|
||||||
}
|
|
||||||
non_live_event = {
|
|
||||||
"competitions": [{"status": {"type": {"state": "pre"}}}]
|
|
||||||
}
|
|
||||||
payload = {"events": [live_event, non_live_event]}
|
|
||||||
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_live_games("football", "nfl")
|
|
||||||
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0] is live_event
|
|
||||||
|
|
||||||
def test_fetch_live_games_empty_when_none_live(self):
|
|
||||||
payload = {"events": [
|
|
||||||
{"competitions": [{"status": {"type": {"state": "post"}}}]}
|
|
||||||
]}
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_live_games("football", "nfl")
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fetch_live_games_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("network failure")):
|
|
||||||
result = self.source.fetch_live_games("football", "nfl")
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fetch_schedule_returns_all_events(self):
|
|
||||||
events = [{"id": "1"}, {"id": "2"}]
|
|
||||||
payload = {"events": events}
|
|
||||||
start = datetime(2024, 1, 1)
|
|
||||||
end = datetime(2024, 1, 7)
|
|
||||||
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_schedule("football", "nfl", (start, end))
|
|
||||||
|
|
||||||
assert len(result) == 2
|
|
||||||
|
|
||||||
def test_fetch_schedule_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("timeout")):
|
|
||||||
result = self.source.fetch_schedule("football", "nfl", (datetime.now(), datetime.now()))
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fetch_standings_success(self):
|
|
||||||
payload = {"standings": []}
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_standings("football", "nfl")
|
|
||||||
assert result == payload
|
|
||||||
|
|
||||||
def test_fetch_standings_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("error")):
|
|
||||||
result = self.source.fetch_standings("football", "nfl")
|
|
||||||
assert result == {}
|
|
||||||
|
|
||||||
def test_base_url_set_correctly(self):
|
|
||||||
assert "espn.com" in self.source.base_url
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# MLBAPIDataSource
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestMLBAPIDataSource:
|
|
||||||
def setup_method(self):
|
|
||||||
self.source = MLBAPIDataSource(_make_logger())
|
|
||||||
|
|
||||||
def test_fetch_live_games_filters_live(self):
|
|
||||||
live_game = {"status": {"abstractGameState": "Live"}}
|
|
||||||
final_game = {"status": {"abstractGameState": "Final"}}
|
|
||||||
payload = {"dates": [{"games": [live_game, final_game]}]}
|
|
||||||
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_live_games("baseball", "mlb")
|
|
||||||
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0] is live_game
|
|
||||||
|
|
||||||
def test_fetch_live_games_empty_dates(self):
|
|
||||||
payload = {"dates": []}
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_live_games("baseball", "mlb")
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fetch_live_games_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("err")):
|
|
||||||
result = self.source.fetch_live_games("baseball", "mlb")
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fetch_schedule_aggregates_all_dates(self):
|
|
||||||
payload = {
|
|
||||||
"dates": [
|
|
||||||
{"games": [{"id": "1"}, {"id": "2"}]},
|
|
||||||
{"games": [{"id": "3"}]},
|
|
||||||
]
|
|
||||||
}
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_schedule("baseball", "mlb", (datetime.now(), datetime.now()))
|
|
||||||
assert len(result) == 3
|
|
||||||
|
|
||||||
def test_fetch_schedule_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("err")):
|
|
||||||
result = self.source.fetch_schedule("baseball", "mlb", (datetime.now(), datetime.now()))
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fetch_standings_success(self):
|
|
||||||
payload = {"records": []}
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_standings("baseball", "mlb")
|
|
||||||
assert result == payload
|
|
||||||
|
|
||||||
def test_fetch_standings_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("err")):
|
|
||||||
result = self.source.fetch_standings("baseball", "mlb")
|
|
||||||
assert result == {}
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# SoccerAPIDataSource
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestSoccerAPIDataSource:
|
|
||||||
def setup_method(self):
|
|
||||||
self.source = SoccerAPIDataSource(_make_logger(), api_key="test-key-123")
|
|
||||||
|
|
||||||
def test_headers_include_api_key(self):
|
|
||||||
headers = self.source.get_headers()
|
|
||||||
assert headers["X-Auth-Token"] == "test-key-123"
|
|
||||||
|
|
||||||
def test_headers_without_api_key(self):
|
|
||||||
source = SoccerAPIDataSource(_make_logger())
|
|
||||||
headers = source.get_headers()
|
|
||||||
assert "X-Auth-Token" not in headers
|
|
||||||
|
|
||||||
def test_fetch_live_games_success(self):
|
|
||||||
payload = {"matches": [{"id": "m1"}, {"id": "m2"}]}
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_live_games("soccer", "eng.1")
|
|
||||||
assert len(result) == 2
|
|
||||||
|
|
||||||
def test_fetch_live_games_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("err")):
|
|
||||||
result = self.source.fetch_live_games("soccer", "eng.1")
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fetch_schedule_success(self):
|
|
||||||
payload = {"matches": [{"id": "m1"}]}
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_schedule("soccer", "eng.1", (datetime.now(), datetime.now()))
|
|
||||||
assert len(result) == 1
|
|
||||||
|
|
||||||
def test_fetch_schedule_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("err")):
|
|
||||||
result = self.source.fetch_schedule("soccer", "eng.1", (datetime.now(), datetime.now()))
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
def test_fetch_standings_success(self):
|
|
||||||
payload = {"standings": []}
|
|
||||||
with patch.object(self.source.session, "get", return_value=_mock_response(payload)):
|
|
||||||
result = self.source.fetch_standings("soccer", "PL")
|
|
||||||
assert result == payload
|
|
||||||
|
|
||||||
def test_fetch_standings_returns_empty_on_error(self):
|
|
||||||
with patch.object(self.source.session, "get", side_effect=Exception("err")):
|
|
||||||
result = self.source.fetch_standings("soccer", "PL")
|
|
||||||
assert result == {}
|
|
||||||
@@ -1,317 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/common/game_helper.py
|
|
||||||
|
|
||||||
Covers GameHelper: extract_game_details, filter_*, sort_games_by_time,
|
|
||||||
process_games, get_game_summary, and all private helpers.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import pytest
|
|
||||||
from datetime import datetime, timezone, timedelta
|
|
||||||
|
|
||||||
from src.common.game_helper import GameHelper
|
|
||||||
|
|
||||||
|
|
||||||
def _make_logger() -> logging.Logger:
|
|
||||||
return logging.getLogger("test_game_helper")
|
|
||||||
|
|
||||||
|
|
||||||
def _make_espn_event(
|
|
||||||
state: str = "in",
|
|
||||||
home_abbr: str = "LAL",
|
|
||||||
away_abbr: str = "BOS",
|
|
||||||
home_score: str = "105",
|
|
||||||
away_score: str = "98",
|
|
||||||
date_str: str = "2024-01-15T20:00:00Z",
|
|
||||||
period: int = 4,
|
|
||||||
status_name: str = "STATUS_IN_PROGRESS",
|
|
||||||
home_record: str = "30-10",
|
|
||||||
away_record: str = "25-15",
|
|
||||||
event_id: str = "game-1",
|
|
||||||
) -> dict:
|
|
||||||
return {
|
|
||||||
"id": event_id,
|
|
||||||
"date": date_str,
|
|
||||||
"competitions": [
|
|
||||||
{
|
|
||||||
"status": {
|
|
||||||
"type": {
|
|
||||||
"state": state,
|
|
||||||
"shortDetail": "Q4 2:30",
|
|
||||||
"name": status_name,
|
|
||||||
},
|
|
||||||
"period": period,
|
|
||||||
"displayClock": "2:30",
|
|
||||||
},
|
|
||||||
"competitors": [
|
|
||||||
{
|
|
||||||
"homeAway": "home",
|
|
||||||
"id": "h1",
|
|
||||||
"team": {"abbreviation": home_abbr, "displayName": f"{home_abbr} Team"},
|
|
||||||
"score": home_score,
|
|
||||||
"records": [{"summary": home_record}],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"homeAway": "away",
|
|
||||||
"id": "a1",
|
|
||||||
"team": {"abbreviation": away_abbr, "displayName": f"{away_abbr} Team"},
|
|
||||||
"score": away_score,
|
|
||||||
"records": [{"summary": away_record}],
|
|
||||||
},
|
|
||||||
],
|
|
||||||
}
|
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def helper():
|
|
||||||
return GameHelper(timezone_str="UTC", logger=_make_logger())
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# extract_game_details
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestExtractGameDetails:
|
|
||||||
def test_live_game(self, helper):
|
|
||||||
event = _make_espn_event(state="in")
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
assert result is not None
|
|
||||||
assert result["is_live"] is True
|
|
||||||
assert result["is_final"] is False
|
|
||||||
assert result["is_upcoming"] is False
|
|
||||||
|
|
||||||
def test_final_game(self, helper):
|
|
||||||
event = _make_espn_event(state="post")
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
assert result["is_final"] is True
|
|
||||||
|
|
||||||
def test_upcoming_game(self, helper):
|
|
||||||
event = _make_espn_event(state="pre")
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
assert result["is_upcoming"] is True
|
|
||||||
|
|
||||||
def test_halftime_detection(self, helper):
|
|
||||||
event = _make_espn_event(state="halftime", status_name="STATUS_HALFTIME")
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
assert result["is_halftime"] is True
|
|
||||||
|
|
||||||
def test_basic_fields_present(self, helper):
|
|
||||||
event = _make_espn_event()
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
for key in ("id", "home_abbr", "away_abbr", "home_score", "away_score",
|
|
||||||
"home_record", "away_record", "start_time_utc"):
|
|
||||||
assert key in result
|
|
||||||
|
|
||||||
def test_team_abbreviations(self, helper):
|
|
||||||
event = _make_espn_event(home_abbr="MIA", away_abbr="PHX")
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
assert result["home_abbr"] == "MIA"
|
|
||||||
assert result["away_abbr"] == "PHX"
|
|
||||||
|
|
||||||
def test_scores_as_strings(self, helper):
|
|
||||||
event = _make_espn_event(home_score="110", away_score="99")
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
assert result["home_score"] == "110"
|
|
||||||
assert result["away_score"] == "99"
|
|
||||||
|
|
||||||
def test_returns_none_on_empty(self, helper):
|
|
||||||
assert helper.extract_game_details({}) is None
|
|
||||||
assert helper.extract_game_details(None) is None
|
|
||||||
|
|
||||||
def test_returns_none_when_no_competitors(self, helper):
|
|
||||||
event = _make_espn_event()
|
|
||||||
event["competitions"][0]["competitors"] = []
|
|
||||||
assert helper.extract_game_details(event) is None
|
|
||||||
|
|
||||||
def test_date_z_suffix_parsed(self, helper):
|
|
||||||
event = _make_espn_event(date_str="2024-06-01T19:30:00Z")
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
assert result["start_time_utc"] is not None
|
|
||||||
assert result["start_time_utc"].tzinfo is not None
|
|
||||||
|
|
||||||
def test_zero_zero_record_suppressed(self, helper):
|
|
||||||
event = _make_espn_event(home_record="0-0", away_record="0-0-0")
|
|
||||||
result = helper.extract_game_details(event)
|
|
||||||
assert result["home_record"] == ""
|
|
||||||
assert result["away_record"] == ""
|
|
||||||
|
|
||||||
def test_basketball_sport_fields(self, helper):
|
|
||||||
event = _make_espn_event(period=3)
|
|
||||||
result = helper.extract_game_details(event, sport="basketball")
|
|
||||||
assert result["period_text"] == "Q3"
|
|
||||||
assert "clock" in result
|
|
||||||
|
|
||||||
def test_basketball_overtime_period(self, helper):
|
|
||||||
event = _make_espn_event(period=5)
|
|
||||||
result = helper.extract_game_details(event, sport="basketball")
|
|
||||||
assert result["period_text"] == "OT1"
|
|
||||||
|
|
||||||
def test_football_sport_fields(self, helper):
|
|
||||||
event = _make_espn_event(period=2)
|
|
||||||
result = helper.extract_game_details(event, sport="football")
|
|
||||||
assert result["period_text"] == "Q2"
|
|
||||||
|
|
||||||
def test_hockey_sport_fields_period_1(self, helper):
|
|
||||||
event = _make_espn_event(period=1)
|
|
||||||
result = helper.extract_game_details(event, sport="hockey")
|
|
||||||
assert result["period_text"] == "P1"
|
|
||||||
|
|
||||||
def test_hockey_sport_fields_ot(self, helper):
|
|
||||||
event = _make_espn_event(period=4)
|
|
||||||
result = helper.extract_game_details(event, sport="hockey")
|
|
||||||
assert result["period_text"] == "OT1"
|
|
||||||
|
|
||||||
def test_baseball_sport_fields(self, helper):
|
|
||||||
event = _make_espn_event(period=7)
|
|
||||||
result = helper.extract_game_details(event, sport="baseball")
|
|
||||||
assert result["period_text"] == "INN 7"
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Filter methods
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestFilterMethods:
|
|
||||||
def _make_games(self):
|
|
||||||
now = datetime.now(timezone.utc)
|
|
||||||
return [
|
|
||||||
{"is_live": True, "is_final": False, "is_upcoming": False, "home_abbr": "LAL", "away_abbr": "BOS", "start_time_utc": now},
|
|
||||||
{"is_live": False, "is_final": True, "is_upcoming": False, "home_abbr": "MIA", "away_abbr": "PHX", "start_time_utc": now - timedelta(hours=3)},
|
|
||||||
{"is_live": False, "is_final": False, "is_upcoming": True, "home_abbr": "DAL", "away_abbr": "CHI", "start_time_utc": now + timedelta(hours=2)},
|
|
||||||
]
|
|
||||||
|
|
||||||
def test_filter_live_games(self, helper):
|
|
||||||
games = self._make_games()
|
|
||||||
result = helper.filter_live_games(games)
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0]["home_abbr"] == "LAL"
|
|
||||||
|
|
||||||
def test_filter_final_games(self, helper):
|
|
||||||
games = self._make_games()
|
|
||||||
result = helper.filter_final_games(games)
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0]["home_abbr"] == "MIA"
|
|
||||||
|
|
||||||
def test_filter_upcoming_games(self, helper):
|
|
||||||
games = self._make_games()
|
|
||||||
result = helper.filter_upcoming_games(games)
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0]["home_abbr"] == "DAL"
|
|
||||||
|
|
||||||
def test_filter_favorite_teams_match(self, helper):
|
|
||||||
games = self._make_games()
|
|
||||||
result = helper.filter_favorite_teams(games, ["LAL"])
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0]["home_abbr"] == "LAL"
|
|
||||||
|
|
||||||
def test_filter_favorite_teams_empty_list_returns_all(self, helper):
|
|
||||||
games = self._make_games()
|
|
||||||
result = helper.filter_favorite_teams(games, [])
|
|
||||||
assert len(result) == 3
|
|
||||||
|
|
||||||
def test_filter_favorite_teams_away_match(self, helper):
|
|
||||||
games = self._make_games()
|
|
||||||
result = helper.filter_favorite_teams(games, ["BOS"])
|
|
||||||
assert len(result) == 1
|
|
||||||
|
|
||||||
def test_filter_recent_games_within_window(self, helper):
|
|
||||||
now = datetime.now(timezone.utc)
|
|
||||||
games = [
|
|
||||||
{"start_time_utc": now - timedelta(days=2), "is_final": True},
|
|
||||||
{"start_time_utc": now - timedelta(days=10), "is_final": True},
|
|
||||||
]
|
|
||||||
result = helper.filter_recent_games(games, days_back=7)
|
|
||||||
assert len(result) == 1
|
|
||||||
|
|
||||||
def test_filter_recent_games_all_within(self, helper):
|
|
||||||
now = datetime.now(timezone.utc)
|
|
||||||
games = [
|
|
||||||
{"start_time_utc": now - timedelta(days=1)},
|
|
||||||
{"start_time_utc": now - timedelta(days=3)},
|
|
||||||
]
|
|
||||||
result = helper.filter_recent_games(games, days_back=7)
|
|
||||||
assert len(result) == 2
|
|
||||||
|
|
||||||
def test_sort_games_ascending(self, helper):
|
|
||||||
now = datetime.now(timezone.utc)
|
|
||||||
games = [
|
|
||||||
{"start_time_utc": now + timedelta(hours=2), "id": "late"},
|
|
||||||
{"start_time_utc": now + timedelta(hours=1), "id": "early"},
|
|
||||||
]
|
|
||||||
result = helper.sort_games_by_time(games)
|
|
||||||
assert result[0]["id"] == "early"
|
|
||||||
|
|
||||||
def test_sort_games_descending(self, helper):
|
|
||||||
now = datetime.now(timezone.utc)
|
|
||||||
games = [
|
|
||||||
{"start_time_utc": now + timedelta(hours=1), "id": "early"},
|
|
||||||
{"start_time_utc": now + timedelta(hours=2), "id": "late"},
|
|
||||||
]
|
|
||||||
result = helper.sort_games_by_time(games, reverse=True)
|
|
||||||
assert result[0]["id"] == "late"
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# process_games
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestProcessGames:
|
|
||||||
def test_processes_valid_events(self, helper):
|
|
||||||
events = [
|
|
||||||
_make_espn_event(event_id="1"),
|
|
||||||
_make_espn_event(event_id="2"),
|
|
||||||
]
|
|
||||||
result = helper.process_games(events)
|
|
||||||
assert len(result) == 2
|
|
||||||
|
|
||||||
def test_skips_invalid_events(self, helper):
|
|
||||||
events = [
|
|
||||||
_make_espn_event(event_id="1"),
|
|
||||||
{}, # invalid
|
|
||||||
]
|
|
||||||
result = helper.process_games(events)
|
|
||||||
assert len(result) == 1
|
|
||||||
|
|
||||||
def test_empty_events(self, helper):
|
|
||||||
assert helper.process_games([]) == []
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_game_summary
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetGameSummary:
|
|
||||||
def test_live_summary(self, helper):
|
|
||||||
game = {
|
|
||||||
"home_abbr": "LAL", "away_abbr": "BOS",
|
|
||||||
"home_score": "105", "away_score": "98",
|
|
||||||
"status_text": "Q4 2:30",
|
|
||||||
"is_live": True, "is_final": False,
|
|
||||||
}
|
|
||||||
summary = helper.get_game_summary(game)
|
|
||||||
assert "BOS" in summary
|
|
||||||
assert "LAL" in summary
|
|
||||||
assert "98" in summary
|
|
||||||
assert "105" in summary
|
|
||||||
|
|
||||||
def test_final_summary(self, helper):
|
|
||||||
game = {
|
|
||||||
"home_abbr": "LAL", "away_abbr": "BOS",
|
|
||||||
"home_score": "110", "away_score": "102",
|
|
||||||
"status_text": "Final",
|
|
||||||
"is_live": False, "is_final": True,
|
|
||||||
}
|
|
||||||
summary = helper.get_game_summary(game)
|
|
||||||
assert "Final" in summary
|
|
||||||
|
|
||||||
def test_upcoming_summary(self, helper):
|
|
||||||
game = {
|
|
||||||
"home_abbr": "LAL", "away_abbr": "BOS",
|
|
||||||
"home_score": "0", "away_score": "0",
|
|
||||||
"status_text": "7:30 PM",
|
|
||||||
"is_live": False, "is_final": False,
|
|
||||||
}
|
|
||||||
summary = helper.get_game_summary(game)
|
|
||||||
assert "7:30 PM" in summary
|
|
||||||
@@ -1,307 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/plugin_system/health_monitor.py
|
|
||||||
|
|
||||||
Covers PluginHealthMonitor: get_plugin_health_status, get_plugin_health_metrics,
|
|
||||||
get_all_plugin_health, _get_recovery_suggestions, start/stop_monitoring,
|
|
||||||
register_health_check.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from unittest.mock import MagicMock, patch
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from src.plugin_system.health_monitor import (
|
|
||||||
PluginHealthMonitor,
|
|
||||||
HealthStatus,
|
|
||||||
HealthMetrics,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Fixtures
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _make_health_tracker(
|
|
||||||
summary: dict | None = None,
|
|
||||||
all_summaries: dict | None = None,
|
|
||||||
):
|
|
||||||
"""Return a mock PluginHealthTracker."""
|
|
||||||
tracker = MagicMock()
|
|
||||||
tracker.get_health_summary.return_value = summary
|
|
||||||
tracker.get_all_health_summaries.return_value = all_summaries or {}
|
|
||||||
return tracker
|
|
||||||
|
|
||||||
|
|
||||||
def _healthy_summary() -> dict:
|
|
||||||
return {
|
|
||||||
"success_rate": 100.0,
|
|
||||||
"circuit_state": "closed",
|
|
||||||
"consecutive_failures": 0,
|
|
||||||
"total_failures": 0,
|
|
||||||
"total_successes": 50,
|
|
||||||
"last_success_time": datetime.now().isoformat(),
|
|
||||||
"last_error": None,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _degraded_summary() -> dict:
|
|
||||||
return {
|
|
||||||
"success_rate": 40.0, # 60% error rate
|
|
||||||
"circuit_state": "closed",
|
|
||||||
"consecutive_failures": 3,
|
|
||||||
"total_failures": 6,
|
|
||||||
"total_successes": 4,
|
|
||||||
"last_success_time": None,
|
|
||||||
"last_error": "timeout occurred",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _unhealthy_summary() -> dict:
|
|
||||||
return {
|
|
||||||
"success_rate": 10.0, # 90% error rate
|
|
||||||
"circuit_state": "open",
|
|
||||||
"consecutive_failures": 10,
|
|
||||||
"total_failures": 9,
|
|
||||||
"total_successes": 1,
|
|
||||||
"last_success_time": None,
|
|
||||||
"last_error": "ImportError: missing module",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def monitor():
|
|
||||||
tracker = _make_health_tracker(_healthy_summary())
|
|
||||||
return PluginHealthMonitor(health_tracker=tracker)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_plugin_health_status
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetPluginHealthStatus:
|
|
||||||
def test_healthy_status(self):
|
|
||||||
tracker = _make_health_tracker(_healthy_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
status = monitor.get_plugin_health_status("plugin_a")
|
|
||||||
assert status == HealthStatus.HEALTHY
|
|
||||||
|
|
||||||
def test_degraded_status(self):
|
|
||||||
tracker = _make_health_tracker(_degraded_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
|
|
||||||
status = monitor.get_plugin_health_status("plugin_b")
|
|
||||||
assert status == HealthStatus.DEGRADED
|
|
||||||
|
|
||||||
def test_unhealthy_status(self):
|
|
||||||
tracker = _make_health_tracker(_unhealthy_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
|
|
||||||
status = monitor.get_plugin_health_status("plugin_c")
|
|
||||||
assert status == HealthStatus.UNHEALTHY
|
|
||||||
|
|
||||||
def test_open_circuit_breaker_is_unhealthy(self):
|
|
||||||
summary = _healthy_summary()
|
|
||||||
summary["circuit_state"] = "open"
|
|
||||||
tracker = _make_health_tracker(summary)
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
status = monitor.get_plugin_health_status("plugin_d")
|
|
||||||
assert status == HealthStatus.UNHEALTHY
|
|
||||||
|
|
||||||
def test_unknown_when_no_tracker(self):
|
|
||||||
monitor = PluginHealthMonitor(health_tracker=None)
|
|
||||||
status = monitor.get_plugin_health_status("plugin_e")
|
|
||||||
assert status == HealthStatus.UNKNOWN
|
|
||||||
|
|
||||||
def test_unknown_when_no_summary(self):
|
|
||||||
tracker = _make_health_tracker(None)
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
status = monitor.get_plugin_health_status("plugin_f")
|
|
||||||
assert status == HealthStatus.UNKNOWN
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_plugin_health_metrics
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetPluginHealthMetrics:
|
|
||||||
def test_healthy_metrics(self):
|
|
||||||
tracker = _make_health_tracker(_healthy_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
metrics = monitor.get_plugin_health_metrics("plugin_a")
|
|
||||||
assert isinstance(metrics, HealthMetrics)
|
|
||||||
assert metrics.status == HealthStatus.HEALTHY
|
|
||||||
assert metrics.success_rate == pytest.approx(1.0)
|
|
||||||
assert metrics.error_rate == pytest.approx(0.0)
|
|
||||||
|
|
||||||
def test_degraded_metrics(self):
|
|
||||||
tracker = _make_health_tracker(_degraded_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
|
|
||||||
metrics = monitor.get_plugin_health_metrics("plugin_b")
|
|
||||||
assert metrics.status == HealthStatus.DEGRADED
|
|
||||||
assert metrics.consecutive_failures == 3
|
|
||||||
|
|
||||||
def test_unhealthy_metrics(self):
|
|
||||||
tracker = _make_health_tracker(_unhealthy_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
|
|
||||||
metrics = monitor.get_plugin_health_metrics("plugin_c")
|
|
||||||
assert metrics.status == HealthStatus.UNHEALTHY
|
|
||||||
assert metrics.circuit_breaker_state == "open"
|
|
||||||
assert metrics.last_error is not None
|
|
||||||
|
|
||||||
def test_metrics_without_tracker(self):
|
|
||||||
monitor = PluginHealthMonitor(health_tracker=None)
|
|
||||||
metrics = monitor.get_plugin_health_metrics("plugin_d")
|
|
||||||
assert metrics.status == HealthStatus.UNKNOWN
|
|
||||||
assert metrics.plugin_id == "plugin_d"
|
|
||||||
|
|
||||||
def test_metrics_without_summary(self):
|
|
||||||
tracker = _make_health_tracker(None)
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
metrics = monitor.get_plugin_health_metrics("plugin_e")
|
|
||||||
assert metrics.status == HealthStatus.UNKNOWN
|
|
||||||
|
|
||||||
def test_last_successful_update_parsed(self):
|
|
||||||
summary = _healthy_summary()
|
|
||||||
summary["last_success_time"] = "2024-06-01T12:00:00"
|
|
||||||
tracker = _make_health_tracker(summary)
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
metrics = monitor.get_plugin_health_metrics("plugin_a")
|
|
||||||
assert metrics.last_successful_update is not None
|
|
||||||
assert isinstance(metrics.last_successful_update, datetime)
|
|
||||||
|
|
||||||
def test_invalid_last_success_time_handled(self):
|
|
||||||
summary = _healthy_summary()
|
|
||||||
summary["last_success_time"] = "not-a-date"
|
|
||||||
tracker = _make_health_tracker(summary)
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
# Should not raise
|
|
||||||
metrics = monitor.get_plugin_health_metrics("plugin_a")
|
|
||||||
assert metrics.last_successful_update is None
|
|
||||||
|
|
||||||
def test_total_successes_failures(self):
|
|
||||||
tracker = _make_health_tracker(_degraded_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
|
|
||||||
metrics = monitor.get_plugin_health_metrics("plugin_b")
|
|
||||||
assert metrics.total_failures == 6
|
|
||||||
assert metrics.total_successes == 4
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_all_plugin_health
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetAllPluginHealth:
|
|
||||||
def test_returns_empty_without_tracker(self):
|
|
||||||
monitor = PluginHealthMonitor(health_tracker=None)
|
|
||||||
result = monitor.get_all_plugin_health()
|
|
||||||
assert result == {}
|
|
||||||
|
|
||||||
def test_returns_metrics_for_each_plugin(self):
|
|
||||||
all_summaries = {
|
|
||||||
"plugin_a": _healthy_summary(),
|
|
||||||
"plugin_b": _degraded_summary(),
|
|
||||||
}
|
|
||||||
tracker = MagicMock()
|
|
||||||
tracker.get_all_health_summaries.return_value = all_summaries
|
|
||||||
tracker.get_health_summary.side_effect = lambda pid: all_summaries.get(pid)
|
|
||||||
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
|
|
||||||
result = monitor.get_all_plugin_health()
|
|
||||||
assert "plugin_a" in result
|
|
||||||
assert "plugin_b" in result
|
|
||||||
assert isinstance(result["plugin_a"], HealthMetrics)
|
|
||||||
|
|
||||||
def test_returns_empty_when_no_summaries(self):
|
|
||||||
tracker = _make_health_tracker(all_summaries={})
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
result = monitor.get_all_plugin_health()
|
|
||||||
assert result == {}
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# _get_recovery_suggestions
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetRecoverySuggestions:
|
|
||||||
def test_healthy_plugin_suggestion(self):
|
|
||||||
tracker = _make_health_tracker(_healthy_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker)
|
|
||||||
suggestions = monitor._get_recovery_suggestions("p", _healthy_summary(), HealthStatus.HEALTHY)
|
|
||||||
assert any("healthy" in s.lower() for s in suggestions)
|
|
||||||
|
|
||||||
def test_unhealthy_suggestions(self):
|
|
||||||
tracker = _make_health_tracker(_unhealthy_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
|
|
||||||
suggestions = monitor._get_recovery_suggestions("p", _unhealthy_summary(), HealthStatus.UNHEALTHY)
|
|
||||||
assert len(suggestions) > 0
|
|
||||||
assert any("unhealthy" in s.lower() for s in suggestions)
|
|
||||||
|
|
||||||
def test_open_circuit_breaker_suggestion(self):
|
|
||||||
summary = _unhealthy_summary()
|
|
||||||
summary["circuit_state"] = "open"
|
|
||||||
tracker = _make_health_tracker(summary)
|
|
||||||
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
|
|
||||||
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
|
|
||||||
assert any("circuit" in s.lower() for s in suggestions)
|
|
||||||
|
|
||||||
def test_timeout_error_suggestion(self):
|
|
||||||
summary = _degraded_summary()
|
|
||||||
summary["last_error"] = "connection timeout occurred"
|
|
||||||
tracker = _make_health_tracker(summary)
|
|
||||||
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
|
|
||||||
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.DEGRADED)
|
|
||||||
assert any("timeout" in s.lower() for s in suggestions)
|
|
||||||
|
|
||||||
def test_import_error_suggestion(self):
|
|
||||||
summary = _unhealthy_summary()
|
|
||||||
summary["last_error"] = "ImportError: missing module"
|
|
||||||
tracker = _make_health_tracker(summary)
|
|
||||||
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
|
|
||||||
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
|
|
||||||
assert any("dependencies" in s.lower() or "import" in s.lower() or "missing" in s.lower()
|
|
||||||
for s in suggestions)
|
|
||||||
|
|
||||||
def test_permission_error_suggestion(self):
|
|
||||||
summary = _unhealthy_summary()
|
|
||||||
summary["last_error"] = "permission denied to access resource"
|
|
||||||
tracker = _make_health_tracker(summary)
|
|
||||||
monitor = PluginHealthMonitor(tracker, unhealthy_threshold=0.8)
|
|
||||||
suggestions = monitor._get_recovery_suggestions("p", summary, HealthStatus.UNHEALTHY)
|
|
||||||
assert any("permission" in s.lower() for s in suggestions)
|
|
||||||
|
|
||||||
def test_degraded_suggestions_include_error_rate(self):
|
|
||||||
tracker = _make_health_tracker(_degraded_summary())
|
|
||||||
monitor = PluginHealthMonitor(tracker, degraded_threshold=0.5, unhealthy_threshold=0.8)
|
|
||||||
suggestions = monitor._get_recovery_suggestions("p", _degraded_summary(), HealthStatus.DEGRADED)
|
|
||||||
assert any("%" in s for s in suggestions)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# start / stop monitoring
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestMonitorLifecycle:
|
|
||||||
def test_start_monitoring(self, monitor):
|
|
||||||
monitor.start_monitoring()
|
|
||||||
try:
|
|
||||||
assert monitor._monitor_thread is not None
|
|
||||||
assert monitor._monitor_thread.is_alive()
|
|
||||||
finally:
|
|
||||||
monitor.stop_monitoring()
|
|
||||||
|
|
||||||
def test_stop_monitoring(self, monitor):
|
|
||||||
monitor.start_monitoring()
|
|
||||||
monitor.stop_monitoring()
|
|
||||||
# Thread should no longer be alive
|
|
||||||
assert not monitor._monitor_thread.is_alive()
|
|
||||||
|
|
||||||
def test_double_start_no_duplicate_threads(self, monitor):
|
|
||||||
monitor.start_monitoring()
|
|
||||||
try:
|
|
||||||
thread1 = monitor._monitor_thread
|
|
||||||
monitor.start_monitoring() # should be idempotent
|
|
||||||
assert monitor._monitor_thread is thread1
|
|
||||||
finally:
|
|
||||||
monitor.stop_monitoring()
|
|
||||||
|
|
||||||
def test_register_health_check(self, monitor):
|
|
||||||
callback = MagicMock()
|
|
||||||
monitor.register_health_check(callback)
|
|
||||||
assert callback in monitor._health_check_callbacks
|
|
||||||
@@ -1,129 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/logo_downloader.py
|
|
||||||
|
|
||||||
Focuses on the pure/static methods that don't require network calls:
|
|
||||||
normalize_abbreviation, get_logo_filename_variations, get_logo_directory,
|
|
||||||
ensure_logo_directory, and the download_missing_logo function path
|
|
||||||
(with HTTP mocked).
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
import pytest
|
|
||||||
from pathlib import Path
|
|
||||||
from unittest.mock import patch, Mock, MagicMock
|
|
||||||
|
|
||||||
from src.logo_downloader import LogoDownloader
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# normalize_abbreviation
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestNormalizeAbbreviation:
|
|
||||||
def test_basic_lowercase(self):
|
|
||||||
result = LogoDownloader.normalize_abbreviation("lal")
|
|
||||||
assert result == "LAL"
|
|
||||||
|
|
||||||
def test_uppercases(self):
|
|
||||||
result = LogoDownloader.normalize_abbreviation("bos")
|
|
||||||
assert result == "BOS"
|
|
||||||
|
|
||||||
def test_ampersand_replaced(self):
|
|
||||||
result = LogoDownloader.normalize_abbreviation("TA&M")
|
|
||||||
assert "&" not in result
|
|
||||||
assert "AND" in result
|
|
||||||
|
|
||||||
def test_forward_slash_replaced(self):
|
|
||||||
result = LogoDownloader.normalize_abbreviation("A/B")
|
|
||||||
assert "/" not in result
|
|
||||||
|
|
||||||
def test_empty_returns_empty(self):
|
|
||||||
result = LogoDownloader.normalize_abbreviation("")
|
|
||||||
assert result == ""
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_logo_filename_variations
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetLogoFilenameVariations:
|
|
||||||
def test_returns_list(self):
|
|
||||||
result = LogoDownloader.get_logo_filename_variations("LAL")
|
|
||||||
assert isinstance(result, list)
|
|
||||||
assert len(result) > 0
|
|
||||||
|
|
||||||
def test_includes_png(self):
|
|
||||||
result = LogoDownloader.get_logo_filename_variations("KC")
|
|
||||||
filenames = " ".join(result)
|
|
||||||
assert ".png" in filenames
|
|
||||||
|
|
||||||
def test_includes_original(self):
|
|
||||||
result = LogoDownloader.get_logo_filename_variations("LAL")
|
|
||||||
assert any("LAL" in f for f in result)
|
|
||||||
|
|
||||||
def test_ampersand_variation(self):
|
|
||||||
result = LogoDownloader.get_logo_filename_variations("TA&M")
|
|
||||||
# Should produce at least the normalized version
|
|
||||||
assert len(result) > 0
|
|
||||||
|
|
||||||
def test_empty_string_no_crash(self):
|
|
||||||
result = LogoDownloader.get_logo_filename_variations("")
|
|
||||||
assert isinstance(result, list)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_logo_directory
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetLogoDirectory:
|
|
||||||
def test_known_sport_returns_string(self):
|
|
||||||
downloader = LogoDownloader()
|
|
||||||
result = downloader.get_logo_directory("nfl")
|
|
||||||
assert isinstance(result, str)
|
|
||||||
assert len(result) > 0
|
|
||||||
|
|
||||||
def test_known_sport_nba(self):
|
|
||||||
downloader = LogoDownloader()
|
|
||||||
result = downloader.get_logo_directory("nba")
|
|
||||||
assert "nba" in result.lower() or "sports" in result.lower()
|
|
||||||
|
|
||||||
def test_unknown_sport_returns_string(self):
|
|
||||||
downloader = LogoDownloader()
|
|
||||||
result = downloader.get_logo_directory("unknown_sport_xyz")
|
|
||||||
assert isinstance(result, str)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# ensure_logo_directory
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestEnsureLogoDirectory:
|
|
||||||
def test_creates_writable_directory(self, tmp_path):
|
|
||||||
downloader = LogoDownloader()
|
|
||||||
test_dir = str(tmp_path / "logos" / "nfl")
|
|
||||||
result = downloader.ensure_logo_directory(test_dir)
|
|
||||||
assert result is True
|
|
||||||
assert Path(test_dir).is_dir()
|
|
||||||
|
|
||||||
def test_existing_writable_directory(self, tmp_path):
|
|
||||||
downloader = LogoDownloader()
|
|
||||||
test_dir = str(tmp_path)
|
|
||||||
result = downloader.ensure_logo_directory(test_dir)
|
|
||||||
assert result is True
|
|
||||||
|
|
||||||
def test_returns_false_when_write_test_fails(self, tmp_path):
|
|
||||||
"""Simulate a directory that exists but raises PermissionError on write."""
|
|
||||||
downloader = LogoDownloader()
|
|
||||||
test_dir = str(tmp_path / "logos")
|
|
||||||
|
|
||||||
import builtins
|
|
||||||
original_open = builtins.open
|
|
||||||
|
|
||||||
def mock_open(path, *args, **kwargs):
|
|
||||||
if ".write_test" in str(path):
|
|
||||||
raise PermissionError("no write access")
|
|
||||||
return original_open(path, *args, **kwargs)
|
|
||||||
|
|
||||||
with patch("builtins.open", side_effect=mock_open):
|
|
||||||
result = downloader.ensure_logo_directory(test_dir)
|
|
||||||
assert result is False
|
|
||||||
@@ -1,317 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/common/scroll_helper.py
|
|
||||||
|
|
||||||
Covers ScrollHelper: create_scrolling_image, update_scroll_position,
|
|
||||||
get_visible_portion, calculate_dynamic_duration, set_* methods,
|
|
||||||
reset_scroll, clear_cache, get_scroll_info.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
import time
|
|
||||||
from unittest.mock import patch
|
|
||||||
from PIL import Image
|
|
||||||
|
|
||||||
from src.common.scroll_helper import ScrollHelper
|
|
||||||
|
|
||||||
|
|
||||||
DISPLAY_W = 64
|
|
||||||
DISPLAY_H = 32
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def helper():
|
|
||||||
return ScrollHelper(display_width=DISPLAY_W, display_height=DISPLAY_H)
|
|
||||||
|
|
||||||
|
|
||||||
def _make_image(width: int = 64, height: int = 32, color=(255, 0, 0)) -> Image.Image:
|
|
||||||
img = Image.new("RGB", (width, height), color)
|
|
||||||
return img
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# __init__ / initial state
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestScrollHelperInit:
|
|
||||||
def test_initial_scroll_position(self, helper):
|
|
||||||
assert helper.scroll_position == 0.0
|
|
||||||
|
|
||||||
def test_initial_scroll_complete_false(self, helper):
|
|
||||||
assert helper.scroll_complete is False
|
|
||||||
|
|
||||||
def test_display_dimensions(self, helper):
|
|
||||||
assert helper.display_width == DISPLAY_W
|
|
||||||
assert helper.display_height == DISPLAY_H
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# create_scrolling_image
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestCreateScrollingImage:
|
|
||||||
def test_empty_content_returns_blank_image(self, helper):
|
|
||||||
result = helper.create_scrolling_image([])
|
|
||||||
assert isinstance(result, Image.Image)
|
|
||||||
assert helper.total_scroll_width == 0
|
|
||||||
|
|
||||||
def test_single_item_creates_image(self, helper):
|
|
||||||
img = _make_image(width=100)
|
|
||||||
result = helper.create_scrolling_image([img])
|
|
||||||
assert isinstance(result, Image.Image)
|
|
||||||
assert result.width > DISPLAY_W # includes leading gap
|
|
||||||
|
|
||||||
def test_multiple_items_wider_image(self, helper):
|
|
||||||
items = [_make_image(width=50), _make_image(width=50)]
|
|
||||||
result = helper.create_scrolling_image(items)
|
|
||||||
# Should be wider than two items alone
|
|
||||||
assert result.width > 100
|
|
||||||
|
|
||||||
def test_scroll_position_reset(self, helper):
|
|
||||||
helper.scroll_position = 500.0
|
|
||||||
helper.create_scrolling_image([_make_image()])
|
|
||||||
assert helper.scroll_position == 0.0
|
|
||||||
|
|
||||||
def test_cached_array_set(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image()])
|
|
||||||
assert helper.cached_array is not None
|
|
||||||
|
|
||||||
def test_scroll_complete_reset(self, helper):
|
|
||||||
helper.scroll_complete = True
|
|
||||||
helper.create_scrolling_image([_make_image()])
|
|
||||||
assert helper.scroll_complete is False
|
|
||||||
|
|
||||||
def test_total_scroll_width_matches_image(self, helper):
|
|
||||||
img = _make_image(width=200)
|
|
||||||
result = helper.create_scrolling_image([img])
|
|
||||||
assert helper.total_scroll_width == result.width
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# set_scrolling_image
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestSetScrollingImage:
|
|
||||||
def test_sets_cached_image(self, helper):
|
|
||||||
img = _make_image(width=200)
|
|
||||||
helper.set_scrolling_image(img)
|
|
||||||
assert helper.cached_image is img
|
|
||||||
|
|
||||||
def test_sets_cached_array(self, helper):
|
|
||||||
img = _make_image(width=200)
|
|
||||||
helper.set_scrolling_image(img)
|
|
||||||
assert helper.cached_array is not None
|
|
||||||
|
|
||||||
def test_scroll_width_matches_image(self, helper):
|
|
||||||
img = _make_image(width=300)
|
|
||||||
helper.set_scrolling_image(img)
|
|
||||||
assert helper.total_scroll_width == 300
|
|
||||||
|
|
||||||
def test_none_clears_cache(self, helper):
|
|
||||||
helper.set_scrolling_image(_make_image())
|
|
||||||
helper.set_scrolling_image(None)
|
|
||||||
assert helper.cached_image is None
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# update_scroll_position (time-based mode)
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestUpdateScrollPosition:
|
|
||||||
def test_position_advances_over_time(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image(width=200)])
|
|
||||||
helper.scroll_speed = 100.0 # 100 px/s
|
|
||||||
helper.last_update_time = time.time() - 0.1 # pretend 100ms elapsed
|
|
||||||
initial = helper.scroll_position
|
|
||||||
helper.update_scroll_position()
|
|
||||||
assert helper.scroll_position > initial
|
|
||||||
|
|
||||||
def test_no_advance_without_image(self, helper):
|
|
||||||
helper.update_scroll_position() # no image, should not crash
|
|
||||||
assert helper.scroll_position == 0.0
|
|
||||||
|
|
||||||
def test_zero_width_content_stays_zero(self, helper):
|
|
||||||
helper.create_scrolling_image([]) # empty → width 0
|
|
||||||
helper.update_scroll_position()
|
|
||||||
assert helper.scroll_position == 0.0
|
|
||||||
|
|
||||||
def test_scroll_complete_clamped(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image(width=100)])
|
|
||||||
# Force position past the end
|
|
||||||
helper.scroll_position = helper.total_scroll_width + 50
|
|
||||||
helper.total_distance_scrolled = helper.total_scroll_width + 50
|
|
||||||
helper.update_scroll_position()
|
|
||||||
assert helper.scroll_complete is True
|
|
||||||
assert helper.scroll_position <= helper.total_scroll_width
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_visible_portion
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetVisiblePortion:
|
|
||||||
def test_returns_none_without_image(self, helper):
|
|
||||||
assert helper.get_visible_portion() is None
|
|
||||||
|
|
||||||
def test_returns_image_sized_to_display(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image(width=200)])
|
|
||||||
visible = helper.get_visible_portion()
|
|
||||||
assert visible is not None
|
|
||||||
assert visible.width == DISPLAY_W
|
|
||||||
assert visible.height == DISPLAY_H
|
|
||||||
|
|
||||||
def test_different_positions_give_different_images(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image(width=300)])
|
|
||||||
img1 = helper.get_visible_portion()
|
|
||||||
helper.scroll_position = 50
|
|
||||||
img2 = helper.get_visible_portion()
|
|
||||||
# Images should differ (colour from scrolled content)
|
|
||||||
# Just verify both are valid PIL images with correct size
|
|
||||||
assert img1.width == img2.width == DISPLAY_W
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# reset_scroll / clear_cache
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestResetAndClear:
|
|
||||||
def test_reset_restores_position(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image(width=200)])
|
|
||||||
helper.scroll_position = 100.0
|
|
||||||
helper.reset_scroll()
|
|
||||||
assert helper.scroll_position == 0.0
|
|
||||||
|
|
||||||
def test_reset_clears_complete_flag(self, helper):
|
|
||||||
helper.scroll_complete = True
|
|
||||||
helper.reset_scroll()
|
|
||||||
assert helper.scroll_complete is False
|
|
||||||
|
|
||||||
def test_reset_alias(self, helper):
|
|
||||||
helper.scroll_position = 50.0
|
|
||||||
helper.reset()
|
|
||||||
assert helper.scroll_position == 0.0
|
|
||||||
|
|
||||||
def test_clear_cache(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image()])
|
|
||||||
helper.clear_cache()
|
|
||||||
assert helper.cached_image is None
|
|
||||||
assert helper.cached_array is None
|
|
||||||
assert helper.total_scroll_width == 0
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# calculate_dynamic_duration
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestCalculateDynamicDuration:
|
|
||||||
def test_returns_min_when_disabled(self, helper):
|
|
||||||
helper.dynamic_duration_enabled = False
|
|
||||||
helper.min_duration = 30
|
|
||||||
result = helper.calculate_dynamic_duration()
|
|
||||||
assert result == 30
|
|
||||||
|
|
||||||
def test_returns_min_when_no_content(self, helper):
|
|
||||||
helper.total_scroll_width = 0
|
|
||||||
helper.min_duration = 30
|
|
||||||
result = helper.calculate_dynamic_duration()
|
|
||||||
assert result == 30
|
|
||||||
|
|
||||||
def test_respects_min_duration(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image(width=50)])
|
|
||||||
helper.min_duration = 60
|
|
||||||
helper.max_duration = 300
|
|
||||||
helper.scroll_speed = 500.0 # very fast → very short time
|
|
||||||
result = helper.calculate_dynamic_duration()
|
|
||||||
assert result >= 60
|
|
||||||
|
|
||||||
def test_respects_max_duration(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image(width=5000)])
|
|
||||||
helper.min_duration = 10
|
|
||||||
helper.max_duration = 60
|
|
||||||
helper.scroll_speed = 1.0 # very slow → very long time
|
|
||||||
result = helper.calculate_dynamic_duration()
|
|
||||||
assert result <= 60
|
|
||||||
|
|
||||||
def test_time_based_calculation(self, helper):
|
|
||||||
helper.create_scrolling_image([_make_image(width=200)])
|
|
||||||
helper.scroll_speed = 100.0
|
|
||||||
helper.min_duration = 1
|
|
||||||
helper.max_duration = 600
|
|
||||||
helper.frame_based_scrolling = False
|
|
||||||
result = helper.calculate_dynamic_duration()
|
|
||||||
assert isinstance(result, int)
|
|
||||||
assert result > 0
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# set_* configuration methods
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestSetMethods:
|
|
||||||
def test_set_scroll_speed_time_based(self, helper):
|
|
||||||
helper.frame_based_scrolling = False
|
|
||||||
helper.set_scroll_speed(50.0)
|
|
||||||
assert helper.scroll_speed == 50.0
|
|
||||||
|
|
||||||
def test_set_scroll_speed_clamped_low(self, helper):
|
|
||||||
helper.frame_based_scrolling = False
|
|
||||||
helper.set_scroll_speed(0.0)
|
|
||||||
assert helper.scroll_speed >= 1.0
|
|
||||||
|
|
||||||
def test_set_scroll_speed_clamped_high(self, helper):
|
|
||||||
helper.frame_based_scrolling = False
|
|
||||||
helper.set_scroll_speed(10000.0)
|
|
||||||
assert helper.scroll_speed <= 500.0
|
|
||||||
|
|
||||||
def test_set_scroll_delay(self, helper):
|
|
||||||
helper.set_scroll_delay(0.05)
|
|
||||||
assert helper.scroll_delay == 0.05
|
|
||||||
|
|
||||||
def test_set_scroll_delay_clamped(self, helper):
|
|
||||||
helper.set_scroll_delay(0.0001)
|
|
||||||
assert helper.scroll_delay >= 0.001
|
|
||||||
|
|
||||||
def test_set_target_fps(self, helper):
|
|
||||||
helper.set_target_fps(60.0)
|
|
||||||
assert helper.target_fps == 60.0
|
|
||||||
|
|
||||||
def test_set_target_fps_clamped(self, helper):
|
|
||||||
helper.set_target_fps(1000.0)
|
|
||||||
assert helper.target_fps <= 200.0
|
|
||||||
|
|
||||||
def test_set_sub_pixel_scrolling(self, helper):
|
|
||||||
helper.set_sub_pixel_scrolling(True)
|
|
||||||
assert helper.sub_pixel_scrolling is True
|
|
||||||
helper.set_sub_pixel_scrolling(False)
|
|
||||||
assert helper.sub_pixel_scrolling is False
|
|
||||||
|
|
||||||
def test_set_frame_based_scrolling(self, helper):
|
|
||||||
helper.set_frame_based_scrolling(True)
|
|
||||||
assert helper.frame_based_scrolling is True
|
|
||||||
|
|
||||||
def test_set_dynamic_duration_settings(self, helper):
|
|
||||||
helper.set_dynamic_duration_settings(enabled=True, min_duration=20, max_duration=120, buffer=0.2)
|
|
||||||
assert helper.dynamic_duration_enabled is True
|
|
||||||
assert helper.min_duration == 20
|
|
||||||
assert helper.max_duration == 120
|
|
||||||
assert helper.duration_buffer == pytest.approx(0.2)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_scroll_info
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetScrollInfo:
|
|
||||||
def test_returns_dict(self, helper):
|
|
||||||
info = helper.get_scroll_info()
|
|
||||||
assert isinstance(info, dict)
|
|
||||||
|
|
||||||
def test_required_keys(self, helper):
|
|
||||||
info = helper.get_scroll_info()
|
|
||||||
for key in ("scroll_position", "total_distance_scrolled", "scroll_speed",
|
|
||||||
"scroll_complete", "dynamic_duration"):
|
|
||||||
assert key in info
|
|
||||||
|
|
||||||
def test_scroll_position_reflected(self, helper):
|
|
||||||
helper.scroll_position = 42.0
|
|
||||||
info = helper.get_scroll_info()
|
|
||||||
assert info["scroll_position"] == 42.0
|
|
||||||
@@ -1,329 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/common/utils.py
|
|
||||||
|
|
||||||
Covers all pure utility functions: normalize_team_abbreviation, format_time,
|
|
||||||
format_date, get_timezone, validate_dimensions, parse_team_abbreviation,
|
|
||||||
format_score, format_period, is_live_game, is_final_game, is_upcoming_game,
|
|
||||||
sanitize_filename, truncate_text, parse_boolean.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
import pytz
|
|
||||||
|
|
||||||
from src.common.utils import (
|
|
||||||
normalize_team_abbreviation,
|
|
||||||
format_time,
|
|
||||||
format_date,
|
|
||||||
get_timezone,
|
|
||||||
validate_dimensions,
|
|
||||||
parse_team_abbreviation,
|
|
||||||
format_score,
|
|
||||||
format_period,
|
|
||||||
is_live_game,
|
|
||||||
is_final_game,
|
|
||||||
is_upcoming_game,
|
|
||||||
sanitize_filename,
|
|
||||||
truncate_text,
|
|
||||||
parse_boolean,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# normalize_team_abbreviation
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestNormalizeTeamAbbreviation:
|
|
||||||
def test_basic_uppercase(self):
|
|
||||||
assert normalize_team_abbreviation("lal") == "LAL"
|
|
||||||
|
|
||||||
def test_strips_spaces(self):
|
|
||||||
assert normalize_team_abbreviation(" KC ") == "KC"
|
|
||||||
|
|
||||||
def test_replaces_ampersand(self):
|
|
||||||
assert normalize_team_abbreviation("TA&M") == "TAANDM"
|
|
||||||
|
|
||||||
def test_removes_internal_spaces(self):
|
|
||||||
assert normalize_team_abbreviation("A B") == "AB"
|
|
||||||
|
|
||||||
def test_removes_hyphens(self):
|
|
||||||
assert normalize_team_abbreviation("A-B") == "AB"
|
|
||||||
|
|
||||||
def test_empty_string_returns_empty(self):
|
|
||||||
assert normalize_team_abbreviation("") == ""
|
|
||||||
|
|
||||||
def test_none_returns_empty(self):
|
|
||||||
assert normalize_team_abbreviation(None) == ""
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# format_time / format_date
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestFormatTime:
|
|
||||||
def _utc_dt(self, hour=20, minute=30):
|
|
||||||
return datetime(2024, 1, 15, hour, minute, 0, tzinfo=timezone.utc)
|
|
||||||
|
|
||||||
def test_formats_utc_to_utc(self):
|
|
||||||
dt = self._utc_dt(20, 30)
|
|
||||||
result = format_time(dt, timezone_str="UTC")
|
|
||||||
# 20:30 UTC → "8:30PM" (leading zero stripped)
|
|
||||||
assert "8:30PM" in result or "8:30 PM" in result or result != ""
|
|
||||||
|
|
||||||
def test_naive_datetime_treated_as_utc(self):
|
|
||||||
dt = datetime(2024, 1, 15, 12, 0, 0) # naive
|
|
||||||
result = format_time(dt, timezone_str="UTC")
|
|
||||||
assert result != ""
|
|
||||||
|
|
||||||
def test_invalid_timezone_returns_empty(self):
|
|
||||||
dt = self._utc_dt()
|
|
||||||
result = format_time(dt, timezone_str="Invalid/TZ")
|
|
||||||
assert result == ""
|
|
||||||
|
|
||||||
def test_eastern_timezone(self):
|
|
||||||
dt = self._utc_dt(20, 0) # 8 PM UTC = 3 PM ET
|
|
||||||
result = format_time(dt, timezone_str="America/New_York")
|
|
||||||
assert result != ""
|
|
||||||
|
|
||||||
|
|
||||||
class TestFormatDate:
|
|
||||||
def test_formats_date(self):
|
|
||||||
dt = datetime(2024, 6, 15, 18, 0, 0, tzinfo=timezone.utc)
|
|
||||||
result = format_date(dt, timezone_str="UTC")
|
|
||||||
assert "June" in result or "15" in result
|
|
||||||
|
|
||||||
def test_naive_datetime(self):
|
|
||||||
dt = datetime(2024, 3, 10, 12, 0, 0)
|
|
||||||
result = format_date(dt, timezone_str="UTC")
|
|
||||||
assert result != ""
|
|
||||||
|
|
||||||
def test_invalid_timezone_returns_empty(self):
|
|
||||||
dt = datetime(2024, 6, 15, 18, 0, 0, tzinfo=timezone.utc)
|
|
||||||
result = format_date(dt, timezone_str="BadZone/Here")
|
|
||||||
assert result == ""
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_timezone
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetTimezone:
|
|
||||||
def test_valid_timezone(self):
|
|
||||||
tz = get_timezone("America/New_York")
|
|
||||||
assert tz is not None
|
|
||||||
|
|
||||||
def test_utc(self):
|
|
||||||
tz = get_timezone("UTC")
|
|
||||||
assert tz is pytz.utc or str(tz) == "UTC"
|
|
||||||
|
|
||||||
def test_invalid_returns_utc(self):
|
|
||||||
tz = get_timezone("Not/ATimezone")
|
|
||||||
assert tz is pytz.utc
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# validate_dimensions
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestValidateDimensions:
|
|
||||||
def test_valid(self):
|
|
||||||
assert validate_dimensions(64, 32) is True
|
|
||||||
|
|
||||||
def test_zero_width(self):
|
|
||||||
assert validate_dimensions(0, 32) is False
|
|
||||||
|
|
||||||
def test_zero_height(self):
|
|
||||||
assert validate_dimensions(64, 0) is False
|
|
||||||
|
|
||||||
def test_negative(self):
|
|
||||||
assert validate_dimensions(-1, 32) is False
|
|
||||||
|
|
||||||
def test_too_large(self):
|
|
||||||
assert validate_dimensions(1001, 32) is False
|
|
||||||
|
|
||||||
def test_max_valid(self):
|
|
||||||
assert validate_dimensions(1000, 1000) is True
|
|
||||||
|
|
||||||
def test_non_integer(self):
|
|
||||||
assert validate_dimensions("64", 32) is False # type: ignore[arg-type]
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# parse_team_abbreviation
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestParseTeamAbbreviation:
|
|
||||||
def test_empty_string(self):
|
|
||||||
assert parse_team_abbreviation("") == ""
|
|
||||||
|
|
||||||
def test_none_returns_empty(self):
|
|
||||||
assert parse_team_abbreviation(None) == ""
|
|
||||||
|
|
||||||
def test_extracts_uppercase(self):
|
|
||||||
result = parse_team_abbreviation("LAL")
|
|
||||||
assert result == "LAL"
|
|
||||||
|
|
||||||
def test_fallback_first_three(self):
|
|
||||||
# text without recognisable 2-4 char uppercase block
|
|
||||||
result = parse_team_abbreviation("ab")
|
|
||||||
assert len(result) <= 3
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# format_score
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestFormatScore:
|
|
||||||
def test_format_score(self):
|
|
||||||
assert format_score(14, 7) == "7-14"
|
|
||||||
|
|
||||||
def test_format_score_strings(self):
|
|
||||||
assert format_score("21", "14") == "14-21"
|
|
||||||
|
|
||||||
def test_zero_zero(self):
|
|
||||||
assert format_score(0, 0) == "0-0"
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# format_period
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestFormatPeriod:
|
|
||||||
def test_basketball_q1(self):
|
|
||||||
assert format_period(1, "basketball") == "Q1"
|
|
||||||
|
|
||||||
def test_basketball_q4(self):
|
|
||||||
assert format_period(4, "basketball") == "Q4"
|
|
||||||
|
|
||||||
def test_basketball_ot1(self):
|
|
||||||
assert format_period(5, "basketball") == "OT1"
|
|
||||||
|
|
||||||
def test_basketball_ot2(self):
|
|
||||||
assert format_period(6, "basketball") == "OT2"
|
|
||||||
|
|
||||||
def test_football_q1(self):
|
|
||||||
assert format_period(1, "football") == "Q1"
|
|
||||||
|
|
||||||
def test_football_ot(self):
|
|
||||||
assert format_period(5, "football") == "OT1"
|
|
||||||
|
|
||||||
def test_hockey_p1(self):
|
|
||||||
assert format_period(1, "hockey") == "P1"
|
|
||||||
|
|
||||||
def test_hockey_p3(self):
|
|
||||||
assert format_period(3, "hockey") == "P3"
|
|
||||||
|
|
||||||
def test_hockey_ot(self):
|
|
||||||
assert format_period(4, "hockey") == "OT1"
|
|
||||||
|
|
||||||
def test_baseball_inning(self):
|
|
||||||
assert format_period(7, "baseball") == "INN 7"
|
|
||||||
|
|
||||||
def test_unknown_sport(self):
|
|
||||||
result = format_period(2, "unknown")
|
|
||||||
assert "2" in result
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# is_live_game / is_final_game / is_upcoming_game
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGameStatusHelpers:
|
|
||||||
def test_is_live_game_true(self):
|
|
||||||
assert is_live_game("In Progress") is True
|
|
||||||
assert is_live_game("halftime") is True
|
|
||||||
assert is_live_game("overtime") is True
|
|
||||||
|
|
||||||
def test_is_live_game_false(self):
|
|
||||||
assert is_live_game("Final") is False
|
|
||||||
assert is_live_game("Scheduled") is False
|
|
||||||
|
|
||||||
def test_is_final_game_true(self):
|
|
||||||
assert is_final_game("Final") is True
|
|
||||||
assert is_final_game("COMPLETED") is True
|
|
||||||
|
|
||||||
def test_is_final_game_false(self):
|
|
||||||
assert is_final_game("In Progress") is False
|
|
||||||
|
|
||||||
def test_is_upcoming_game_true(self):
|
|
||||||
assert is_upcoming_game("Scheduled") is True
|
|
||||||
assert is_upcoming_game("upcoming") is True
|
|
||||||
|
|
||||||
def test_is_upcoming_game_false(self):
|
|
||||||
assert is_upcoming_game("Final") is False
|
|
||||||
assert is_upcoming_game("In Progress") is False
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# sanitize_filename
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestSanitizeFilename:
|
|
||||||
def test_removes_invalid_chars(self):
|
|
||||||
result = sanitize_filename('file<>:"/\\|?*.txt')
|
|
||||||
assert "<" not in result
|
|
||||||
assert ">" not in result
|
|
||||||
assert ":" not in result
|
|
||||||
|
|
||||||
def test_collapses_underscores(self):
|
|
||||||
result = sanitize_filename("file___name")
|
|
||||||
assert "__" not in result
|
|
||||||
|
|
||||||
def test_strips_leading_trailing(self):
|
|
||||||
result = sanitize_filename("_file_")
|
|
||||||
assert not result.startswith("_")
|
|
||||||
assert not result.endswith("_")
|
|
||||||
|
|
||||||
def test_normal_filename_unchanged(self):
|
|
||||||
result = sanitize_filename("my_logo")
|
|
||||||
assert result == "my_logo"
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# truncate_text
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestTruncateText:
|
|
||||||
def test_no_truncation_needed(self):
|
|
||||||
assert truncate_text("hello", 10) == "hello"
|
|
||||||
|
|
||||||
def test_truncation_adds_suffix(self):
|
|
||||||
result = truncate_text("hello world", 8)
|
|
||||||
assert result.endswith("...")
|
|
||||||
assert len(result) == 8
|
|
||||||
|
|
||||||
def test_exact_length(self):
|
|
||||||
assert truncate_text("hello", 5) == "hello"
|
|
||||||
|
|
||||||
def test_custom_suffix(self):
|
|
||||||
result = truncate_text("hello world", 8, suffix="~")
|
|
||||||
assert result.endswith("~")
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# parse_boolean
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestParseBoolean:
|
|
||||||
def test_true_bool(self):
|
|
||||||
assert parse_boolean(True) is True
|
|
||||||
|
|
||||||
def test_false_bool(self):
|
|
||||||
assert parse_boolean(False) is False
|
|
||||||
|
|
||||||
def test_int_1(self):
|
|
||||||
assert parse_boolean(1) is True
|
|
||||||
|
|
||||||
def test_int_0(self):
|
|
||||||
assert parse_boolean(0) is False
|
|
||||||
|
|
||||||
def test_string_true(self):
|
|
||||||
for val in ("true", "True", "TRUE", "1", "yes", "on", "enabled"):
|
|
||||||
assert parse_boolean(val) is True, f"Expected True for {val!r}"
|
|
||||||
|
|
||||||
def test_string_false(self):
|
|
||||||
for val in ("false", "False", "0", "no", "off", "disabled"):
|
|
||||||
assert parse_boolean(val) is False, f"Expected False for {val!r}"
|
|
||||||
|
|
||||||
def test_none_returns_false(self):
|
|
||||||
assert parse_boolean(None) is False # type: ignore[arg-type]
|
|
||||||
@@ -1,310 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for src/vegas_mode/config.py
|
|
||||||
|
|
||||||
Covers VegasModeConfig: from_config, to_dict, get_frame_interval,
|
|
||||||
is_plugin_included, get_ordered_plugins, validate, update.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from src.vegas_mode.config import VegasModeConfig
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Default construction
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestVegasModeConfigDefaults:
|
|
||||||
def test_default_disabled(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
assert cfg.enabled is False
|
|
||||||
|
|
||||||
def test_default_scroll_speed(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
assert cfg.scroll_speed == 50.0
|
|
||||||
|
|
||||||
def test_default_separator_width(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
assert cfg.separator_width == 32
|
|
||||||
|
|
||||||
def test_default_target_fps(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
assert cfg.target_fps == 125
|
|
||||||
|
|
||||||
def test_default_plugin_order_empty(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
assert cfg.plugin_order == []
|
|
||||||
|
|
||||||
def test_default_excluded_plugins_empty(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
assert len(cfg.excluded_plugins) == 0
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# from_config
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestFromConfig:
|
|
||||||
def _cfg(self, **kwargs) -> dict:
|
|
||||||
return {"display": {"vegas_scroll": kwargs}}
|
|
||||||
|
|
||||||
def test_enabled_flag(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(enabled=True))
|
|
||||||
assert cfg.enabled is True
|
|
||||||
|
|
||||||
def test_scroll_speed(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(scroll_speed=80.0))
|
|
||||||
assert cfg.scroll_speed == 80.0
|
|
||||||
|
|
||||||
def test_separator_width(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(separator_width=16))
|
|
||||||
assert cfg.separator_width == 16
|
|
||||||
|
|
||||||
def test_plugin_order(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(plugin_order=["a", "b", "c"]))
|
|
||||||
assert cfg.plugin_order == ["a", "b", "c"]
|
|
||||||
|
|
||||||
def test_excluded_plugins(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(excluded_plugins=["x", "y"]))
|
|
||||||
assert "x" in cfg.excluded_plugins
|
|
||||||
assert "y" in cfg.excluded_plugins
|
|
||||||
|
|
||||||
def test_target_fps(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(target_fps=60))
|
|
||||||
assert cfg.target_fps == 60
|
|
||||||
|
|
||||||
def test_buffer_ahead(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(buffer_ahead=3))
|
|
||||||
assert cfg.buffer_ahead == 3
|
|
||||||
|
|
||||||
def test_min_max_cycle_duration(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(min_cycle_duration=30, max_cycle_duration=120))
|
|
||||||
assert cfg.min_cycle_duration == 30
|
|
||||||
assert cfg.max_cycle_duration == 120
|
|
||||||
|
|
||||||
def test_defaults_when_missing(self):
|
|
||||||
cfg = VegasModeConfig.from_config({})
|
|
||||||
assert cfg.enabled is False
|
|
||||||
assert cfg.scroll_speed == 50.0
|
|
||||||
|
|
||||||
def test_frame_based_scrolling(self):
|
|
||||||
cfg = VegasModeConfig.from_config(self._cfg(frame_based_scrolling=False))
|
|
||||||
assert cfg.frame_based_scrolling is False
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# to_dict
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestToDict:
|
|
||||||
def test_roundtrip(self):
|
|
||||||
original = VegasModeConfig(
|
|
||||||
enabled=True,
|
|
||||||
scroll_speed=75.0,
|
|
||||||
separator_width=24,
|
|
||||||
plugin_order=["a", "b"],
|
|
||||||
excluded_plugins={"z"},
|
|
||||||
target_fps=100,
|
|
||||||
)
|
|
||||||
d = original.to_dict()
|
|
||||||
assert d["enabled"] is True
|
|
||||||
assert d["scroll_speed"] == 75.0
|
|
||||||
assert d["separator_width"] == 24
|
|
||||||
assert d["plugin_order"] == ["a", "b"]
|
|
||||||
assert "z" in d["excluded_plugins"]
|
|
||||||
assert d["target_fps"] == 100
|
|
||||||
|
|
||||||
def test_excluded_plugins_is_list(self):
|
|
||||||
cfg = VegasModeConfig(excluded_plugins={"x"})
|
|
||||||
d = cfg.to_dict()
|
|
||||||
assert isinstance(d["excluded_plugins"], list)
|
|
||||||
|
|
||||||
def test_all_keys_present(self):
|
|
||||||
d = VegasModeConfig().to_dict()
|
|
||||||
for key in ("enabled", "scroll_speed", "separator_width", "plugin_order",
|
|
||||||
"excluded_plugins", "target_fps", "buffer_ahead",
|
|
||||||
"frame_based_scrolling", "scroll_delay",
|
|
||||||
"dynamic_duration_enabled", "min_cycle_duration", "max_cycle_duration"):
|
|
||||||
assert key in d
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_frame_interval
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetFrameInterval:
|
|
||||||
def test_125fps(self):
|
|
||||||
cfg = VegasModeConfig(target_fps=125)
|
|
||||||
assert abs(cfg.get_frame_interval() - 1.0 / 125) < 1e-9
|
|
||||||
|
|
||||||
def test_60fps(self):
|
|
||||||
cfg = VegasModeConfig(target_fps=60)
|
|
||||||
assert abs(cfg.get_frame_interval() - 1.0 / 60) < 1e-6
|
|
||||||
|
|
||||||
def test_zero_fps_guarded(self):
|
|
||||||
cfg = VegasModeConfig(target_fps=0)
|
|
||||||
# Should not raise ZeroDivisionError (max(1, fps) guard)
|
|
||||||
result = cfg.get_frame_interval()
|
|
||||||
assert result == 1.0
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# is_plugin_included
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestIsPluginIncluded:
|
|
||||||
def test_not_excluded_is_included(self):
|
|
||||||
cfg = VegasModeConfig(excluded_plugins={"bad_plugin"})
|
|
||||||
assert cfg.is_plugin_included("good_plugin") is True
|
|
||||||
|
|
||||||
def test_excluded_plugin_not_included(self):
|
|
||||||
cfg = VegasModeConfig(excluded_plugins={"bad_plugin"})
|
|
||||||
assert cfg.is_plugin_included("bad_plugin") is False
|
|
||||||
|
|
||||||
def test_empty_exclusions_all_included(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
assert cfg.is_plugin_included("anything") is True
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# get_ordered_plugins
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestGetOrderedPlugins:
|
|
||||||
def test_natural_order_when_no_order_configured(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
available = ["a", "b", "c"]
|
|
||||||
result = cfg.get_ordered_plugins(available)
|
|
||||||
assert result == ["a", "b", "c"]
|
|
||||||
|
|
||||||
def test_explicit_order_followed(self):
|
|
||||||
cfg = VegasModeConfig(plugin_order=["c", "a", "b"])
|
|
||||||
available = ["a", "b", "c"]
|
|
||||||
result = cfg.get_ordered_plugins(available)
|
|
||||||
assert result == ["c", "a", "b"]
|
|
||||||
|
|
||||||
def test_unavailable_plugins_skipped(self):
|
|
||||||
cfg = VegasModeConfig(plugin_order=["c", "x", "a"])
|
|
||||||
available = ["a", "b", "c"]
|
|
||||||
result = cfg.get_ordered_plugins(available)
|
|
||||||
assert "x" not in result
|
|
||||||
assert result[:2] == ["c", "a"]
|
|
||||||
|
|
||||||
def test_excluded_plugins_removed(self):
|
|
||||||
cfg = VegasModeConfig(excluded_plugins={"b"})
|
|
||||||
available = ["a", "b", "c"]
|
|
||||||
result = cfg.get_ordered_plugins(available)
|
|
||||||
assert "b" not in result
|
|
||||||
|
|
||||||
def test_unordered_available_appended(self):
|
|
||||||
cfg = VegasModeConfig(plugin_order=["a"])
|
|
||||||
available = ["a", "b", "c"]
|
|
||||||
result = cfg.get_ordered_plugins(available)
|
|
||||||
assert result[0] == "a"
|
|
||||||
assert "b" in result
|
|
||||||
assert "c" in result
|
|
||||||
|
|
||||||
def test_empty_available(self):
|
|
||||||
cfg = VegasModeConfig(plugin_order=["a"])
|
|
||||||
result = cfg.get_ordered_plugins([])
|
|
||||||
assert result == []
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# validate
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestValidate:
|
|
||||||
def test_valid_config_no_errors(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert errors == []
|
|
||||||
|
|
||||||
def test_scroll_speed_too_low(self):
|
|
||||||
cfg = VegasModeConfig(scroll_speed=0.5)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert any("scroll_speed" in e for e in errors)
|
|
||||||
|
|
||||||
def test_scroll_speed_too_high(self):
|
|
||||||
cfg = VegasModeConfig(scroll_speed=300.0)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert any("scroll_speed" in e for e in errors)
|
|
||||||
|
|
||||||
def test_separator_width_negative(self):
|
|
||||||
cfg = VegasModeConfig(separator_width=-1)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert any("separator_width" in e for e in errors)
|
|
||||||
|
|
||||||
def test_separator_width_too_large(self):
|
|
||||||
cfg = VegasModeConfig(separator_width=200)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert any("separator_width" in e for e in errors)
|
|
||||||
|
|
||||||
def test_target_fps_too_low(self):
|
|
||||||
cfg = VegasModeConfig(target_fps=10)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert any("target_fps" in e for e in errors)
|
|
||||||
|
|
||||||
def test_target_fps_too_high(self):
|
|
||||||
cfg = VegasModeConfig(target_fps=300)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert any("target_fps" in e for e in errors)
|
|
||||||
|
|
||||||
def test_buffer_ahead_too_low(self):
|
|
||||||
cfg = VegasModeConfig(buffer_ahead=0)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert any("buffer_ahead" in e for e in errors)
|
|
||||||
|
|
||||||
def test_buffer_ahead_too_high(self):
|
|
||||||
cfg = VegasModeConfig(buffer_ahead=10)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert any("buffer_ahead" in e for e in errors)
|
|
||||||
|
|
||||||
def test_multiple_errors_returned(self):
|
|
||||||
cfg = VegasModeConfig(scroll_speed=0.1, target_fps=5)
|
|
||||||
errors = cfg.validate()
|
|
||||||
assert len(errors) >= 2
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# update
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
class TestUpdate:
|
|
||||||
def _wrap(self, **kwargs) -> dict:
|
|
||||||
return {"display": {"vegas_scroll": kwargs}}
|
|
||||||
|
|
||||||
def test_update_enabled(self):
|
|
||||||
cfg = VegasModeConfig(enabled=False)
|
|
||||||
cfg.update(self._wrap(enabled=True))
|
|
||||||
assert cfg.enabled is True
|
|
||||||
|
|
||||||
def test_update_scroll_speed(self):
|
|
||||||
cfg = VegasModeConfig(scroll_speed=50.0)
|
|
||||||
cfg.update(self._wrap(scroll_speed=90.0))
|
|
||||||
assert cfg.scroll_speed == 90.0
|
|
||||||
|
|
||||||
def test_update_separator_width(self):
|
|
||||||
cfg = VegasModeConfig(separator_width=32)
|
|
||||||
cfg.update(self._wrap(separator_width=8))
|
|
||||||
assert cfg.separator_width == 8
|
|
||||||
|
|
||||||
def test_update_plugin_order(self):
|
|
||||||
cfg = VegasModeConfig(plugin_order=[])
|
|
||||||
cfg.update(self._wrap(plugin_order=["x", "y"]))
|
|
||||||
assert cfg.plugin_order == ["x", "y"]
|
|
||||||
|
|
||||||
def test_update_excluded_plugins(self):
|
|
||||||
cfg = VegasModeConfig()
|
|
||||||
cfg.update(self._wrap(excluded_plugins=["skip_me"]))
|
|
||||||
assert "skip_me" in cfg.excluded_plugins
|
|
||||||
|
|
||||||
def test_update_ignores_missing_keys(self):
|
|
||||||
cfg = VegasModeConfig(scroll_speed=50.0)
|
|
||||||
cfg.update(self._wrap(target_fps=80)) # only fps, not speed
|
|
||||||
assert cfg.scroll_speed == 50.0
|
|
||||||
assert cfg.target_fps == 80
|
|
||||||
|
|
||||||
def test_empty_update_no_change(self):
|
|
||||||
cfg = VegasModeConfig(scroll_speed=50.0)
|
|
||||||
cfg.update({})
|
|
||||||
assert cfg.scroll_speed == 50.0
|
|
||||||
@@ -402,7 +402,9 @@ def save_schedule_config():
|
|||||||
return success_response(message='Schedule configuration saved successfully')
|
return success_response(message='Schedule configuration saved successfully')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
import logging
|
import logging
|
||||||
logger.error("Error saving schedule config", exc_info=True)
|
import traceback
|
||||||
|
error_msg = f"Error saving schedule config: {str(e)}\n{traceback.format_exc()}"
|
||||||
|
logging.error(error_msg)
|
||||||
return error_response(
|
return error_response(
|
||||||
ErrorCode.CONFIG_SAVE_FAILED,
|
ErrorCode.CONFIG_SAVE_FAILED,
|
||||||
"An error occurred; see logs for details",
|
"An error occurred; see logs for details",
|
||||||
@@ -621,7 +623,9 @@ def save_dim_schedule_config():
|
|||||||
return success_response(message='Dim schedule configuration saved successfully')
|
return success_response(message='Dim schedule configuration saved successfully')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
import logging
|
import logging
|
||||||
logger.error("Error saving dim schedule config", exc_info=True)
|
import traceback
|
||||||
|
error_msg = f"Error saving dim schedule config: {str(e)}\n{traceback.format_exc()}"
|
||||||
|
logging.error(error_msg)
|
||||||
return error_response(
|
return error_response(
|
||||||
ErrorCode.CONFIG_SAVE_FAILED,
|
ErrorCode.CONFIG_SAVE_FAILED,
|
||||||
"An error occurred; see logs for details",
|
"An error occurred; see logs for details",
|
||||||
@@ -917,7 +921,7 @@ def save_main_config():
|
|||||||
if 'properties' in schema:
|
if 'properties' in schema:
|
||||||
secret_fields = find_secret_fields(schema['properties'])
|
secret_fields = find_secret_fields(schema['properties'])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Error reading schema for secret detection: %s", e)
|
print(f"Error reading schema for secret detection: {e}")
|
||||||
|
|
||||||
# Separate secrets from regular config (same logic as save_plugin_config)
|
# Separate secrets from regular config (same logic as save_plugin_config)
|
||||||
def separate_secrets(config, secrets_set, prefix=''):
|
def separate_secrets(config, secrets_set, prefix=''):
|
||||||
@@ -955,7 +959,7 @@ def save_main_config():
|
|||||||
if 'enabled' not in regular_config:
|
if 'enabled' not in regular_config:
|
||||||
regular_config['enabled'] = True
|
regular_config['enabled'] = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Error preserving enabled state: %s", e)
|
print(f"Error preserving enabled state for {plugin_id}: {e}")
|
||||||
# Default to True on error to avoid disabling plugins
|
# Default to True on error to avoid disabling plugins
|
||||||
regular_config['enabled'] = True
|
regular_config['enabled'] = True
|
||||||
|
|
||||||
@@ -990,7 +994,7 @@ def save_main_config():
|
|||||||
plugin_instance.on_config_change(plugin_full_config)
|
plugin_instance.on_config_change(plugin_full_config)
|
||||||
except Exception as hook_err:
|
except Exception as hook_err:
|
||||||
# Don't fail the save if hook fails
|
# Don't fail the save if hook fails
|
||||||
logger.warning("on_config_change failed: %s", hook_err)
|
print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}")
|
||||||
|
|
||||||
# Remove processed plugin keys from data (they're already in current_config)
|
# Remove processed plugin keys from data (they're already in current_config)
|
||||||
for key in plugin_keys_to_remove:
|
for key in plugin_keys_to_remove:
|
||||||
@@ -1039,10 +1043,14 @@ def save_main_config():
|
|||||||
|
|
||||||
return success_response(message='Configuration saved successfully')
|
return success_response(message='Configuration saved successfully')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error saving config", exc_info=True)
|
import logging
|
||||||
|
import traceback
|
||||||
|
error_msg = f"Error saving config: {str(e)}\n{traceback.format_exc()}"
|
||||||
|
logging.error(error_msg)
|
||||||
return error_response(
|
return error_response(
|
||||||
ErrorCode.CONFIG_SAVE_FAILED,
|
ErrorCode.CONFIG_SAVE_FAILED,
|
||||||
"An error occurred; see logs for details",
|
f"Error saving configuration: {e}",
|
||||||
|
|
||||||
status_code=500
|
status_code=500
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1079,8 +1087,13 @@ def save_raw_main_config():
|
|||||||
logger.error('Invalid JSON', exc_info=True)
|
logger.error('Invalid JSON', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400
|
return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import logging
|
||||||
|
import traceback
|
||||||
from src.exceptions import ConfigError
|
from src.exceptions import ConfigError
|
||||||
logger.error("Error saving raw main config", exc_info=True)
|
|
||||||
|
# Log the full error for debugging
|
||||||
|
error_msg = f"Error saving raw main config: {str(e)}\n{traceback.format_exc()}"
|
||||||
|
logging.error(error_msg)
|
||||||
|
|
||||||
# Extract more specific error message if it's a ConfigError
|
# Extract more specific error message if it's a ConfigError
|
||||||
if isinstance(e, ConfigError):
|
if isinstance(e, ConfigError):
|
||||||
@@ -1126,8 +1139,13 @@ def save_raw_secrets_config():
|
|||||||
logger.error('Invalid JSON', exc_info=True)
|
logger.error('Invalid JSON', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400
|
return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import logging
|
||||||
|
import traceback
|
||||||
from src.exceptions import ConfigError
|
from src.exceptions import ConfigError
|
||||||
logger.error("Error saving raw secrets config", exc_info=True)
|
|
||||||
|
# Log the full error for debugging
|
||||||
|
error_msg = f"Error saving raw secrets config: {str(e)}\n{traceback.format_exc()}"
|
||||||
|
logging.error(error_msg)
|
||||||
|
|
||||||
# Extract more specific error message if it's a ConfigError
|
# Extract more specific error message if it's a ConfigError
|
||||||
if isinstance(e, ConfigError):
|
if isinstance(e, ConfigError):
|
||||||
@@ -1347,9 +1365,7 @@ def get_git_version(project_dir=None):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
version_str = result.stdout.strip()
|
return result.stdout.strip()
|
||||||
if re.match(r'^[a-zA-Z0-9._\-]+$', version_str):
|
|
||||||
return version_str
|
|
||||||
|
|
||||||
# Fallback to short commit hash
|
# Fallback to short commit hash
|
||||||
result = subprocess.run(
|
result = subprocess.run(
|
||||||
@@ -1361,9 +1377,7 @@ def get_git_version(project_dir=None):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if result.returncode == 0:
|
if result.returncode == 0:
|
||||||
version_str = result.stdout.strip()
|
return result.stdout.strip()
|
||||||
if re.match(r'^[a-zA-Z0-9._\-]+$', version_str):
|
|
||||||
return version_str
|
|
||||||
|
|
||||||
return 'Unknown'
|
return 'Unknown'
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -1458,10 +1472,12 @@ def execute_system_action():
|
|||||||
# For now, just start the display service
|
# For now, just start the display service
|
||||||
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
|
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
|
||||||
capture_output=True, text=True)
|
capture_output=True, text=True)
|
||||||
logger.info("start_display (%s) returned code %d", mode, result.returncode)
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'success' if result.returncode == 0 else 'error',
|
'status': 'success' if result.returncode == 0 else 'error',
|
||||||
'message': 'Display started' if result.returncode == 0 else 'Failed to start display',
|
'message': f'Started display in {mode} mode',
|
||||||
|
'returncode': result.returncode,
|
||||||
|
'stdout': result.stdout,
|
||||||
|
'stderr': result.stderr
|
||||||
})
|
})
|
||||||
else:
|
else:
|
||||||
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
|
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
|
||||||
@@ -1522,12 +1538,13 @@ def execute_system_action():
|
|||||||
cwd=project_dir
|
cwd=project_dir
|
||||||
)
|
)
|
||||||
if stash_result.returncode == 0:
|
if stash_result.returncode == 0:
|
||||||
logger.debug("git stash: stashed local changes before pull")
|
print(f"Stashed local changes: {stash_result.stdout}")
|
||||||
stash_info = " Local changes were stashed."
|
stash_info = " Local changes were stashed."
|
||||||
else:
|
else:
|
||||||
logger.warning("git stash failed before pull (returncode=%d)", stash_result.returncode)
|
# If stash fails, log but continue with pull
|
||||||
|
print(f"Stash failed: {stash_result.stderr}")
|
||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
logger.warning("git stash timed out, proceeding with pull")
|
print("Stash operation timed out, proceeding with pull")
|
||||||
|
|
||||||
# Perform the git pull
|
# Perform the git pull
|
||||||
result = subprocess.run(
|
result = subprocess.run(
|
||||||
@@ -1546,12 +1563,14 @@ def execute_system_action():
|
|||||||
if result.stdout and "Already up to date" not in result.stdout:
|
if result.stdout and "Already up to date" not in result.stdout:
|
||||||
pull_message = f"Code updated successfully.{stash_info}"
|
pull_message = f"Code updated successfully.{stash_info}"
|
||||||
else:
|
else:
|
||||||
logger.warning("git pull failed (returncode=%d): %s", result.returncode, result.stderr)
|
pull_message = f"Update failed: {result.stderr or 'Unknown error'}"
|
||||||
pull_message = "Update failed; check logs for details"
|
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'success' if result.returncode == 0 else 'error',
|
'status': 'success' if result.returncode == 0 else 'error',
|
||||||
'message': pull_message,
|
'message': pull_message,
|
||||||
|
'returncode': result.returncode,
|
||||||
|
'stdout': result.stdout,
|
||||||
|
'stderr': result.stderr
|
||||||
})
|
})
|
||||||
elif action == 'restart_display_service':
|
elif action == 'restart_display_service':
|
||||||
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'],
|
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'],
|
||||||
@@ -1561,12 +1580,14 @@ def execute_system_action():
|
|||||||
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'],
|
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'],
|
||||||
capture_output=True, text=True)
|
capture_output=True, text=True)
|
||||||
else:
|
else:
|
||||||
return jsonify({'status': 'error', 'message': 'Unknown action'}), 400
|
return jsonify({'status': 'error', 'message': f'Unknown action: {action}'}), 400
|
||||||
|
|
||||||
logger.info("system action '%s' returncode=%d", action, result.returncode)
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'success' if result.returncode == 0 else 'error',
|
'status': 'success' if result.returncode == 0 else 'error',
|
||||||
'message': 'Action completed' if result.returncode == 0 else 'Action failed; check logs for details',
|
'message': f'Action {action} completed',
|
||||||
|
'returncode': result.returncode,
|
||||||
|
'stdout': result.stdout,
|
||||||
|
'stderr': result.stderr
|
||||||
})
|
})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -1666,6 +1687,8 @@ def get_on_demand_status():
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_on_demand_status', exc_info=True)
|
logger.error('Error in get_on_demand_status', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -1732,11 +1755,11 @@ def start_on_demand_display():
|
|||||||
# Stop the display service first to ensure clean state when we will restart it
|
# Stop the display service first to ensure clean state when we will restart it
|
||||||
if service_was_running and start_service:
|
if service_was_running and start_service:
|
||||||
import time as time_module
|
import time as time_module
|
||||||
logger.debug("Stopping display service before starting on-demand mode")
|
print("Stopping display service before starting on-demand mode...")
|
||||||
_stop_display_service()
|
_stop_display_service()
|
||||||
# Wait a brief moment for the service to fully stop
|
# Wait a brief moment for the service to fully stop
|
||||||
time_module.sleep(1.5)
|
time_module.sleep(1.5)
|
||||||
logger.debug("Display service stopped, now starting with on-demand request")
|
print("Display service stopped, now starting with on-demand request...")
|
||||||
|
|
||||||
if not service_status.get('active') and not start_service:
|
if not service_status.get('active') and not start_service:
|
||||||
return jsonify({
|
return jsonify({
|
||||||
@@ -1769,6 +1792,8 @@ def start_on_demand_display():
|
|||||||
}
|
}
|
||||||
return jsonify({'status': 'success', 'data': response_data})
|
return jsonify({'status': 'success', 'data': response_data})
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in start_on_demand_display', exc_info=True)
|
logger.error('Error in start_on_demand_display', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -1805,6 +1830,8 @@ def stop_on_demand_display():
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in stop_on_demand_display', exc_info=True)
|
logger.error('Error in stop_on_demand_display', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -1934,6 +1961,8 @@ def get_installed_plugins():
|
|||||||
|
|
||||||
return jsonify({'status': 'success', 'data': {'plugins': plugins}})
|
return jsonify({'status': 'success', 'data': {'plugins': plugins}})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_installed_plugins', exc_info=True)
|
logger.error('Error in get_installed_plugins', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -1960,6 +1989,8 @@ def get_plugin_health():
|
|||||||
'data': health_summaries
|
'data': health_summaries
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_plugin_health', exc_info=True)
|
logger.error('Error in get_plugin_health', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -1985,6 +2016,8 @@ def get_plugin_health_single(plugin_id):
|
|||||||
'data': health_summary
|
'data': health_summary
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_plugin_health_single', exc_info=True)
|
logger.error('Error in get_plugin_health_single', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -2010,6 +2043,8 @@ def reset_plugin_health(plugin_id):
|
|||||||
'message': f'Health state reset for plugin {plugin_id}'
|
'message': f'Health state reset for plugin {plugin_id}'
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in reset_plugin_health', exc_info=True)
|
logger.error('Error in reset_plugin_health', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -2036,6 +2071,8 @@ def get_plugin_metrics():
|
|||||||
'data': metrics_summaries
|
'data': metrics_summaries
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_plugin_metrics', exc_info=True)
|
logger.error('Error in get_plugin_metrics', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -2061,6 +2098,8 @@ def get_plugin_metrics_single(plugin_id):
|
|||||||
'data': metrics_summary
|
'data': metrics_summary
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_plugin_metrics_single', exc_info=True)
|
logger.error('Error in get_plugin_metrics_single', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -2086,6 +2125,8 @@ def reset_plugin_metrics(plugin_id):
|
|||||||
'message': f'Metrics reset for plugin {plugin_id}'
|
'message': f'Metrics reset for plugin {plugin_id}'
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in reset_plugin_metrics', exc_info=True)
|
logger.error('Error in reset_plugin_metrics', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -2141,6 +2182,8 @@ def manage_plugin_limits(plugin_id):
|
|||||||
'message': f'Resource limits updated for plugin {plugin_id}'
|
'message': f'Resource limits updated for plugin {plugin_id}'
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in manage_plugin_limits', exc_info=True)
|
logger.error('Error in manage_plugin_limits', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -2185,7 +2228,7 @@ def toggle_plugin():
|
|||||||
|
|
||||||
# Check if plugin exists in manifests (discovered but may not be loaded)
|
# Check if plugin exists in manifests (discovered but may not be loaded)
|
||||||
if plugin_id not in api_v3.plugin_manager.plugin_manifests:
|
if plugin_id not in api_v3.plugin_manager.plugin_manifests:
|
||||||
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
|
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
|
||||||
|
|
||||||
# Update config (this is what the display controller reads)
|
# Update config (this is what the display controller reads)
|
||||||
config = api_v3.config_manager.load_config()
|
config = api_v3.config_manager.load_config()
|
||||||
@@ -2562,7 +2605,7 @@ def get_plugin_config():
|
|||||||
categories_from_files[category_name]['data_file'] = f'of_the_day/{filename}'
|
categories_from_files[category_name]['data_file'] = f'of_the_day/{filename}'
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Could not read json file: %s", e)
|
print(f"Warning: Could not read {json_file}: {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Update plugin_config with scanned files
|
# Update plugin_config with scanned files
|
||||||
@@ -2665,7 +2708,7 @@ def update_plugin():
|
|||||||
manifest = json.load(f)
|
manifest = json.load(f)
|
||||||
current_last_updated = manifest.get('last_updated')
|
current_last_updated = manifest.get('last_updated')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Could not read local manifest for plugin: %s", e)
|
print(f"Warning: Could not read local manifest for {plugin_id}: {e}")
|
||||||
|
|
||||||
if api_v3.plugin_store_manager:
|
if api_v3.plugin_store_manager:
|
||||||
git_info_before = api_v3.plugin_store_manager._get_local_git_info(plugin_dir)
|
git_info_before = api_v3.plugin_store_manager._get_local_git_info(plugin_dir)
|
||||||
@@ -2680,14 +2723,16 @@ def update_plugin():
|
|||||||
git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir)
|
git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir)
|
||||||
is_git_repo = git_info is not None
|
is_git_repo = git_info is not None
|
||||||
if is_git_repo:
|
if is_git_repo:
|
||||||
logger.debug("Plugin is a git repository, will update via git pull")
|
print(f"[UPDATE] Plugin {plugin_id} is a git repository, will update via git pull")
|
||||||
|
|
||||||
remote_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id, fetch_latest_from_github=True)
|
remote_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id, fetch_latest_from_github=True)
|
||||||
remote_commit = remote_info.get('last_commit_sha') if remote_info else None
|
remote_commit = remote_info.get('last_commit_sha') if remote_info else None
|
||||||
remote_branch = remote_info.get('branch') if remote_info else None
|
remote_branch = remote_info.get('branch') if remote_info else None
|
||||||
|
|
||||||
# Update the plugin
|
# Update the plugin
|
||||||
|
print(f"[UPDATE] Attempting to update plugin {plugin_id}...")
|
||||||
success = api_v3.plugin_store_manager.update_plugin(plugin_id)
|
success = api_v3.plugin_store_manager.update_plugin(plugin_id)
|
||||||
|
print(f"[UPDATE] Update result for {plugin_id}: {success}")
|
||||||
|
|
||||||
if success:
|
if success:
|
||||||
updated_last_updated = current_last_updated
|
updated_last_updated = current_last_updated
|
||||||
@@ -2698,7 +2743,7 @@ def update_plugin():
|
|||||||
manifest = json.load(f)
|
manifest = json.load(f)
|
||||||
updated_last_updated = manifest.get('last_updated', current_last_updated)
|
updated_last_updated = manifest.get('last_updated', current_last_updated)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Could not read updated manifest after update: %s", e)
|
print(f"Warning: Could not read updated manifest for {plugin_id}: {e}")
|
||||||
|
|
||||||
updated_commit = None
|
updated_commit = None
|
||||||
updated_branch = remote_branch or current_branch
|
updated_branch = remote_branch or current_branch
|
||||||
@@ -2760,41 +2805,52 @@ def update_plugin():
|
|||||||
message=message
|
message=message
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
error_msg = f'Failed to update plugin {plugin_id}'
|
||||||
plugin_path_dir = Path(api_v3.plugin_store_manager.plugins_dir) / plugin_id
|
plugin_path_dir = Path(api_v3.plugin_store_manager.plugins_dir) / plugin_id
|
||||||
if not plugin_path_dir.exists():
|
if not plugin_path_dir.exists():
|
||||||
client_msg = 'Plugin update failed: plugin not found'
|
error_msg += ': Plugin not found'
|
||||||
else:
|
else:
|
||||||
|
# Check if it's a git repo (could be installed from URL, not in registry)
|
||||||
git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir)
|
git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir)
|
||||||
if not git_info:
|
if git_info:
|
||||||
|
# It's a git repo, so update should have worked - provide generic error
|
||||||
|
error_msg += ': Update failed (check logs for details)'
|
||||||
|
else:
|
||||||
|
# Not a git repo, check if it's in registry
|
||||||
plugin_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id)
|
plugin_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id)
|
||||||
if not plugin_info:
|
if not plugin_info:
|
||||||
client_msg = 'Plugin update failed: not found in registry'
|
error_msg += ': Plugin not found in registry and not a git repository'
|
||||||
else:
|
else:
|
||||||
client_msg = 'Plugin update failed; check logs for details'
|
error_msg += ': Update failed (check logs for details)'
|
||||||
else:
|
|
||||||
client_msg = 'Plugin update failed; check logs for details'
|
|
||||||
logger.error("update_plugin failed for plugin_id=%s: %s", plugin_id, client_msg)
|
|
||||||
|
|
||||||
if api_v3.operation_history:
|
if api_v3.operation_history:
|
||||||
api_v3.operation_history.record_operation(
|
api_v3.operation_history.record_operation(
|
||||||
"update",
|
"update",
|
||||||
plugin_id=plugin_id,
|
plugin_id=plugin_id,
|
||||||
status="failed",
|
status="failed",
|
||||||
error=client_msg,
|
error=error_msg,
|
||||||
details={
|
details={
|
||||||
"previous_commit": current_commit[:7] if current_commit else None,
|
"previous_commit": current_commit[:7] if current_commit else None,
|
||||||
"branch": current_branch
|
"branch": current_branch
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
|
print(f"[UPDATE] Update failed for {plugin_id}: {error_msg}")
|
||||||
|
print(f"[UPDATE] Traceback: {error_details}")
|
||||||
|
|
||||||
return error_response(
|
return error_response(
|
||||||
ErrorCode.PLUGIN_UPDATE_FAILED,
|
ErrorCode.PLUGIN_UPDATE_FAILED,
|
||||||
client_msg,
|
error_msg,
|
||||||
status_code=500
|
status_code=500
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error("Unhandled exception in update endpoint", exc_info=True)
|
logger.error("Unhandled exception in update endpoint", exc_info=True)
|
||||||
|
print(f"[UPDATE] Traceback: {error_details}")
|
||||||
|
|
||||||
from src.web_interface.errors import WebInterfaceError
|
from src.web_interface.errors import WebInterfaceError
|
||||||
error = WebInterfaceError.from_exception(e, ErrorCode.PLUGIN_UPDATE_FAILED)
|
error = WebInterfaceError.from_exception(e, ErrorCode.PLUGIN_UPDATE_FAILED)
|
||||||
@@ -2863,7 +2919,7 @@ def uninstall_plugin():
|
|||||||
try:
|
try:
|
||||||
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
|
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
|
||||||
except Exception as cleanup_err:
|
except Exception as cleanup_err:
|
||||||
logger.warning("Failed to cleanup config after uninstall: %s", cleanup_err)
|
print(f"Warning: Failed to cleanup config for {plugin_id}: {cleanup_err}")
|
||||||
|
|
||||||
# Remove from state manager
|
# Remove from state manager
|
||||||
if api_v3.plugin_state_manager:
|
if api_v3.plugin_state_manager:
|
||||||
@@ -2878,7 +2934,7 @@ def uninstall_plugin():
|
|||||||
details={"preserve_config": preserve_config}
|
details={"preserve_config": preserve_config}
|
||||||
)
|
)
|
||||||
|
|
||||||
return {'success': True, 'message': 'Plugin uninstalled successfully'}
|
return {'success': True, 'message': f'Plugin {plugin_id} uninstalled successfully'}
|
||||||
|
|
||||||
# Enqueue operation
|
# Enqueue operation
|
||||||
operation_id = api_v3.operation_queue.enqueue_operation(
|
operation_id = api_v3.operation_queue.enqueue_operation(
|
||||||
@@ -2889,7 +2945,7 @@ def uninstall_plugin():
|
|||||||
|
|
||||||
return success_response(
|
return success_response(
|
||||||
data={'operation_id': operation_id},
|
data={'operation_id': operation_id},
|
||||||
message='Plugin uninstallation queued'
|
message=f'Plugin {plugin_id} uninstallation queued'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Fallback to direct uninstall
|
# Fallback to direct uninstall
|
||||||
@@ -2910,7 +2966,7 @@ def uninstall_plugin():
|
|||||||
try:
|
try:
|
||||||
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
|
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
|
||||||
except Exception as cleanup_err:
|
except Exception as cleanup_err:
|
||||||
logger.warning("Failed to cleanup config after uninstall: %s", cleanup_err)
|
print(f"Warning: Failed to cleanup config for {plugin_id}: {cleanup_err}")
|
||||||
|
|
||||||
# Remove from state manager
|
# Remove from state manager
|
||||||
if api_v3.plugin_state_manager:
|
if api_v3.plugin_state_manager:
|
||||||
@@ -2925,19 +2981,19 @@ def uninstall_plugin():
|
|||||||
details={"preserve_config": preserve_config}
|
details={"preserve_config": preserve_config}
|
||||||
)
|
)
|
||||||
|
|
||||||
return success_response(message='Plugin uninstalled successfully')
|
return success_response(message=f'Plugin {plugin_id} uninstalled successfully')
|
||||||
else:
|
else:
|
||||||
if api_v3.operation_history:
|
if api_v3.operation_history:
|
||||||
api_v3.operation_history.record_operation(
|
api_v3.operation_history.record_operation(
|
||||||
"uninstall",
|
"uninstall",
|
||||||
plugin_id=plugin_id,
|
plugin_id=plugin_id,
|
||||||
status="failed",
|
status="failed",
|
||||||
error='Plugin uninstall failed'
|
error=f'Failed to uninstall plugin {plugin_id}'
|
||||||
)
|
)
|
||||||
|
|
||||||
return error_response(
|
return error_response(
|
||||||
ErrorCode.PLUGIN_UNINSTALL_FAILED,
|
ErrorCode.PLUGIN_UNINSTALL_FAILED,
|
||||||
'Plugin uninstall failed',
|
f'Failed to uninstall plugin {plugin_id}',
|
||||||
status_code=500
|
status_code=500
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -2977,7 +3033,7 @@ def install_plugin():
|
|||||||
# Log the plugins directory being used for debugging
|
# Log the plugins directory being used for debugging
|
||||||
plugins_dir = api_v3.plugin_store_manager.plugins_dir
|
plugins_dir = api_v3.plugin_store_manager.plugins_dir
|
||||||
branch_info = f" (branch: {branch})" if branch else ""
|
branch_info = f" (branch: {branch})" if branch else ""
|
||||||
logger.info("Installing plugin to directory: %s", plugins_dir)
|
print(f"Installing plugin {plugin_id}{branch_info} to directory: {plugins_dir}", flush=True)
|
||||||
|
|
||||||
# Use operation queue if available
|
# Use operation queue if available
|
||||||
if api_v3.operation_queue:
|
if api_v3.operation_queue:
|
||||||
@@ -3065,7 +3121,7 @@ def install_plugin():
|
|||||||
)
|
)
|
||||||
|
|
||||||
branch_msg = f" (branch: {branch})" if branch else ""
|
branch_msg = f" (branch: {branch})" if branch else ""
|
||||||
return success_response(message=f'Plugin installed successfully{branch_msg}')
|
return success_response(message=f'Plugin {plugin_id} installed successfully{branch_msg}')
|
||||||
else:
|
else:
|
||||||
error_msg = f'Failed to install plugin {plugin_id}'
|
error_msg = f'Failed to install plugin {plugin_id}'
|
||||||
if branch:
|
if branch:
|
||||||
@@ -3090,6 +3146,8 @@ def install_plugin():
|
|||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in install_plugin', exc_info=True)
|
logger.error('Error in install_plugin', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -3145,6 +3203,8 @@ def install_plugin_from_url():
|
|||||||
}), 500
|
}), 500
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in install_plugin_from_url', exc_info=True)
|
logger.error('Error in install_plugin_from_url', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -3177,6 +3237,8 @@ def get_registry_from_url():
|
|||||||
}), 400
|
}), 400
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_registry_from_url', exc_info=True)
|
logger.error('Error in get_registry_from_url', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -3190,6 +3252,8 @@ def get_saved_repositories():
|
|||||||
repositories = api_v3.saved_repositories_manager.get_all()
|
repositories = api_v3.saved_repositories_manager.get_all()
|
||||||
return jsonify({'status': 'success', 'data': {'repositories': repositories}})
|
return jsonify({'status': 'success', 'data': {'repositories': repositories}})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_saved_repositories', exc_info=True)
|
logger.error('Error in get_saved_repositories', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -3221,6 +3285,8 @@ def add_saved_repository():
|
|||||||
'message': 'Repository already exists or failed to save'
|
'message': 'Repository already exists or failed to save'
|
||||||
}), 400
|
}), 400
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in add_saved_repository', exc_info=True)
|
logger.error('Error in add_saved_repository', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -3251,6 +3317,8 @@ def remove_saved_repository():
|
|||||||
'message': 'Repository not found'
|
'message': 'Repository not found'
|
||||||
}), 404
|
}), 404
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in remove_saved_repository', exc_info=True)
|
logger.error('Error in remove_saved_repository', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -3304,6 +3372,8 @@ def list_plugin_store():
|
|||||||
|
|
||||||
return jsonify({'status': 'success', 'data': {'plugins': formatted_plugins}})
|
return jsonify({'status': 'success', 'data': {'plugins': formatted_plugins}})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in list_plugin_store', exc_info=True)
|
logger.error('Error in list_plugin_store', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -3355,6 +3425,8 @@ def get_github_auth_status():
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_github_auth_status', exc_info=True)
|
logger.error('Error in get_github_auth_status', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -3382,6 +3454,8 @@ def refresh_plugin_store():
|
|||||||
'plugin_count': plugin_count
|
'plugin_count': plugin_count
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in refresh_plugin_store', exc_info=True)
|
logger.error('Error in refresh_plugin_store', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -4309,7 +4383,7 @@ def save_plugin_config():
|
|||||||
if 'enabled' not in plugin_config:
|
if 'enabled' not in plugin_config:
|
||||||
plugin_config['enabled'] = True
|
plugin_config['enabled'] = True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Error preserving enabled state: %s", e)
|
print(f"Error preserving enabled state: {e}")
|
||||||
# Default to True on error to avoid disabling plugins
|
# Default to True on error to avoid disabling plugins
|
||||||
plugin_config['enabled'] = True
|
plugin_config['enabled'] = True
|
||||||
|
|
||||||
@@ -4606,11 +4680,16 @@ def save_plugin_config():
|
|||||||
|
|
||||||
# Also print to console for immediate visibility
|
# Also print to console for immediate visibility
|
||||||
import json
|
import json
|
||||||
logger.warning("Config validation failed for plugin (see debug logs)")
|
print(f"[ERROR] Config validation failed for {plugin_id}")
|
||||||
|
print(f"[ERROR] Validation errors: {validation_errors}")
|
||||||
|
print(f"[ERROR] Config keys: {list(plugin_config.keys())}")
|
||||||
|
print(f"[ERROR] Schema property keys: {list(enhanced_schema.get('properties', {}).keys())}")
|
||||||
|
|
||||||
# Log raw form data if this was a form submission
|
# Log raw form data if this was a form submission
|
||||||
if 'application/json' not in (request.content_type or ''):
|
if 'application/json' not in (request.content_type or ''):
|
||||||
form_data = request.form.to_dict()
|
form_data = request.form.to_dict()
|
||||||
|
print(f"[ERROR] Raw form data: {json.dumps({k: str(v)[:200] for k, v in form_data.items()}, indent=2)}")
|
||||||
|
print(f"[ERROR] Parsed config: {json.dumps(plugin_config, indent=2, default=str)}")
|
||||||
return error_response(
|
return error_response(
|
||||||
ErrorCode.CONFIG_VALIDATION_FAILED,
|
ErrorCode.CONFIG_VALIDATION_FAILED,
|
||||||
'Configuration validation failed',
|
'Configuration validation failed',
|
||||||
@@ -4749,7 +4828,7 @@ def save_plugin_config():
|
|||||||
logging.warning(f"Lifecycle method error for {plugin_id}: {lifecycle_error}", exc_info=True)
|
logging.warning(f"Lifecycle method error for {plugin_id}: {lifecycle_error}", exc_info=True)
|
||||||
except Exception as hook_err:
|
except Exception as hook_err:
|
||||||
# Do not fail the save if hook fails; just log
|
# Do not fail the save if hook fails; just log
|
||||||
logger.warning("on_config_change failed: %s", hook_err)
|
print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}")
|
||||||
|
|
||||||
secret_count = len(secrets_config)
|
secret_count = len(secrets_config)
|
||||||
message = f'Plugin {plugin_id} configuration saved successfully'
|
message = f'Plugin {plugin_id} configuration saved successfully'
|
||||||
@@ -4817,6 +4896,8 @@ def get_plugin_schema():
|
|||||||
|
|
||||||
return jsonify({'status': 'success', 'data': {'schema': default_schema}})
|
return jsonify({'status': 'success', 'data': {'schema': default_schema}})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in get_plugin_schema', exc_info=True)
|
logger.error('Error in get_plugin_schema', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -4920,7 +5001,7 @@ def reset_plugin_config():
|
|||||||
if hasattr(plugin_instance, 'on_config_change'):
|
if hasattr(plugin_instance, 'on_config_change'):
|
||||||
plugin_instance.on_config_change(plugin_full_config)
|
plugin_instance.on_config_change(plugin_full_config)
|
||||||
except Exception as hook_err:
|
except Exception as hook_err:
|
||||||
logger.warning("on_config_change failed: %s", hook_err)
|
print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}")
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'success',
|
'status': 'success',
|
||||||
@@ -4928,6 +5009,8 @@ def reset_plugin_config():
|
|||||||
'data': {'config': defaults}
|
'data': {'config': defaults}
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in reset_plugin_config', exc_info=True)
|
logger.error('Error in reset_plugin_config', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -4965,7 +5048,7 @@ def execute_plugin_action():
|
|||||||
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
||||||
|
|
||||||
if not plugin_dir or not Path(plugin_dir).exists():
|
if not plugin_dir or not Path(plugin_dir).exists():
|
||||||
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
|
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
|
||||||
|
|
||||||
# Load manifest to get action definition
|
# Load manifest to get action definition
|
||||||
manifest_path = Path(plugin_dir) / 'manifest.json'
|
manifest_path = Path(plugin_dir) / 'manifest.json'
|
||||||
@@ -5186,7 +5269,9 @@ sys.exit(proc.returncode)
|
|||||||
'message': 'Could not generate authorization URL'
|
'message': 'Could not generate authorization URL'
|
||||||
}), 400
|
}), 400
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error executing action step 1", exc_info=True)
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
|
print(f"Error executing action step 1: {e}")
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'error',
|
'status': 'error',
|
||||||
'message': 'An error occurred; see logs for details'
|
'message': 'An error occurred; see logs for details'
|
||||||
@@ -5238,6 +5323,8 @@ sys.exit(proc.returncode)
|
|||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
return jsonify({'status': 'error', 'message': 'Action timed out'}), 408
|
return jsonify({'status': 'error', 'message': 'Action timed out'}), 408
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in execute_plugin_action', exc_info=True)
|
logger.error('Error in execute_plugin_action', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -5256,7 +5343,7 @@ def authenticate_spotify():
|
|||||||
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
||||||
|
|
||||||
if not plugin_dir or not Path(plugin_dir).exists():
|
if not plugin_dir or not Path(plugin_dir).exists():
|
||||||
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
|
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
|
||||||
|
|
||||||
auth_script = Path(plugin_dir) / 'authenticate_spotify.py'
|
auth_script = Path(plugin_dir) / 'authenticate_spotify.py'
|
||||||
if not auth_script.exists():
|
if not auth_script.exists():
|
||||||
@@ -5369,13 +5456,17 @@ sys.exit(proc.returncode)
|
|||||||
'auth_url': auth_url
|
'auth_url': auth_url
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error getting Spotify auth URL", exc_info=True)
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
|
print(f"Error getting Spotify auth URL: {e}")
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'error',
|
'status': 'error',
|
||||||
'message': 'An error occurred; see logs for details'
|
'message': 'An error occurred; see logs for details'
|
||||||
}), 500
|
}), 500
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in authenticate_spotify', exc_info=True)
|
logger.error('Error in authenticate_spotify', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -5391,7 +5482,7 @@ def authenticate_ytm():
|
|||||||
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
||||||
|
|
||||||
if not plugin_dir or not Path(plugin_dir).exists():
|
if not plugin_dir or not Path(plugin_dir).exists():
|
||||||
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
|
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
|
||||||
|
|
||||||
auth_script = Path(plugin_dir) / 'authenticate_ytm.py'
|
auth_script = Path(plugin_dir) / 'authenticate_ytm.py'
|
||||||
if not auth_script.exists():
|
if not auth_script.exists():
|
||||||
@@ -5426,6 +5517,8 @@ def authenticate_ytm():
|
|||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
return jsonify({'status': 'error', 'message': 'Authentication timed out'}), 408
|
return jsonify({'status': 'error', 'message': 'Authentication timed out'}), 408
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in authenticate_ytm', exc_info=True)
|
logger.error('Error in authenticate_ytm', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6022,6 +6115,7 @@ def upload_plugin_asset():
|
|||||||
})
|
})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
logger.error('Unhandled exception', exc_info=True)
|
logger.error('Unhandled exception', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6044,7 +6138,7 @@ def upload_of_the_day_json():
|
|||||||
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
||||||
|
|
||||||
if not plugin_dir or not Path(plugin_dir).exists():
|
if not plugin_dir or not Path(plugin_dir).exists():
|
||||||
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
|
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
|
||||||
|
|
||||||
# Setup of_the_day directory
|
# Setup of_the_day directory
|
||||||
data_dir = Path(plugin_dir) / 'of_the_day'
|
data_dir = Path(plugin_dir) / 'of_the_day'
|
||||||
@@ -6147,7 +6241,7 @@ def upload_of_the_day_json():
|
|||||||
from scripts.update_config import add_category_to_config
|
from scripts.update_config import add_category_to_config
|
||||||
add_category_to_config(category_name, f'of_the_day/{safe_filename}', display_name)
|
add_category_to_config(category_name, f'of_the_day/{safe_filename}', display_name)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Could not update config: %s", e)
|
print(f"Warning: Could not update config: {e}")
|
||||||
# Continue anyway - file is uploaded
|
# Continue anyway - file is uploaded
|
||||||
|
|
||||||
# Generate file ID (use category name as ID for simplicity)
|
# Generate file ID (use category name as ID for simplicity)
|
||||||
@@ -6172,6 +6266,7 @@ def upload_of_the_day_json():
|
|||||||
})
|
})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
logger.error('Unhandled exception', exc_info=True)
|
logger.error('Unhandled exception', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6193,7 +6288,7 @@ def delete_of_the_day_json():
|
|||||||
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
||||||
|
|
||||||
if not plugin_dir or not Path(plugin_dir).exists():
|
if not plugin_dir or not Path(plugin_dir).exists():
|
||||||
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
|
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
|
||||||
|
|
||||||
data_dir = Path(plugin_dir) / 'of_the_day'
|
data_dir = Path(plugin_dir) / 'of_the_day'
|
||||||
filename = f"{file_id}.json"
|
filename = f"{file_id}.json"
|
||||||
@@ -6211,7 +6306,7 @@ def delete_of_the_day_json():
|
|||||||
from scripts.update_config import remove_category_from_config
|
from scripts.update_config import remove_category_from_config
|
||||||
remove_category_from_config(file_id)
|
remove_category_from_config(file_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Could not update config: %s", e)
|
print(f"Warning: Could not update config: {e}")
|
||||||
|
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'success',
|
'status': 'success',
|
||||||
@@ -6219,6 +6314,7 @@ def delete_of_the_day_json():
|
|||||||
})
|
})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
logger.error('Unhandled exception', exc_info=True)
|
logger.error('Unhandled exception', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6233,7 +6329,7 @@ def serve_plugin_static(plugin_id, file_path):
|
|||||||
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
||||||
|
|
||||||
if not plugin_dir or not Path(plugin_dir).exists():
|
if not plugin_dir or not Path(plugin_dir).exists():
|
||||||
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
|
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
|
||||||
|
|
||||||
# Resolve file path (prevent directory traversal)
|
# Resolve file path (prevent directory traversal)
|
||||||
plugin_dir = Path(plugin_dir).resolve()
|
plugin_dir = Path(plugin_dir).resolve()
|
||||||
@@ -6265,6 +6361,7 @@ def serve_plugin_static(plugin_id, file_path):
|
|||||||
return Response(content, mimetype=content_type)
|
return Response(content, mimetype=content_type)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
logger.error('Unhandled exception', exc_info=True)
|
logger.error('Unhandled exception', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6323,7 +6420,7 @@ def upload_calendar_credentials():
|
|||||||
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
|
||||||
|
|
||||||
if not plugin_dir or not Path(plugin_dir).exists():
|
if not plugin_dir or not Path(plugin_dir).exists():
|
||||||
return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
|
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
|
||||||
|
|
||||||
# Save file to plugin directory
|
# Save file to plugin directory
|
||||||
credentials_path = Path(plugin_dir) / 'credentials.json'
|
credentials_path = Path(plugin_dir) / 'credentials.json'
|
||||||
@@ -6347,6 +6444,8 @@ def upload_calendar_credentials():
|
|||||||
})
|
})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in upload_calendar_credentials', exc_info=True)
|
logger.error('Error in upload_calendar_credentials', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6390,6 +6489,7 @@ def delete_plugin_asset():
|
|||||||
return jsonify({'status': 'success', 'message': 'Image deleted successfully'})
|
return jsonify({'status': 'success', 'message': 'Image deleted successfully'})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
logger.error('Unhandled exception', exc_info=True)
|
logger.error('Unhandled exception', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6418,6 +6518,7 @@ def list_plugin_assets():
|
|||||||
return jsonify({'status': 'success', 'data': {'assets': assets}})
|
return jsonify({'status': 'success', 'data': {'assets': assets}})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
logger.error('Unhandled exception', exc_info=True)
|
logger.error('Unhandled exception', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6542,7 +6643,7 @@ def scan_wifi_networks():
|
|||||||
ap_was_active = wifi_manager._is_ap_mode_active()
|
ap_was_active = wifi_manager._is_ap_mode_active()
|
||||||
|
|
||||||
# Perform the scan (this will handle AP mode disabling/enabling internally)
|
# Perform the scan (this will handle AP mode disabling/enabling internally)
|
||||||
networks, _was_cached = wifi_manager.scan_networks()
|
networks = wifi_manager.scan_networks()
|
||||||
|
|
||||||
# Convert to dict format
|
# Convert to dict format
|
||||||
networks_data = [
|
networks_data = [
|
||||||
@@ -6641,7 +6742,10 @@ def connect_wifi():
|
|||||||
'message': message or 'Failed to connect to network'
|
'message': message or 'Failed to connect to network'
|
||||||
}), 400
|
}), 400
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error connecting to WiFi", exc_info=True)
|
import logging
|
||||||
|
import traceback
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.error(f"Error connecting to WiFi: {e}\n{traceback.format_exc()}")
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'error',
|
'status': 'error',
|
||||||
'message': 'An error occurred; see logs for details'
|
'message': 'An error occurred; see logs for details'
|
||||||
@@ -6667,7 +6771,10 @@ def disconnect_wifi():
|
|||||||
'message': message or 'Failed to disconnect from network'
|
'message': message or 'Failed to disconnect from network'
|
||||||
}), 400
|
}), 400
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error disconnecting from WiFi", exc_info=True)
|
import logging
|
||||||
|
import traceback
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.error(f"Error disconnecting from WiFi: {e}\n{traceback.format_exc()}")
|
||||||
return jsonify({
|
return jsonify({
|
||||||
'status': 'error',
|
'status': 'error',
|
||||||
'message': 'An error occurred; see logs for details'
|
'message': 'An error occurred; see logs for details'
|
||||||
@@ -6680,9 +6787,7 @@ def enable_ap_mode():
|
|||||||
from src.wifi_manager import WiFiManager
|
from src.wifi_manager import WiFiManager
|
||||||
|
|
||||||
wifi_manager = WiFiManager()
|
wifi_manager = WiFiManager()
|
||||||
_force_raw = (request.get_json(silent=True) or {}).get('force', False)
|
success, message = wifi_manager.enable_ap_mode()
|
||||||
force = _force_raw is True or (isinstance(_force_raw, str) and _force_raw.lower() in ('true', '1'))
|
|
||||||
success, message = wifi_manager.enable_ap_mode(force=force)
|
|
||||||
|
|
||||||
if success:
|
if success:
|
||||||
return jsonify({
|
return jsonify({
|
||||||
@@ -6799,6 +6904,8 @@ def list_cache_files():
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in list_cache_files', exc_info=True)
|
logger.error('Error in list_cache_files', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6825,6 +6932,8 @@ def delete_cache_file():
|
|||||||
'message': f'Cache file for key "{cache_key}" deleted successfully'
|
'message': f'Cache file for key "{cache_key}" deleted successfully'
|
||||||
})
|
})
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
import traceback
|
||||||
|
error_details = traceback.format_exc()
|
||||||
logger.error('Error in delete_cache_file', exc_info=True)
|
logger.error('Error in delete_cache_file', exc_info=True)
|
||||||
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
|
||||||
|
|
||||||
@@ -6866,7 +6975,7 @@ def get_plugin_errors(plugin_id):
|
|||||||
try:
|
try:
|
||||||
aggregator = get_error_aggregator()
|
aggregator = get_error_aggregator()
|
||||||
health = aggregator.get_plugin_health(plugin_id)
|
health = aggregator.get_plugin_health(plugin_id)
|
||||||
return success_response(data=health, message="Plugin health retrieved")
|
return success_response(data=health, message=f"Plugin {plugin_id} health retrieved")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error getting plugin health for {plugin_id}: {e}", exc_info=True)
|
logger.error(f"Error getting plugin health for {plugin_id}: {e}", exc_info=True)
|
||||||
return error_response(
|
return error_response(
|
||||||
@@ -6939,8 +7048,6 @@ _BACKUP_EXPORT_DIR = PROJECT_ROOT / "config" / "backups" / "exports"
|
|||||||
def _safe_backup_path(filename: str) -> Path:
|
def _safe_backup_path(filename: str) -> Path:
|
||||||
"""Resolve a filename to an absolute path inside the export dir,
|
"""Resolve a filename to an absolute path inside the export dir,
|
||||||
rejecting any traversal attempts. Returns None if unsafe."""
|
rejecting any traversal attempts. Returns None if unsafe."""
|
||||||
# Use basename first (CodeQL-recognized sanitizer) then validate format
|
|
||||||
filename = os.path.basename(filename or '')
|
|
||||||
if not filename or not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$', filename):
|
if not filename or not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$', filename):
|
||||||
return None
|
return None
|
||||||
path = (_BACKUP_EXPORT_DIR / filename).resolve()
|
path = (_BACKUP_EXPORT_DIR / filename).resolve()
|
||||||
|
|||||||
@@ -2,8 +2,6 @@ from flask import Blueprint, render_template, flash
|
|||||||
from markupsafe import escape
|
from markupsafe import escape
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import re
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from src.web_interface.secret_helpers import mask_secret_fields
|
from src.web_interface.secret_helpers import mask_secret_fields
|
||||||
|
|
||||||
@@ -86,7 +84,7 @@ def load_partial(partial_name):
|
|||||||
elif partial_name == 'operation-history':
|
elif partial_name == 'operation-history':
|
||||||
return _load_operation_history_partial()
|
return _load_operation_history_partial()
|
||||||
else:
|
else:
|
||||||
return "Partial not found", 404
|
return f"Partial '{escape(partial_name)}' not found", 404
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Error loading partial %s", partial_name, exc_info=True)
|
logger.error("Error loading partial %s", partial_name, exc_info=True)
|
||||||
@@ -219,7 +217,7 @@ def _load_plugins_partial():
|
|||||||
plugin_info.update(fresh_manifest)
|
plugin_info.update(fresh_manifest)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# If we can't read the fresh manifest, use the cached one
|
# If we can't read the fresh manifest, use the cached one
|
||||||
logger.warning("Could not read fresh manifest for plugin: %s", plugin_id)
|
logger.warning("Could not read fresh manifest for {plugin_id}")
|
||||||
|
|
||||||
# Get enabled status from config (source of truth)
|
# Get enabled status from config (source of truth)
|
||||||
# Read from config file first, fall back to plugin instance if config doesn't have the key
|
# Read from config file first, fall back to plugin instance if config doesn't have the key
|
||||||
@@ -355,9 +353,9 @@ def _load_plugin_config_partial(plugin_id):
|
|||||||
Load plugin configuration partial - server-side rendered form.
|
Load plugin configuration partial - server-side rendered form.
|
||||||
This replaces the client-side generateConfigForm() JavaScript.
|
This replaces the client-side generateConfigForm() JavaScript.
|
||||||
"""
|
"""
|
||||||
# Sanitize with basename (CodeQL-recognized sanitizer) then regex-validate format
|
import re as _re
|
||||||
plugin_id = os.path.basename(plugin_id or '')
|
# Reject plugin IDs containing path-traversal characters before any filesystem use
|
||||||
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._\-:]*$', plugin_id):
|
if not _re.match(r'^[a-zA-Z0-9_\-.:]+$', plugin_id or ''):
|
||||||
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
|
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -368,85 +366,80 @@ def _load_plugin_config_partial(plugin_id):
|
|||||||
if plugin_id.startswith('starlark:'):
|
if plugin_id.startswith('starlark:'):
|
||||||
return _load_starlark_config_partial(plugin_id[len('starlark:'):])
|
return _load_starlark_config_partial(plugin_id[len('starlark:'):])
|
||||||
|
|
||||||
# Resolve and validate all plugin paths against the plugins base directory
|
|
||||||
_plugins_base = Path(pages_v3.plugin_manager.plugins_dir).resolve()
|
|
||||||
_plugin_dir = (_plugins_base / plugin_id).resolve()
|
|
||||||
try:
|
|
||||||
_plugin_dir.relative_to(_plugins_base)
|
|
||||||
except ValueError:
|
|
||||||
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
|
|
||||||
|
|
||||||
# Try to get plugin info first
|
# Try to get plugin info first
|
||||||
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
|
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
|
||||||
|
|
||||||
# If not found, re-discover plugins (handles plugins added after startup)
|
# If not found, re-discover plugins (handles plugins added after startup)
|
||||||
if not plugin_info:
|
if not plugin_info:
|
||||||
pages_v3.plugin_manager.discover_plugins()
|
pages_v3.plugin_manager.discover_plugins()
|
||||||
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
|
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
|
||||||
|
|
||||||
if not plugin_info:
|
if not plugin_info:
|
||||||
return '<div class="text-red-500 p-4">Plugin not found</div>', 404
|
return f'<div class="text-red-500 p-4">Plugin "{escape(plugin_id)}" not found</div>', 404
|
||||||
|
|
||||||
# Get plugin instance (may be None if not loaded)
|
# Get plugin instance (may be None if not loaded)
|
||||||
plugin_instance = pages_v3.plugin_manager.get_plugin(plugin_id)
|
plugin_instance = pages_v3.plugin_manager.get_plugin(plugin_id)
|
||||||
|
|
||||||
# Get plugin configuration from config file
|
# Get plugin configuration from config file
|
||||||
config = {}
|
config = {}
|
||||||
if pages_v3.config_manager:
|
if pages_v3.config_manager:
|
||||||
full_config = pages_v3.config_manager.load_config()
|
full_config = pages_v3.config_manager.load_config()
|
||||||
config = full_config.get(plugin_id, {})
|
config = full_config.get(plugin_id, {})
|
||||||
|
|
||||||
# Load uploaded images from metadata file if images field exists in schema
|
# Load uploaded images from metadata file if images field exists in schema
|
||||||
schema_path_temp = _plugin_dir / "config_schema.json"
|
# This ensures uploaded images appear even if config hasn't been saved yet
|
||||||
|
schema_path_temp = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "config_schema.json"
|
||||||
if schema_path_temp.exists():
|
if schema_path_temp.exists():
|
||||||
try:
|
try:
|
||||||
with open(schema_path_temp, 'r', encoding='utf-8') as f:
|
with open(schema_path_temp, 'r', encoding='utf-8') as f:
|
||||||
temp_schema = json.load(f)
|
temp_schema = json.load(f)
|
||||||
|
# Check if schema has an images field with x-widget: file-upload
|
||||||
if (temp_schema.get('properties', {}).get('images', {}).get('x-widget') == 'file-upload' or
|
if (temp_schema.get('properties', {}).get('images', {}).get('x-widget') == 'file-upload' or
|
||||||
temp_schema.get('properties', {}).get('images', {}).get('x_widget') == 'file-upload'):
|
temp_schema.get('properties', {}).get('images', {}).get('x_widget') == 'file-upload'):
|
||||||
_assets_base = (Path(__file__).parent.parent.parent / 'assets' / 'plugins').resolve()
|
# Load metadata file
|
||||||
metadata_file = (_assets_base / plugin_id / 'uploads' / '.metadata.json').resolve()
|
# Get PROJECT_ROOT relative to this file
|
||||||
try:
|
project_root = Path(__file__).parent.parent.parent
|
||||||
metadata_file.relative_to(_assets_base)
|
metadata_file = project_root / 'assets' / 'plugins' / plugin_id / 'uploads' / '.metadata.json'
|
||||||
except ValueError:
|
if metadata_file.exists():
|
||||||
metadata_file = None
|
|
||||||
if metadata_file and metadata_file.exists():
|
|
||||||
try:
|
try:
|
||||||
with open(metadata_file, 'r', encoding='utf-8') as mf:
|
with open(metadata_file, 'r', encoding='utf-8') as mf:
|
||||||
metadata = json.load(mf)
|
metadata = json.load(mf)
|
||||||
|
# Convert metadata dict to list of image objects
|
||||||
images_from_metadata = list(metadata.values())
|
images_from_metadata = list(metadata.values())
|
||||||
|
# Only use metadata images if config doesn't have images or config images is empty
|
||||||
if not config.get('images') or len(config.get('images', [])) == 0:
|
if not config.get('images') or len(config.get('images', [])) == 0:
|
||||||
config['images'] = images_from_metadata
|
config['images'] = images_from_metadata
|
||||||
else:
|
else:
|
||||||
|
# Merge: add metadata images that aren't already in config
|
||||||
config_image_ids = {img.get('id') for img in config.get('images', []) if img.get('id')}
|
config_image_ids = {img.get('id') for img in config.get('images', []) if img.get('id')}
|
||||||
new_images = [img for img in images_from_metadata if img.get('id') not in config_image_ids]
|
new_images = [img for img in images_from_metadata if img.get('id') not in config_image_ids]
|
||||||
if new_images:
|
if new_images:
|
||||||
config['images'] = config.get('images', []) + new_images
|
config['images'] = config.get('images', []) + new_images
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Could not load plugin upload metadata: %s", e)
|
logger.warning("Could not load metadata for {plugin_id}")
|
||||||
except Exception as e: # nosec B110 - metadata pre-load is optional; schema loads fully below
|
except Exception as e: # nosec B110 - metadata pre-load is optional; schema loads fully below
|
||||||
logger.debug("Metadata pre-load skipped for plugin %s: %s", plugin_id, e)
|
logger.debug("Metadata pre-load skipped for plugin %s: %s", plugin_id, e)
|
||||||
|
|
||||||
# Get plugin schema
|
# Get plugin schema
|
||||||
schema = {}
|
schema = {}
|
||||||
schema_path = _plugin_dir / "config_schema.json"
|
schema_path = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "config_schema.json"
|
||||||
if schema_path.exists():
|
if schema_path.exists():
|
||||||
try:
|
try:
|
||||||
with open(schema_path, 'r', encoding='utf-8') as f:
|
with open(schema_path, 'r', encoding='utf-8') as f:
|
||||||
schema = json.load(f)
|
schema = json.load(f)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Could not load schema for plugin: %s", e)
|
logger.warning("Could not load schema for {plugin_id}")
|
||||||
|
|
||||||
# Get web UI actions from plugin manifest
|
# Get web UI actions from plugin manifest
|
||||||
web_ui_actions = []
|
web_ui_actions = []
|
||||||
manifest_path = _plugin_dir / "manifest.json"
|
manifest_path = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "manifest.json"
|
||||||
if manifest_path.exists():
|
if manifest_path.exists():
|
||||||
try:
|
try:
|
||||||
with open(manifest_path, 'r', encoding='utf-8') as f:
|
with open(manifest_path, 'r', encoding='utf-8') as f:
|
||||||
manifest = json.load(f)
|
manifest = json.load(f)
|
||||||
web_ui_actions = manifest.get('web_ui_actions', [])
|
web_ui_actions = manifest.get('web_ui_actions', [])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Could not load manifest for plugin: %s", e)
|
logger.warning("Could not load manifest for {plugin_id}")
|
||||||
|
|
||||||
# Mask secret fields before rendering template (fail closed — never leak secrets)
|
# Mask secret fields before rendering template (fail closed — never leak secrets)
|
||||||
schema_properties = schema.get('properties') if isinstance(schema, dict) else None
|
schema_properties = schema.get('properties') if isinstance(schema, dict) else None
|
||||||
@@ -488,18 +481,13 @@ def _load_plugin_config_partial(plugin_id):
|
|||||||
|
|
||||||
def _load_starlark_config_partial(app_id):
|
def _load_starlark_config_partial(app_id):
|
||||||
"""Load configuration partial for a Starlark app."""
|
"""Load configuration partial for a Starlark app."""
|
||||||
# Sanitize with basename (CodeQL-recognized sanitizer) then regex-validate format
|
|
||||||
app_id = os.path.basename(app_id or '')
|
|
||||||
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_\-]*$', app_id):
|
|
||||||
return '<div class="text-red-500 p-4">Invalid app ID</div>', 400
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
starlark_plugin = pages_v3.plugin_manager.get_plugin('starlark-apps') if pages_v3.plugin_manager else None
|
starlark_plugin = pages_v3.plugin_manager.get_plugin('starlark-apps') if pages_v3.plugin_manager else None
|
||||||
|
|
||||||
if starlark_plugin and hasattr(starlark_plugin, 'apps'):
|
if starlark_plugin and hasattr(starlark_plugin, 'apps'):
|
||||||
app = starlark_plugin.apps.get(app_id)
|
app = starlark_plugin.apps.get(app_id)
|
||||||
if not app:
|
if not app:
|
||||||
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
|
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
|
||||||
return render_template(
|
return render_template(
|
||||||
'v3/partials/starlark_config.html',
|
'v3/partials/starlark_config.html',
|
||||||
app_id=app_id,
|
app_id=app_id,
|
||||||
@@ -515,45 +503,36 @@ def _load_starlark_config_partial(app_id):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Standalone: read from manifest file
|
# Standalone: read from manifest file
|
||||||
starlark_base = (Path(__file__).resolve().parent.parent.parent / 'starlark-apps').resolve()
|
manifest_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / 'manifest.json'
|
||||||
manifest_file = starlark_base / 'manifest.json'
|
|
||||||
if not manifest_file.exists():
|
if not manifest_file.exists():
|
||||||
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
|
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
|
||||||
|
|
||||||
with open(manifest_file, 'r') as f:
|
with open(manifest_file, 'r') as f:
|
||||||
manifest = json.load(f)
|
manifest = json.load(f)
|
||||||
|
|
||||||
app_data = manifest.get('apps', {}).get(app_id)
|
app_data = manifest.get('apps', {}).get(app_id)
|
||||||
if not app_data:
|
if not app_data:
|
||||||
return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
|
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
|
||||||
|
|
||||||
# Load schema from schema.json if it exists — validate path stays within starlark_base
|
# Load schema from schema.json if it exists
|
||||||
schema = None
|
schema = None
|
||||||
schema_file = (starlark_base / app_id / 'schema.json').resolve()
|
schema_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'schema.json'
|
||||||
try:
|
if schema_file.exists():
|
||||||
schema_file.relative_to(starlark_base)
|
|
||||||
except ValueError:
|
|
||||||
schema_file = None
|
|
||||||
if schema_file and schema_file.exists():
|
|
||||||
try:
|
try:
|
||||||
with open(schema_file, 'r') as f:
|
with open(schema_file, 'r') as f:
|
||||||
schema = json.load(f)
|
schema = json.load(f)
|
||||||
except (OSError, json.JSONDecodeError) as e:
|
except (OSError, json.JSONDecodeError) as e:
|
||||||
logger.warning("Could not load starlark schema for app: %s", e)
|
logger.warning(f"[Pages V3] Could not load schema for {app_id}: {e}", exc_info=True)
|
||||||
|
|
||||||
# Load config from config.json if it exists — validate path stays within starlark_base
|
# Load config from config.json if it exists
|
||||||
config = {}
|
config = {}
|
||||||
config_file = (starlark_base / app_id / 'config.json').resolve()
|
config_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'config.json'
|
||||||
try:
|
if config_file.exists():
|
||||||
config_file.relative_to(starlark_base)
|
|
||||||
except ValueError:
|
|
||||||
config_file = None
|
|
||||||
if config_file and config_file.exists():
|
|
||||||
try:
|
try:
|
||||||
with open(config_file, 'r') as f:
|
with open(config_file, 'r') as f:
|
||||||
config = json.load(f)
|
config = json.load(f)
|
||||||
except (OSError, json.JSONDecodeError) as e:
|
except (OSError, json.JSONDecodeError) as e:
|
||||||
logger.warning("Could not load starlark config for app: %s", e)
|
logger.warning(f"[Pages V3] Could not load config for {app_id}: {e}", exc_info=True)
|
||||||
|
|
||||||
return render_template(
|
return render_template(
|
||||||
'v3/partials/starlark_config.html',
|
'v3/partials/starlark_config.html',
|
||||||
@@ -570,5 +549,5 @@ def _load_starlark_config_partial(app_id):
|
|||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("[Pages V3] Error loading starlark config for app", exc_info=True)
|
logger.exception(f"[Pages V3] Error loading starlark config for {app_id}")
|
||||||
return '<div class="text-red-500 p-4">Error loading starlark config; see logs for details</div>', 500
|
return f'<div class="text-red-500 p-4">Error loading starlark config: {str(e)}</div>', 500
|
||||||
|
|||||||
Reference in New Issue
Block a user