LEDMatrix/src/cache_manager.py
Chuck 05b3fa56cb fix: Codacy security fixes, CVE dependency bumps, and code quality cleanup (#331)
* fix(deps): bump minimum versions to address CVEs

Pillow 10.4.0 → 12.2.0: CVE-2026-40192 (DoS via FITS decompression bomb),
CVE-2026-25990 (OOB write via PSD image), CVE-2026-42311/42308/42310

requests 2.32.0 → 2.33.0: CVE-2026-25645 (temp file security bypass),
CVE-2024-47081 (.netrc credentials leak)

werkzeug 3.0.0 → 3.1.6: CVE-2023-46136, CVE-2024-49766/49767,
CVE-2025-66221, CVE-2026-21860/27199 (DoS, path traversal, safe_join bypass)

Flask 3.0.0 → 3.1.3: CVE-2026-27205 (session data caching info disclosure)

spotipy 2.24.0 → 2.25.2: CVE-2025-27154, CVE-2025-66040

python-socketio 5.11.0 → 5.14.0: CVE-2025-61765

pytest 7.4.0 → 9.0.3: CVE-2025-71176 (insecure temp dir handling)

Updated in requirements.txt, web_interface/requirements.txt,
plugin-repos/starlark-apps/requirements.txt, and
plugin-repos/march-madness/requirements.txt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve Pylint errors in executor, data service, and odds call

Rename TimeoutError to PluginTimeoutError in plugin_executor.py to
avoid shadowing the built-in; no external callers affected.

Remove dead try/except in BackgroundDataService.shutdown: executor.shutdown()
never accepted a timeout kwarg so the try branch always raised TypeError.
Simplify to a direct shutdown(wait=wait) call.
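
For reference, a minimal sketch of the call shape (the executor here is
illustrative; shutdown's signature is the standard-library one):

    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=2)
    # shutdown() accepts wait (and cancel_futures on Python 3.9+) but no
    # timeout kwarg, so the old try branch always raised TypeError
    executor.shutdown(wait=True)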

Remove is_live kwarg from odds_manager.get_odds() call in sports.py;
BaseOddsManager.get_odds() has no such parameter. The live update interval
is already encoded in the update_interval_seconds argument passed alongside.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: MD5→SHA-256, shellcheck warnings, and broken doc links

config_service.py: replace MD5 with SHA-256 for config change detection;
same semantics (equality comparison), no stored hashes affected.
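
A minimal sketch of the detection pattern, assuming a hypothetical
compute_config_digest helper (hashlib is the real stdlib module; the
config values are illustrative):

    import hashlib

    def compute_config_digest(config_text: str) -> str:
        # SHA-256 replaces MD5; only digest equality matters here, so any
        # collision-resistant hash preserves the change-detection semantics
        return hashlib.sha256(config_text.encode('utf-8')).hexdigest()

    old_digest = compute_config_digest('{"brightness": 50}')
    new_digest = compute_config_digest('{"brightness": 75}')
    changed = old_digest != new_digest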

Shell scripts — shellcheck warnings:
- diagnose_web_interface.sh: remove useless cat (SC2002)
- dev_plugin_setup.sh: restructure A&&B||C into if/then (SC2015)
- fix_assets_permissions.sh: remove unused REAL_HOME block (SC2034)
- install_web_service.sh: remove unused USER_HOME assignment (SC2034)
- diagnose_web_ui.sh: remove unused SUDO assignments (SC2034)
- diagnose_plugin_permissions.sh: remove unused BLUE color var (SC2034)
- first_time_install.sh: remove unused CLEAR var, PACKAGE_NAME
  assignment, and replace loop variable with _ (SC2034)

docs/PLUGIN_ARCHITECTURE_SPEC.md: fix 10 broken TOC anchor links to
include section numbers matching the actual headings (MD051).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: remove unused imports and bare exception aliases (pyflakes F401/F841)

Remove unused imports across 86 files in src/, web_interface/, test/,
and scripts/ using autoflake. No logic changes — only dead import
statements and unused names in from-imports are removed.

Also remove bare exception aliases where the variable is never
referenced in the handler body (see the sketch after the list):
- src/cache/disk_cache.py: except (IOError, OSError, PermissionError) as e
- src/cache_manager.py: except (OSError, IOError, PermissionError) as perm_error
- src/plugin_system/resource_monitor.py: except Exception as e
- web_interface/app.py: except Exception as read_err
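
A minimal sketch of the alias removal (the helper name and handler body
are illustrative, not the exact code from the files above):

    import logging
    import os

    logger = logging.getLogger(__name__)

    def remove_quietly(path: str) -> None:
        try:
            os.remove(path)
        # was: except (IOError, OSError, PermissionError) as e; the alias
        # is dropped because e is never referenced in the handler body
        except (IOError, OSError, PermissionError):
            logger.debug("Could not remove %s", path)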

86 files changed, 205 lines removed, 18 pre-existing test failures unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: remove unused local variable assignments (pyflakes F841)

Dead assignments removed across src/ and web_interface/:

- background_data_service: drop future= on fire-and-forget executor.submit
- base_classes/baseball: drop font= (all rendering uses self.fonts['time'])
- base_classes/hockey: drop status_short= (never referenced after assignment)
- common/cli: drop game_helper=/config_helper= bindings in import-test block;
  the constructors are called purely to validate instantiation
- common/display_helper: drop text_width= (x_position uses display_width
  directly); drop draw= in create_error_image (uses _draw_centered_text)
- config_manager: remove dead secrets_content loading block in migration path
  (comment already noted save_config_atomic handles secrets internally)
- display_manager: drop setup_start= (timing was never completed or read)
- font_manager: drop target_path= (catalog uses font_file_path directly);
  drop face=/font= bindings in validate_font (validation by construction —
  TypeError on failure is the signal, not the return value)
- font_test_manager: drop width=/height= (draw_text uses display_manager directly)
- plugin_system/state_reconciliation: drop manager= (only config/disk/state_mgr used)
- plugin_system/store_manager: drop result= on pip install subprocess.run
  (check=True raises on failure; stdout unused; see the sketch after this list)
- web_interface/blueprints/pages_v3: drop main_config_path=""/secrets_config_path=""
  (render_template uses config_manager.get_*_path() inline)
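
To illustrate the store_manager pattern noted above (the package name is
hypothetical):

    import subprocess
    import sys

    # check=True raises CalledProcessError on a non-zero exit status, so
    # the CompletedProcess result never needs to be bound or inspected
    subprocess.run(
        [sys.executable, '-m', 'pip', 'install', 'example-package'],
        check=True,
    )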

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(js): resolve ESLint no-undef warnings across 6 JS files

Three distinct patterns:

1. Vendor library globals — htmx is injected by <script> before these
   extension files load; ESLint lints files in isolation and doesn't know.
   Fix: add /* global htmx */ to htmx-sse.js and htmx-json-enc.js.

2. Cross-file globals — showNotification is defined as window.showNotification
   in app.js/notification.js but called bare in app.js and error_handler.js.
   ESLint doesn't connect window.X = Y with a bare call to X.
   Fix: add /* global showNotification */ to app.js and error_handler.js.

3. Forward-reference window.* functions — in array-table.js, checkbox-group.js,
   and custom-feeds.js, functions like removeArrayTableRow are called early
   inside event-handler closures but assigned to window.* later in the file.
   At runtime this works (the handler fires after the assignment), but ESLint
   sees the bare name at the call site.
   Fix: change bare calls to window.removeArrayTableRow(this) etc. so the
   reference is explicit and ESLint-safe.

Also guard the updateSystemStats call in app.js reconnectSSE: the function
is called but defined nowhere in the codebase. Guard it with a typeof check
so it won't throw a ReferenceError if the reconnect path is hit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(js): resolve Biome lint warnings across 9 JS files

noUnusedVariables (catch bindings → optional catch syntax):
- app.js, file-upload.js, timezone-selector.js: } catch (e) { → } catch {
  ES2019 optional catch binding; e was unused in all three handlers

noUnusedVariables (dead assignments):
- app.js: remove const data= in display SSE stub (handler does nothing yet)
- api_client.js: remove const timeoutId= (setTimeout ID never used to cancel)
- custom-feeds.js: remove const oldIndex= (getAttribute result never read)
- schedule-picker.js: remove const compactMode= (never used in HTML build)
- select-dropdown.js: remove const icons= (icons not yet rendered in options)

noPrototypeBuiltins:
- day-selector.js: DAY_LABELS.hasOwnProperty(x) →
  Object.prototype.hasOwnProperty.call(DAY_LABELS, x)
  Safe form that works even on null-prototype objects

useIterableCallbackReturn:
- file-upload.js, notification.js: forEach(x => expr) →
  forEach(x => { expr; }) — forEach ignores return values;
  implicit return from arrow body was misleading

htmx-sse.js is a vendor extension file with old-style var/== patterns
that are correct for it; 18 Biome issues suppressed via Codacy API
rather than modifying the vendor source.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(security): escape user input in raw HTML responses in pages_v3.py

plugin_id comes directly from the URL path
(/partials/plugin-config/<plugin_id>) and was interpolated into an HTML
fragment without escaping. A crafted URL like
/partials/plugin-config/<script>alert(1)</script> would inject that
tag into the DOM via the HTMX partial response.

Fix: wrap all user-controlled values in markupsafe.escape() before
embedding in raw HTML strings. Affects the plugin-not-found 404
response and both error 500 responses in the plugin config partial.
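
A minimal sketch of the pattern (markupsafe.escape is the real API; the
fragment text is illustrative, not the exact pages_v3.py markup):

    from markupsafe import escape

    def plugin_not_found_fragment(plugin_id: str) -> str:
        # escape() HTML-encodes <, >, &, and quotes, so a plugin_id like
        # "<script>alert(1)</script>" renders as inert text in the partial
        return f"<p>Plugin not found: {escape(plugin_id)}</p>"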

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address Bandit B108/B110 across production code

B110 (try/except/pass):
- display_controller.py: narrow 'except Exception' to 'except AttributeError'
  for get_offset_frame() — plugins not having this optional method is the
  expected case, not all exceptions (see the sketch after this list)
- config_manager.py: B110 already resolved by the earlier removal of the
  dead secrets-loading block (the except/pass was inside it)
- All other except/pass blocks in src/ and web_interface/ are intentional
  (last-resort recovery, best-effort fallbacks, non-critical startup probes).
  Annotated each with # nosec B110 and a brief inline reason so the decision
  is explicit for future reviewers.
- Test files and plugin-repos B110 suppressed via Codacy API (not prod code).
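
A minimal sketch of the display_controller narrowing (function name and
fallback value are illustrative):

    def offset_frame_or_none(plugin):
        try:
            return plugin.get_offset_frame()
        except AttributeError:
            # a plugin without the optional hook is the expected case;
            # other exception types raised inside the hook still propagate
            return None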

B108 (/tmp usage):
- permission_utils.py: /tmp listed to PREVENT permission changes on it — not
  used as a temp path. Annotated # nosec B108.
- display_manager.py: fixed snapshot path is intentional (web UI reads same
  path); path-check guard also annotated.
- wifi_manager.py: named /tmp files match the sudoers allowlist installed with
  the system (the paths are hard-coded in both places by design). Annotated
  all six open/cp references # nosec B108.
- scripts/render_plugin.py: dev script default overridable by user. Annotated.
- web_interface/app.py: reads the same fixed path written by display_manager.
  Annotated # nosec B108.
- Test files suppressed via Codacy API.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address remaining Codacy security findings

Flask debug=True (real fix):
- web_interface/app.py: debug=True in __main__ block exposes the Werkzeug
  interactive debugger (arbitrary code execution). Changed to
  os.environ.get('FLASK_DEBUG', '0') == '1' — off by default, opt-in
  via environment variable for local development.
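
The resulting pattern, sketched with a bare app for clarity:

    import os
    from flask import Flask

    app = Flask(__name__)

    if __name__ == '__main__':
        # off by default; export FLASK_DEBUG=1 locally to opt in to the
        # Werkzeug interactive debugger
        app.run(debug=os.environ.get('FLASK_DEBUG', '0') == '1')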

nosec annotations (accepted risk with documented rationale):
- disk_cache.py: os.chmod(0o660) is intentional — web UI and LED matrix
  service share a group, 660 gives group write while denying world access
  (B103 + Semgrep insecure-file-permissions suppressed in Codacy)
- wifi_manager.py: urlopen to hardcoded connectivity-check.ubuntu.com URL
  (B310 — no user input involved)
- font_manager.py: urlretrieve URL comes from user's own config file on
  their local device (B310)
- start_web_conditionally.py: os.execvp with both sys.executable and a
  fixed PROJECT_DIR-relative constant (B606)

Confirmed false positives suppressed via Codacy API (15 issues):
- SSRF (3x): client-side JS fetch — SSRF is server-side; browser fetch
  is CORS-restricted to same origin
- B105 (3x): test fixtures use dummy secrets by design; store_manager
  checks for the placeholder string, it is not itself a secret
- PMD numeric literal (2x): 10000000 is within Number.MAX_SAFE_INTEGER
- Prototype pollution (1x): read-only schema traversal, no writes
- no-unsanitized_method (1x): dynamic import() is CORS-restricted
- detect-unsafe-regex (1x): operates on server-controlled config values
- plugin-repos B103 (1x): vendor code chmod on executable
- Semgrep insecure-file-permissions (3x): same disk_cache 0o660 as above

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: remove unnecessary f prefix from f-strings without placeholders (F541)

Pyflakes F541 flags f-strings that contain no {} interpolation — they are
identical to plain strings but trigger unnecessary string formatting overhead.

Fixed in production code:
- src/base_classes/data_sources.py (2 debug log calls)
- src/logo_downloader.py (1 error log)
- src/plugin_system/store_manager.py (5 strings across 3 log calls)
- src/web_interface/validators.py (1 return value)
- src/wifi_manager.py (4 log/message strings)
- web_interface/start.py (1 print)

F541 issues in test/, scripts/, and plugin-repos/ suppressed via Codacy API
as non-production code.
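
Before/after shape of the fix (log message illustrative):

    import logging

    logger = logging.getLogger(__name__)
    logger.debug(f"Starting store sync")  # F541: f prefix, no placeholders
    logger.debug("Starting store sync")   # identical output as a plain string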

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore(dev): add Pillow compatibility smoke test script

Covers all Pillow APIs used in LEDMatrix — image creation, drawing,
font metrics, LANCZOS resampling, paste/alpha_composite, and PNG I/O.
Run after any Pillow version bump to catch regressions before deploy.

    python3 scripts/dev/test_pillow_compat.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve 8 new Codacy issues introduced by PR changes

shellcheck SC2034:
- first_time_install.sh: 'type' loop variable also unused in the wifi
  status loop (we previously fixed 'device' → '_' but left 'type').
  Changed to '_ _ state' since neither device nor type is referenced.

ESLint no-undef:
- app.js: typeof guards don't satisfy no-undef; added updateSystemStats
  to the /* global */ declaration alongside showNotification.

nosec annotation:
- web_interface/app.py: app.run(host='0.0.0.0') line changed when we
  fixed debug=True, giving it a new issue ID. Re-added # nosec B104.

pyflakes F401:
- scripts/dev/test_pillow_compat.py: ImageFilter was imported but never
  used in the smoke test. Removed from the import.

Codacy API suppressions (false positives on changed lines):
- disk_cache.py 0o660 chmod (2x): lines changed when # nosec B103 was
  added, producing new Semgrep issue IDs. Re-suppressed.
- pages_v3.py raw-html-concat: Semgrep does not recognise escape() as
  a sanitizer; the escape() call IS the correct fix.
- app.py flask 0.0.0.0: same line as B104 above; Semgrep rule also
  re-suppressed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address PR review findings

Fix (10 of 15 findings):

plugin-repos/march-madness/requirements.txt:
  Add urllib3>=1.26.0 — manager.py directly imports from urllib3; it was
  an undeclared transitive dependency via requests.

scripts/dev/dev_plugin_setup.sh:
  Restore subshell form (cd "$target_dir" && git pull --rebase) || true
  so the shell's working directory is not permanently changed after the
  if-cd block. Previous fix for SC2015 leaked cwd into the remainder of
  the script.

src/base_classes/sports.py:
  Narrow 'except Exception' to 'except RuntimeError as e' and log via
  self.logger.debug — Path.home() raises only RuntimeError for service
  users; other exceptions should not be silently swallowed.

src/config_service.py:
  Fix stale "MD5 checksum" in ConfigVersion.__init__ docstring (line 40);
  the implementation uses SHA-256 since the Codacy fix.

src/wifi_manager.py:
  Log the last-resort AP enable failure with exc_info=True instead of
  silently passing — failure here means the device may be unreachable.

web_interface/blueprints/pages_v3.py:
  Log the outer metadata pre-load exception at debug level instead of
  swallowing it silently; schema still loads fully below.

src/background_data_service.py:
  Remove unused 'timeout' parameter from shutdown() — executor.shutdown()
  does not accept timeout; update __del__ caller accordingly.

src/font_manager.py:
  Validate URL scheme before urlretrieve — reject non-http/https schemes
  (e.g. file://) to prevent reading local files from config-supplied URLs.
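
A sketch of the guard (urlparse is stdlib; the helper name is illustrative):

    from urllib.parse import urlparse

    def require_http_url(url: str) -> str:
        # reject file:// and other schemes before handing the URL to
        # urlretrieve, so a config-supplied value cannot read local files
        if urlparse(url).scheme not in ('http', 'https'):
            raise ValueError(f"Refusing non-HTTP(S) font URL: {url}")
        return url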

src/plugin_system/plugin_executor.py:
  Simplify redundant except tuple: (PluginTimeoutError, PluginError,
  Exception) → Exception, which already covers the others.
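
The simplified shape (wrapper name and fallback are illustrative):

    def run_safely(execute_plugin):
        try:
            return execute_plugin()
        # was: except (PluginTimeoutError, PluginError, Exception);
        # Exception already covers both subclasses, so the tuple was redundant
        except Exception:
            return None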

test/test_display_controller.py:
  Mark empty test_plugin_discovery_and_loading as @pytest.mark.skip with
  reason. Move duplicate 'from datetime import datetime' to module header
  and remove the stray mid-module copy.

Skip (5 of 15 findings, with reasons):
  - pytest 9.0.3 concerns: full suite already verified (467 pass, 18 pre-existing)
  - Pillow 12.2.0 API concerns: no deprecated APIs in codebase; tests + Pi smoke test pass
  - diagnose_web_ui.sh sudo validation: set -e already ensures fail-fast on any sudo failure
  - app.py request-logging except: must stay silent (recursive logging risk); annotated
  - app.py SSE file-read except: genuinely transient I/O; annotated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 10:19:55 -04:00

919 lines
40 KiB
Python

import json
import os
import time
from datetime import datetime
import pytz
from typing import Any, Dict, List, Optional
import logging
import threading
import tempfile

from src.exceptions import CacheError
from src.cache.memory_cache import MemoryCache
from src.cache.disk_cache import DiskCache
from src.cache.cache_strategy import CacheStrategy
from src.cache.cache_metrics import CacheMetrics
from src.logging_config import get_logger


class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)


class CacheManager:
    """Manages caching of API responses to reduce API calls."""

    def __init__(self) -> None:
        # Initialize logger first
        self.logger: logging.Logger = get_logger(__name__)

        # Determine the most reliable writable directory
        self.cache_dir: Optional[str] = self._get_writable_cache_dir()
        if self.cache_dir:
            self.logger.info(f"Using cache directory: {self.cache_dir}")
        else:
            # This is a critical failure, as caching is essential.
            self.logger.error("Could not find or create a writable cache directory. Caching will be disabled.")
            self.cache_dir = None

        # Initialize config manager for sport-specific intervals
        try:
            from src.config_manager import ConfigManager
            self.config_manager: Optional[Any] = ConfigManager()
            self.config_manager.load_config()
        except ImportError:
            self.config_manager = None
            self.logger.warning("ConfigManager not available, using default cache intervals")

        # Initialize cache components using composition
        self._memory_cache_component = MemoryCache(max_size=1000, cleanup_interval=300.0)
        self._disk_cache_component = DiskCache(cache_dir=self.cache_dir, logger=self.logger)
        self._strategy_component = CacheStrategy(config_manager=self.config_manager, logger=self.logger)
        self._metrics_component = CacheMetrics(logger=self.logger)

        # Keep old attributes for backward compatibility (delegated to components)
        self._memory_cache = self._memory_cache_component._cache
        self._memory_cache_timestamps = self._memory_cache_component._timestamps
        self._cache_lock = self._memory_cache_component._lock
        self._max_memory_cache_size = self._memory_cache_component._max_size
        self._memory_cache_cleanup_interval = self._memory_cache_component._cleanup_interval
        self._last_memory_cache_cleanup = self._memory_cache_component._last_cleanup

        # Disk cleanup configuration
        self._disk_cleanup_interval_hours = 24  # Run cleanup every 24 hours
        self._disk_cleanup_interval = 3600.0  # Minimum interval between cleanups (1 hour) for throttle
        self._last_disk_cleanup = 0.0  # Timestamp of last disk cleanup
        self._cleanup_thread: Optional[threading.Thread] = None
        self._cleanup_stop_event = threading.Event()  # Event to signal thread shutdown
        self._retention_policies = {
            'odds': 2,               # Odds data: 2 days (lines move frequently)
            'odds_live': 2,          # Live odds: 2 days
            'sports_live': 7,        # Live sports: 7 days
            'weather_current': 7,    # Current weather: 7 days
            'sports_recent': 7,      # Recent games: 7 days
            'news': 14,              # News: 14 days
            'sports_upcoming': 60,   # Upcoming games: 60 days (schedules stable)
            'sports_schedules': 60,  # Schedules: 60 days
            'team_info': 60,         # Team info: 60 days
            'stocks': 14,            # Stock data: 14 days
            'crypto': 14,            # Crypto data: 14 days
            'default': 30            # Default: 30 days
        }

        # Start background cleanup thread only if disk caching is enabled
        if self.cache_dir:
            self.start_cleanup_thread()

    def _get_writable_cache_dir(self) -> Optional[str]:
        """Tries to find or create a writable cache directory, preferring a system path when available."""
        # Attempt 1: System-wide persistent cache directory (preferred for services)
        try:
            system_cache_dir = '/var/cache/ledmatrix'
            if os.path.exists(system_cache_dir):
                test_file = os.path.join(system_cache_dir, '.writetest')
                try:
                    with open(test_file, 'w') as f:
                        f.write('test')
                    os.remove(test_file)
                    self.logger.info(f"Using system cache directory: {system_cache_dir}")
                    return system_cache_dir
                except (IOError, OSError):
                    self.logger.debug(f"System cache directory exists but is not writable: {system_cache_dir}")
            else:
                from pathlib import Path
                from src.common.permission_utils import (
                    ensure_directory_permissions,
                    get_cache_dir_mode
                )
                try:
                    ensure_directory_permissions(Path(system_cache_dir), get_cache_dir_mode())
                    if os.access(system_cache_dir, os.W_OK):
                        self.logger.info(f"Using system cache directory: {system_cache_dir}")
                        return system_cache_dir
                except (OSError, IOError, PermissionError):
                    # Permission errors are expected when running as non-root
                    self.logger.debug(f"Could not create system cache directory (permission denied): {system_cache_dir}")
        except (OSError, IOError, PermissionError) as e:
            # Permission errors are expected when running as non-root, log at DEBUG level
            self.logger.debug(f"System cache directory not available: {e}")

        # Attempt 2: User's home directory (handling sudo), but avoid /root preference
        try:
            real_user = os.environ.get('SUDO_USER') or os.environ.get('USER', 'default')
            if real_user and real_user != 'root':
                home_dir = os.path.expanduser(f"~{real_user}")
            else:
                # When running as root and /var/cache/ledmatrix failed, still allow fallback to /root
                home_dir = os.path.expanduser('~')
            user_cache_dir = os.path.join(home_dir, '.ledmatrix_cache')
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            ensure_directory_permissions(Path(user_cache_dir), get_cache_dir_mode())
            test_file = os.path.join(user_cache_dir, '.writetest')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
            self.logger.info(f"Using user cache directory: {user_cache_dir}")
            return user_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use user-specific cache directory: {e}")

        # Attempt 3: /opt/ledmatrix/cache (alternative persistent location)
        try:
            opt_cache_dir = '/opt/ledmatrix/cache'
            # Check if directory exists and we can write to it
            if os.path.exists(opt_cache_dir):
                # Test if we can write to the existing directory
                test_file = os.path.join(opt_cache_dir, '.writetest')
                try:
                    with open(test_file, 'w') as f:
                        f.write('test')
                    os.remove(test_file)
                    return opt_cache_dir
                except (IOError, OSError):
                    self.logger.warning(f"Directory exists but is not writable: {opt_cache_dir}")
            else:
                # Try to create the directory
                from pathlib import Path
                from src.common.permission_utils import (
                    ensure_directory_permissions,
                    get_cache_dir_mode
                )
                ensure_directory_permissions(Path(opt_cache_dir), get_cache_dir_mode())
                if os.access(opt_cache_dir, os.W_OK):
                    return opt_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use /opt/ledmatrix/cache: {e}", exc_info=True)

        # Attempt 4: System-wide temporary directory (fallback, not persistent)
        try:
            temp_cache_dir = os.path.join(tempfile.gettempdir(), 'ledmatrix_cache')
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            ensure_directory_permissions(Path(temp_cache_dir), get_cache_dir_mode())
            if os.access(temp_cache_dir, os.W_OK):
                self.logger.warning("Using temporary cache directory - cache will NOT persist across restarts")
                return temp_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use system-wide temporary cache directory: {e}", exc_info=True)

        # Return None if no directory is writable
        return None

    def _cleanup_memory_cache(self, force: bool = False) -> int:
        """
        Clean up expired entries from memory cache and enforce size limits.

        Args:
            force: If True, perform cleanup regardless of time interval

        Returns:
            Number of entries removed
        """
        now = time.time()

        # Check if cleanup is needed
        if not force and (now - self._last_memory_cache_cleanup) < self._memory_cache_cleanup_interval:
            return 0

        with self._cache_lock:
            removed_count = 0
            current_time = time.time()

            # Remove expired entries (entries older than 1 hour without access are considered expired)
            # We use a conservative TTL of 1 hour for cleanup
            max_age_for_cleanup = 3600  # 1 hour
            expired_keys = []
            for key, timestamp in list(self._memory_cache_timestamps.items()):
                if isinstance(timestamp, str):
                    try:
                        timestamp = float(timestamp)
                    except ValueError:
                        timestamp = None
                if timestamp is None or (current_time - timestamp) > max_age_for_cleanup:
                    expired_keys.append(key)

            # Remove expired entries
            for key in expired_keys:
                self._memory_cache.pop(key, None)
                self._memory_cache_timestamps.pop(key, None)
                removed_count += 1

            # Enforce size limit by removing oldest entries if cache is too large
            if len(self._memory_cache) > self._max_memory_cache_size:
                # Sort by timestamp (oldest first)
                sorted_entries = sorted(
                    self._memory_cache_timestamps.items(),
                    key=lambda x: float(x[1]) if isinstance(x[1], (int, float)) else 0
                )
                # Remove oldest entries until we're under the limit
                excess_count = len(self._memory_cache) - self._max_memory_cache_size
                for i in range(excess_count):
                    if i < len(sorted_entries):
                        key = sorted_entries[i][0]
                        self._memory_cache.pop(key, None)
                        self._memory_cache_timestamps.pop(key, None)
                        removed_count += 1

            self._last_memory_cache_cleanup = current_time

            if removed_count > 0:
                self.logger.debug(f"Memory cache cleanup: removed {removed_count} entries (current size: {len(self._memory_cache)})")

            return removed_count

    def _get_cache_path(self, key: str) -> Optional[str]:
        """Get the path for a cache file."""
        return self._disk_cache_component.get_cache_path(key)

    def get_cached_data(self, key: str, max_age: int = 300, memory_ttl: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """Get data from cache (memory first, then disk) honoring TTLs.

        - memory_ttl: TTL for in-memory entry; defaults to max_age if not provided
        - max_age: TTL for persisted (on-disk) entry based on the stored timestamp
        """
        # Periodic cleanup of memory cache
        self._cleanup_memory_cache()

        in_memory_ttl = memory_ttl if memory_ttl is not None else max_age

        # 1) Memory cache
        cached = self._memory_cache_component.get(key, max_age=in_memory_ttl)
        if cached is not None:
            return cached

        # 2) Disk cache
        record = self._disk_cache_component.get(key, max_age=max_age)
        if record is not None:
            # Hydrate memory cache (use current time to start memory TTL window)
            self._memory_cache_component.set(key, record)
            return record

        # 3) Miss
        return None

    def save_cache(self, key: str, data: Dict[str, Any]) -> None:
        """
        Save data to cache.

        Args:
            key: Cache key
            data: Data to cache
        """
        # Periodic cleanup before adding new entries
        self._cleanup_memory_cache()

        # Update memory cache first
        self._memory_cache_component.set(key, data)

        # Save to disk cache
        try:
            self._disk_cache_component.set(key, data)
        except CacheError:
            # Disk cache errors are already logged and raised by DiskCache
            raise

    def load_cache(self, key: str) -> Optional[Dict[str, Any]]:
        """Load data from cache with memory caching."""
        # Check memory cache first (1 minute TTL)
        cached = self._memory_cache_component.get(key, max_age=60)
        if cached is not None:
            return cached

        # Check disk cache
        data = self._disk_cache_component.get(key, max_age=3600)  # 1 hour for load_cache
        if data is not None:
            # Update memory cache
            self._memory_cache_component.set(key, data)
            return data
        return None

    def clear_cache(self, key: Optional[str] = None) -> None:
        """Clear cache entries.

        Pass a non-empty ``key`` to remove a single entry, or pass
        ``None`` (the default) to clear every cached entry. An empty
        string is rejected to prevent accidental whole-cache wipes
        from callers that pass through unvalidated input.
        """
        if key is None:
            # Clear all keys
            memory_count = self._memory_cache_component.size()
            self._memory_cache_component.clear()
            self._disk_cache_component.clear()
            self.logger.info("Cleared all cache: %d memory entries", memory_count)
            return

        if not isinstance(key, str) or not key:
            raise ValueError(
                "clear_cache(key) requires a non-empty string; "
                "pass key=None to clear all entries"
            )

        # Clear specific key
        self._memory_cache_component.clear(key)
        self._disk_cache_component.clear(key)
        self.logger.info("Cleared cache for key: %s", key)

    def delete(self, key: str) -> None:
        """Remove a single cache entry.

        Thin wrapper around :meth:`clear_cache` that **requires** a
        non-empty string key — unlike ``clear_cache(None)`` it never
        wipes every entry. Raises ``ValueError`` on ``None`` or an
        empty string.
        """
        if key is None or not isinstance(key, str) or not key:
            raise ValueError("delete(key) requires a non-empty string key")
        self.clear_cache(key)

    def list_cache_files(self) -> List[Dict[str, Any]]:
        """List all cache files with metadata (key, age, size, path).

        Returns:
            List of dicts with keys: 'key', 'filename', 'age_seconds', 'age_display',
            'size_bytes', 'size_display', 'path', 'modified_time'
        """
        if not self.cache_dir or not os.path.exists(self.cache_dir):
            return []

        cache_files = []
        current_time = time.time()

        try:
            with self._cache_lock:
                for filename in os.listdir(self.cache_dir):
                    if not filename.endswith('.json'):
                        continue

                    # Extract key from filename (remove .json extension)
                    key = filename[:-5]  # Remove '.json'
                    file_path = os.path.join(self.cache_dir, filename)

                    try:
                        # Get file stats
                        stat_info = os.stat(file_path)
                        size_bytes = stat_info.st_size
                        modified_time = stat_info.st_mtime
                        age_seconds = current_time - modified_time

                        # Format age display
                        if age_seconds < 60:
                            age_display = f"{int(age_seconds)}s"
                        elif age_seconds < 3600:
                            age_display = f"{int(age_seconds / 60)}m"
                        elif age_seconds < 86400:
                            age_display = f"{int(age_seconds / 3600)}h"
                        else:
                            age_display = f"{int(age_seconds / 86400)}d"

                        # Format size display
                        if size_bytes < 1024:
                            size_display = f"{size_bytes}B"
                        elif size_bytes < 1024 * 1024:
                            size_display = f"{size_bytes / 1024:.1f}KB"
                        else:
                            size_display = f"{size_bytes / (1024 * 1024):.1f}MB"

                        cache_files.append({
                            'key': key,
                            'filename': filename,
                            'age_seconds': age_seconds,
                            'age_display': age_display,
                            'size_bytes': size_bytes,
                            'size_display': size_display,
                            'path': file_path,
                            'modified_time': modified_time,
                            'modified_datetime': datetime.fromtimestamp(modified_time).isoformat()
                        })
                    except OSError as e:
                        self.logger.warning(f"Error getting stats for cache file {filename} at {file_path}: {e}", exc_info=True)
                        continue
        except OSError as e:
            self.logger.error(f"Error listing cache directory {self.cache_dir}: {e}", exc_info=True)
            return []

        # Sort by modified time (newest first)
        cache_files.sort(key=lambda x: x['modified_time'], reverse=True)
        return cache_files

    def get_cache_dir(self) -> Optional[str]:
        """Get the cache directory path."""
        return self.cache_dir

    def has_data_changed(self, data_type: str, new_data: Dict[str, Any]) -> bool:
        """Check if data has changed from cached version."""
        cached_data = self.load_cache(data_type)
        if not cached_data:
            return True

        if data_type == 'weather':
            return self._has_weather_changed(cached_data, new_data)
        elif data_type == 'stocks':
            return self._has_stocks_changed(cached_data, new_data)
        elif data_type == 'stock_news':
            return self._has_news_changed(cached_data, new_data)
        elif data_type == 'nhl':
            return self._has_nhl_changed(cached_data, new_data)
        elif data_type == 'mlb':
            return self._has_mlb_changed(cached_data, new_data)
        return True

    def _has_weather_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if weather data has changed."""
        # Handle new cache structure where data is nested under 'data' key
        if 'data' in cached:
            cached = cached['data']

        # Handle case where cached data might be the weather data directly
        if 'current' in cached:
            # This is the new structure with 'current' and 'forecast' keys
            current_weather = cached.get('current', {})
            if current_weather and 'main' in current_weather and 'weather' in current_weather:
                cached_temp = round(current_weather['main']['temp'])
                cached_condition = current_weather['weather'][0]['main']
                return (cached_temp != new.get('temp') or
                        cached_condition != new.get('condition'))

        # Handle old structure where temp and condition are directly accessible
        return (cached.get('temp') != new.get('temp') or
                cached.get('condition') != new.get('condition'))

    def _has_stocks_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if stock data has changed."""
        if not self._is_market_open():
            return False
        return cached.get('price') != new.get('price')

    def _has_news_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if news data has changed."""
        # Handle both dictionary and list formats
        if isinstance(new, list):
            # If new data is a list, cached data should also be a list
            if not isinstance(cached, list):
                return True
            # Compare lengths and content
            if len(cached) != len(new):
                return True
            # Compare titles since they're unique enough for our purposes
            cached_titles = set(item.get('title', '') for item in cached)
            new_titles = set(item.get('title', '') for item in new)
            return cached_titles != new_titles
        else:
            # Original dictionary format handling
            cached_headlines = set(h.get('id') for h in cached.get('headlines', []))
            new_headlines = set(h.get('id') for h in new.get('headlines', []))
            return not cached_headlines.issuperset(new_headlines)

    def _has_nhl_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if NHL data has changed."""
        return (cached.get('game_status') != new.get('game_status') or
                cached.get('score') != new.get('score'))

    def _has_mlb_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if MLB game data has changed."""
        if not cached or not new:
            return True

        # Check if any games have changed status or score
        for game_id, new_game in new.items():
            cached_game = cached.get(game_id)
            if not cached_game:
                return True

            # Check for score changes
            if (new_game['away_score'] != cached_game['away_score'] or
                    new_game['home_score'] != cached_game['home_score']):
                return True

            # Check for status changes
            if new_game['status'] != cached_game['status']:
                return True

            # For live games, check inning and count
            if new_game['status'] == 'in':
                if (new_game['inning'] != cached_game['inning'] or
                        new_game['inning_half'] != cached_game['inning_half'] or
                        new_game['balls'] != cached_game['balls'] or
                        new_game['strikes'] != cached_game['strikes'] or
                        new_game['bases_occupied'] != cached_game['bases_occupied']):
                    return True

        return False

    def _is_market_open(self) -> bool:
        """Check if the US stock market is currently open."""
        return self._strategy_component.is_market_open()

    def update_cache(self, data_type: str, data: Dict[str, Any]) -> None:
        """Update cache with new data."""
        cache_data = {
            'data': data,
            'timestamp': time.time()
        }
        # save_cache returns None (and raises CacheError on disk failure),
        # so there is no boolean result to propagate here.
        self.save_cache(data_type, cache_data)

    def get(self, key: str, max_age: int = 300) -> Optional[Dict[str, Any]]:
        """Get data from cache if it exists and is not stale."""
        cached_data = self.get_cached_data(key, max_age)
        if cached_data and 'data' in cached_data:
            return cached_data['data']
        return cached_data

    def set(self, key: str, data: Dict[str, Any], ttl: Optional[int] = None) -> None:
        """
        Store data in cache with current timestamp.

        Args:
            key: Cache key
            data: Data to cache
            ttl: Optional time-to-live in seconds (stored for compatibility but
                expiration is still controlled via max_age when reading)
        """
        cache_data = {
            'data': data,
            'timestamp': time.time()
        }
        if ttl is not None:
            cache_data['ttl'] = ttl
        self.save_cache(key, cache_data)

    def setup_persistent_cache(self) -> bool:
        """
        Set up a persistent cache directory with proper permissions.
        This should be run once with sudo to create the directory.
        """
        try:
            # Try to create /var/cache/ledmatrix with proper permissions
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            cache_dir = '/var/cache/ledmatrix'
            cache_dir_path = Path(cache_dir)
            ensure_directory_permissions(cache_dir_path, get_cache_dir_mode())

            # Set ownership to the real user (not root)
            real_user = os.environ.get('SUDO_USER')
            if real_user:
                import pwd
                try:
                    uid = pwd.getpwnam(real_user).pw_uid
                    gid = pwd.getpwnam(real_user).pw_gid
                    os.chown(cache_dir, uid, gid)
                    self.logger.info(f"Set ownership of {cache_dir} to {real_user}")
                except (OSError, KeyError) as e:
                    self.logger.warning(f"Could not set ownership for {cache_dir}: {e}", exc_info=True)

            self.logger.info(f"Successfully set up persistent cache directory: {cache_dir}")
            return True
        except (OSError, IOError, PermissionError) as e:
            self.logger.error(f"Failed to set up persistent cache directory {cache_dir}: {e}", exc_info=True)
            return False

    def cleanup_disk_cache(self, force: bool = False) -> Dict[str, Any]:
        """
        Clean up expired disk cache files based on retention policies.

        Args:
            force: If True, run cleanup regardless of last cleanup time

        Returns:
            Dictionary with cleanup statistics
        """
        now = time.time()

        # Check if cleanup is needed (throttle to prevent too-frequent cleanups)
        if not force and (now - self._last_disk_cleanup) < self._disk_cleanup_interval:
            return {
                'files_scanned': 0,
                'files_deleted': 0,
                'space_freed_mb': 0.0,
                'errors': 0,
                'duration_sec': 0.0
            }

        start_time = time.time()
        try:
            # Perform cleanup
            stats = self._disk_cache_component.cleanup_expired_files(
                cache_strategy=self._strategy_component,
                retention_policies=self._retention_policies
            )
            duration = time.time() - start_time
            space_freed_mb = stats['space_freed_bytes'] / (1024 * 1024)

            # Record metrics
            self._metrics_component.record_disk_cleanup(
                files_cleaned=stats['files_deleted'],
                space_freed_mb=space_freed_mb,
                duration_sec=duration
            )

            # Log summary
            if stats['files_deleted'] > 0:
                self.logger.info(
                    "Disk cache cleanup completed: %d/%d files deleted, %.2f MB freed, %d errors, took %.2fs",
                    stats['files_deleted'], stats['files_scanned'], space_freed_mb,
                    stats['errors'], duration
                )
            else:
                self.logger.debug(
                    "Disk cache cleanup completed: no files to delete (%d files scanned)",
                    stats['files_scanned']
                )

            # Update last cleanup time
            self._last_disk_cleanup = time.time()

            return {
                'files_scanned': stats['files_scanned'],
                'files_deleted': stats['files_deleted'],
                'space_freed_mb': space_freed_mb,
                'errors': stats['errors'],
                'duration_sec': duration
            }
        except Exception as e:
            self.logger.error("Error during disk cache cleanup: %s", e, exc_info=True)
            return {
                'files_scanned': 0,
                'files_deleted': 0,
                'space_freed_mb': 0.0,
                'errors': 1,
                'duration_sec': time.time() - start_time
            }

    def start_cleanup_thread(self) -> None:
        """Start background thread for periodic disk cache cleanup."""
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            self.logger.debug("Cleanup thread already running")
            return

        def cleanup_loop():
            """Background loop that runs cleanup periodically."""
            self.logger.info("Disk cache cleanup thread started (interval: %d hours)",
                             self._disk_cleanup_interval_hours)

            # Run initial cleanup on startup (deferred from __init__ to avoid blocking)
            try:
                self.logger.debug("Running initial disk cache cleanup")
                self.cleanup_disk_cache()
            except Exception as e:
                self.logger.error("Error in initial cleanup: %s", e, exc_info=True)

            # Main cleanup loop
            while not self._cleanup_stop_event.is_set():
                try:
                    # Sleep for the configured interval (interruptible)
                    sleep_seconds = self._disk_cleanup_interval_hours * 3600
                    if self._cleanup_stop_event.wait(timeout=sleep_seconds):
                        # Event was set, exit loop
                        break

                    # Run cleanup if not stopped
                    if not self._cleanup_stop_event.is_set():
                        self.logger.debug("Running scheduled disk cache cleanup")
                        self.cleanup_disk_cache()
                except Exception as e:
                    self.logger.error("Error in cleanup thread: %s", e, exc_info=True)
                    # Continue running despite errors, but use interruptible sleep
                    if self._cleanup_stop_event.wait(timeout=60):
                        # Event was set during error recovery sleep, exit loop
                        break

            self.logger.info("Disk cache cleanup thread stopped")

        self._cleanup_stop_event.clear()  # Reset event before starting thread
        self._cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="DiskCacheCleanup")
        self._cleanup_thread.start()
        self.logger.info("Started disk cache cleanup background thread")

    def stop_cleanup_thread(self) -> None:
        """
        Stop the background cleanup thread gracefully.

        Signals the thread to stop and waits for it to finish (with timeout).
        This allows for clean shutdown during testing or application termination.
        """
        if not self._cleanup_thread or not self._cleanup_thread.is_alive():
            self.logger.debug("Cleanup thread not running")
            return

        self.logger.info("Stopping disk cache cleanup thread...")
        self._cleanup_stop_event.set()  # Signal thread to stop

        # Wait for thread to finish (with timeout to avoid hanging)
        self._cleanup_thread.join(timeout=5.0)

        if self._cleanup_thread.is_alive():
            self.logger.warning("Cleanup thread did not stop within timeout, thread may still be running")
        else:
            self.logger.info("Disk cache cleanup thread stopped successfully")

    def get_sport_live_interval(self, sport_key: str) -> int:
        """
        Get the live_update_interval for a specific sport from config.
        Falls back to default values if config is not available.
        """
        return self._strategy_component.get_sport_live_interval(sport_key)

    def get_cache_strategy(self, data_type: str, sport_key: Optional[str] = None) -> Dict[str, Any]:
        """
        Get cache strategy for different data types.
        Now respects sport-specific live_update_interval configurations.
        """
        return self._strategy_component.get_cache_strategy(data_type, sport_key)

    def get_data_type_from_key(self, key: str) -> str:
        """
        Determine the appropriate cache strategy based on the cache key.
        This helps automatically select the right cache duration.
        """
        return self._strategy_component.get_data_type_from_key(key)

    def get_sport_key_from_cache_key(self, key: str) -> Optional[str]:
        """
        Extract sport key from cache key to determine appropriate live_update_interval.
        """
        return self._strategy_component.get_sport_key_from_cache_key(key)

    def get_cached_data_with_strategy(self, key: str, data_type: str = 'default') -> Optional[Dict[str, Any]]:
        """
        Get data from cache using data-type-specific strategy.
        Now respects sport-specific live_update_interval configurations.
        """
        # Extract sport key for live sports data
        sport_key = None
        if data_type in ['sports_live', 'live_scores']:
            sport_key = self._strategy_component.get_sport_key_from_cache_key(key)

        strategy = self._strategy_component.get_cache_strategy(data_type, sport_key)
        max_age = strategy['max_age']
        memory_ttl = strategy.get('memory_ttl', max_age)

        # For market data, check if market is open
        if strategy.get('market_hours_only', False) and not self._strategy_component.is_market_open():
            # During off-hours, extend cache duration
            max_age *= 4  # 4x longer cache during off-hours

        record = self.get_cached_data(key, max_age, memory_ttl)
        # Unwrap if stored in { 'data': ..., 'timestamp': ... }
        if isinstance(record, dict) and 'data' in record:
            return record['data']
        return record

    def get_with_auto_strategy(self, key: str) -> Optional[Dict[str, Any]]:
        """
        Get cached data using automatically determined strategy.
        Now respects sport-specific live_update_interval configurations.
        """
        data_type = self.get_data_type_from_key(key)
        return self.get_cached_data_with_strategy(key, data_type)

    def get_background_cached_data(self, key: str, sport_key: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Get data from background service cache with appropriate strategy.
        This method is specifically designed for Recent/Upcoming managers
        to use data cached by the background service.

        Args:
            key: Cache key to retrieve
            sport_key: Sport key for determining appropriate cache strategy

        Returns:
            Cached data if available and fresh, None otherwise
        """
        # Determine the appropriate cache strategy
        data_type = self.get_data_type_from_key(key)
        strategy = self.get_cache_strategy(data_type, sport_key)

        # For Recent/Upcoming managers, we want to use the background service cache
        # which should have longer TTLs than the individual manager caches
        max_age = strategy['max_age']
        memory_ttl = strategy.get('memory_ttl', max_age)

        # Get the cached data
        cached_data = self.get_cached_data(key, max_age, memory_ttl)

        if cached_data:
            # Record cache hit for performance monitoring
            self.record_cache_hit('background')
            # Unwrap if stored in { 'data': ..., 'timestamp': ... } format
            if isinstance(cached_data, dict) and 'data' in cached_data:
                return cached_data['data']
            return cached_data

        # Record cache miss for performance monitoring
        self.record_cache_miss('background')
        return None

    def is_background_data_available(self, key: str, sport_key: Optional[str] = None) -> bool:
        """
        Check if background service has fresh data available.
        This helps Recent/Upcoming managers determine if they should
        wait for background data or fetch immediately.
        """
        data_type = self.get_data_type_from_key(key)
        strategy = self.get_cache_strategy(data_type, sport_key)

        # Check if we have data that's still fresh according to background service TTL
        cached_data = self.get_cached_data(key, strategy['max_age'])
        return cached_data is not None

    def generate_sport_cache_key(self, sport: str, date_str: Optional[str] = None) -> str:
        """
        Centralized cache key generation for sports data.
        This ensures consistent cache keys across background service and managers.

        Args:
            sport: Sport identifier (e.g., 'nba', 'nfl', 'ncaa_fb')
            date_str: Date string in YYYYMMDD format. If None, uses current UTC date.

        Returns:
            Cache key in format: {sport}_{date}
        """
        if date_str is None:
            date_str = datetime.now(pytz.utc).strftime('%Y%m%d')
        return f"{sport}_{date_str}"

    def record_cache_hit(self, cache_type: str = 'regular') -> None:
        """Record a cache hit for performance monitoring."""
        self._metrics_component.record_hit(cache_type)

    def record_cache_miss(self, cache_type: str = 'regular') -> None:
        """Record a cache miss for performance monitoring."""
        self._metrics_component.record_miss(cache_type)

    def record_fetch_time(self, duration: float) -> None:
        """Record fetch operation duration for performance monitoring."""
        self._metrics_component.record_fetch_time(duration)

    def get_cache_metrics(self) -> Dict[str, Any]:
        """Get current cache performance metrics."""
        return self._metrics_component.get_metrics()

    def log_cache_metrics(self) -> None:
        """Log current cache performance metrics."""
        self._metrics_component.log_metrics()

    def get_memory_cache_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the memory cache.

        Returns:
            Dictionary with memory cache statistics
        """
        with self._cache_lock:
            return {
                'size': len(self._memory_cache),
                'max_size': self._max_memory_cache_size,
                'usage_percent': (len(self._memory_cache) / self._max_memory_cache_size * 100) if self._max_memory_cache_size > 0 else 0,
                'last_cleanup': self._last_memory_cache_cleanup,
                'cleanup_interval': self._memory_cache_cleanup_interval
            }

    def log_memory_cache_stats(self) -> None:
        """Log current memory cache statistics."""
        stats = self.get_memory_cache_stats()
        self.logger.info(f"Memory Cache - Size: {stats['size']}/{stats['max_size']} "
                         f"({stats['usage_percent']:.1f}%), "
                         f"Last cleanup: {time.time() - stats['last_cleanup']:.1f}s ago")