LEDMatrix/src/cache_manager.py
Chuck eedf680a8c perf: display pipeline optimizations — caching, logging, scroll, text width (#358)
* docs(core): add module and class docstrings to the 5 undocumented core files

Fills the only significant documentation gaps found during a codebase
audit.  All other core files (plugin_system/, logging_config.py, etc.)
already have complete module, class, and function docstrings.

Files changed (documentation only — zero logic changes):

  display_controller.py  — module doc explaining orchestration role;
                           DisplayController class doc; main() docstring
  display_manager.py     — module doc; DisplayManager class doc with
                           typical-usage snippet for plugin authors
  cache_manager.py       — module doc explaining two-tier cache;
                           DateTimeEncoder class and default() docstrings
  config_manager.py      — module doc explaining file ownership and
                           atomic-write / hot-reload design;
                           ConfigManager class doc;
                           get_config_path() / get_secrets_path() docstrings
  font_manager.py        — module doc (class docstring already existed)

Also noted (but not changed to avoid behaviour risk):
  display_manager.py and font_manager.py use logging.getLogger() directly
  instead of the project's get_logger() wrapper.  display_manager.py also
  calls setLevel(logging.INFO) immediately after, which would be lost if
  switched to get_logger().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* perf(display_controller): three targeted hot-path optimizations

Opt 1 — cache inspect.signature() per plugin_id
  inspect.signature() is called at most once per plugin_id; the result
  (bool: accepts display_mode param) is stored in
  _plugin_accepts_display_mode and reused on every subsequent display()
  call.  Eliminates all reflection from the display path at runtime.
  Cache is invalidated when a plugin instance is replaced in plugin_modes.
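
As a rough illustration of Opt 1, the sketch below performs the
inspect.signature() lookup once per plugin id and reuses the stored bool
afterwards.  The helper names accepts_display_mode() and invalidate() and the
two toy plugin classes are hypothetical; only the _plugin_accepts_display_mode
dict and the invalidate-on-replacement rule come from this commit.

```python
import inspect
from typing import Any, Dict

# Hypothetical stand-in for DisplayController._plugin_accepts_display_mode.
_plugin_accepts_display_mode: Dict[str, bool] = {}


def accepts_display_mode(plugin_id: str, plugin: Any) -> bool:
    """Return True if plugin.display() takes a display_mode parameter."""
    cached = _plugin_accepts_display_mode.get(plugin_id)
    if cached is None:
        # Reflection runs only on the first call for this plugin_id.
        params = inspect.signature(plugin.display).parameters
        cached = "display_mode" in params
        _plugin_accepts_display_mode[plugin_id] = cached
    return cached


def invalidate(plugin_id: str) -> None:
    """Drop the cached answer when a plugin instance is replaced."""
    _plugin_accepts_display_mode.pop(plugin_id, None)


class OldStylePlugin:
    """Toy plugin whose display() has no display_mode parameter."""

    def display(self, force_clear=False):
        return "old"


class NewStylePlugin:
    """Toy plugin whose display() accepts display_mode."""

    def display(self, display_mode=None, force_clear=False):
        return "new"
```

Without the invalidate() call, a replaced plugin instance would keep the
answer computed for its predecessor, which is why the cache is tied to
changes in plugin_modes.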

Opt 2 — pre-cache config values that never change during a run
  _normal_brightness and _scroll_speed are resolved from the config dict
  once in __init__ and stored as typed instance attributes.
  - Removes 2+ chained dict.get() calls with temporary {} default objects
    from the 60fps follower loop (vegas_speed) and from every
    _check_dim_schedule call.
  - current_brightness init now uses _normal_brightness directly.

Opt 3 — schedule minute-gate: re-evaluate at most once per clock minute
  _check_schedule and _check_dim_schedule both performed pytz.timezone(),
  datetime.now(), strftime(), and datetime.strptime() on every outer loop
  call.  Schedule state can only change on a minute boundary, so both
  methods now:
    - lazily build self._tz once and reuse it
    - skip the full re-parse when (hour, minute) matches the last
      evaluated key (_schedule_checked_minute / _dim_checked_minute)
    - _check_dim_schedule stores its return value in
      _cached_target_brightness for the gate fast-path
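
The minute-gate described above can be sketched roughly as follows.  The
MinuteGate class, its evaluations counter, and the is_within_schedule()
example are hypothetical; the real methods keep their state in
_schedule_checked_minute / _dim_checked_minute rather than in a helper
object.

```python
from datetime import datetime
from typing import Callable, Optional, Tuple


class MinuteGate:
    """Re-run an expensive check at most once per (hour, minute)."""

    def __init__(self, evaluate: Callable[[datetime], bool]) -> None:
        self._evaluate = evaluate
        self._checked_minute: Optional[Tuple[int, int]] = None
        self._cached_result = False
        self.evaluations = 0  # counter kept only for demonstration

    def check(self, now: datetime) -> bool:
        key = (now.hour, now.minute)
        if key == self._checked_minute:
            # Fast path: same clock minute, reuse the last result.
            return self._cached_result
        self._cached_result = self._evaluate(now)
        self._checked_minute = key
        self.evaluations += 1
        return self._cached_result

    def reset(self) -> None:
        """Force a re-evaluation on the next check (e.g. after a config reload)."""
        self._checked_minute = None


def is_within_schedule(now: datetime) -> bool:
    # Hypothetical schedule: active from 07:00 up to (not including) 23:00.
    return 7 <= now.hour < 23
```

Calling check() many times within one minute evaluates the schedule once;
the first call in the next minute evaluates it again.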

Tests: 23 new tests in test_display_controller_optimizations.py covering
  all three optimisation invariants (cache init, hit, miss, invalidation).
  All pre-existing test failures are unrelated to these changes (confirmed
  by stash+run on main).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve 22 pre-existing test failures across 6 groups

Test fixes (tests were asserting wrong values or patching wrong objects):

  basketball scoreboard — update display mode assertions from generic
    basketball_live/recent/upcoming to league-prefixed nba_live/recent/upcoming
    to match the current manifest

  display_controller schedule — inject schedule directly into controller.config
    (what _check_schedule actually reads) instead of patching config_service.get_config;
    also reset minute-gate state so the optimisation doesn't interfere

  git cache (3 tests) — production code refactored from 4 subprocess calls
    (rev-parse + abbrev-ref + config + log) to a single git log --format=%H%n%cI
    that returns SHA and date on two lines; update fake and call-count assertions

  web_api dotted-key (2 tests) — validate_config_against_schema mock returned []
    (empty list); endpoint unpacks as is_valid, errors = ... causing ValueError;
    fix: return_value = (True, [])

  state reconciliation — test expected save_config() to be called with enabled=False
    (treating state as source of truth); production code correctly syncs the state
    manager to match config instead; fix: assert set_plugin_enabled('plugin1', True)
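
For the web_api dotted-key item above, the failure mode is easy to
reproduce in isolation.  The endpoint() function below is a hypothetical
stand-in for the real handler; it only mimics the two-value unpacking.

```python
from unittest.mock import Mock


def endpoint(validate):
    """Mimic the endpoint's unpacking of the validator result."""
    is_valid, errors = validate({"some.key": 1})
    return is_valid, errors


# Old test setup: an empty list cannot be unpacked into two names.
broken_mock = Mock(return_value=[])
# Fixed test setup: a (bool, list) tuple matches what the endpoint expects.
fixed_mock = Mock(return_value=(True, []))


def call_broken():
    """Return the ValueError message raised with the old mock, if any."""
    try:
        endpoint(broken_mock)
    except ValueError as exc:
        return str(exc)
    return None
```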

Production fixes (production code had bugs or missing features):

  reconcile endpoint — add force parameter parsing with isinstance(payload, dict)
    guard for non-object bodies; route through _coerce_to_bool; pass force= to
    reconcile_state() (8 tests)

  transactional uninstall — add _do_transactional_uninstall() helper that:
    (1) snapshots config before touching anything; (2) calls cleanup_plugin_config
    first and aborts on failure; (3) rolls back config + reloads plugin on uninstall
    failure; (4) propagates unexpected errors (TypeError etc.) instead of swallowing
    them (6 tests)

  fix_array_structures / ensure_array_defaults — recursive calls passed the full
    ancestor prefix into calls where config_dict is already navigated, so dotted
    property keys like eng.1 caused parent_parts.split('.') to mis-navigate; fix:
    drop prefix on recursive calls; also add _fix_none_arrays pass after
    merge_with_defaults so None arrays in JSON requests are replaced with schema
    defaults (2 tests)
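
The four steps of the transactional uninstall can be sketched as below.
This is a simplified model, not the body of _do_transactional_uninstall():
the function signature, the injected callables, and the in-memory config
dict are all assumptions made for the example.

```python
import copy
from typing import Any, Callable, Dict


def transactional_uninstall(
    config: Dict[str, Any],
    plugin_id: str,
    cleanup_config: Callable[[Dict[str, Any], str], bool],
    uninstall: Callable[[str], bool],
    reload_plugin: Callable[[str], Any],
) -> bool:
    # (1) Snapshot the config before touching anything.
    snapshot = copy.deepcopy(config)

    # (2) Clean up the plugin's config first; abort if that fails.
    if not cleanup_config(config, plugin_id):
        return False

    # (3) Uninstall; on a reported failure restore the config and reload.
    # (4) Unexpected exceptions (TypeError etc.) are deliberately not caught
    #     here, so they propagate to the caller.
    if not uninstall(plugin_id):
        config.clear()
        config.update(snapshot)
        reload_plugin(plugin_id)
        return False
    return True


def remove_plugin_section(config: Dict[str, Any], plugin_id: str) -> bool:
    """Hypothetical cleanup step: drop the plugin's config section."""
    config.pop(plugin_id, None)
    return True
```

A failed uninstall leaves the config exactly as it was before the call,
which is the property the new tests check.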

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* perf: four targeted optimizations across the display pipeline

Opt 1 — cache data-fetch interval per plugin (plugin_manager.py)
  _get_plugin_update_interval fell back to config_manager.get_config()
  (a full dict copy) when the manifest lacked an interval.  Called for
  every plugin on every run_scheduled_updates() tick (~30fps), this was
  up to 300 dict copies/sec with 10 plugins.
  Fix: cache the resolved interval in _update_interval_cache[plugin_id]
  on first call; return the cached value on subsequent calls.  Cache is
  cleared on load_plugin and unload_plugin.

Opt 2 — demote noisy per-cycle INFO logs to DEBUG (display_controller.py)
  Four logger.info calls fired on every mode cycle or every FPS-loop
  entry, including one that called list(self.plugin_modes.keys())
  unconditionally (allocating a list every outer loop iteration).
  - "Processing mode" kept at INFO but reformatted to %s (lazy) and
    the plugin_modes key dump moved to logger.debug
  - "Attempting/Got cycle duration" → logger.debug
  - "Entering high/normal FPS loop" → logger.debug
  Mode name at INFO is preserved for black-screen troubleshooting.

Opt 3 — use Image.frombytes instead of Image.fromarray in scroll hot path
  (scroll_helper.py)
  Image.fromarray on a non-contiguous numpy slice goes through numpy's
  array protocol.  Image.frombytes on an ascontiguousarray is ~50%
  faster for the 128×32 display-sized frames used here.  Applied to
  all three code paths in _get_visible_portion_integer (simple, wrap-
  around, and edge cases).

Opt 5 — cache get_text_width per (text, font) pair (display_manager.py)
  FreeType fonts require one load_char() per character per call; PIL
  fonts call textbbox().  Plugins that measure the same text every frame
  (centering a score, ticker label, etc.) were re-measuring from scratch
  on every display() call.
  Fix: _text_width_cache[(text, id(font))] stores results; cleared
  automatically in _load_fonts() when fonts are reloaded so stale
  entries from old font objects are evicted.
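
A minimal sketch of the (text, id(font)) cache, assuming a pluggable
measure function in place of the FreeType / PIL code paths.  TextWidthCache,
on_fonts_reloaded(), FakeFont, and measure() are hypothetical names; only
the key shape and the clear-on-reload rule come from this commit.

```python
from typing import Any, Callable, Dict, List, Tuple


class TextWidthCache:
    def __init__(self, measure: Callable[[str, Any], int]) -> None:
        self._measure = measure
        self._text_width_cache: Dict[Tuple[str, int], int] = {}

    def get_text_width(self, text: str, font: Any) -> int:
        key = (text, id(font))
        width = self._text_width_cache.get(key)
        if width is None:
            # First time this (text, font) pair is seen: measure and store.
            width = self._measure(text, font)
            self._text_width_cache[key] = width
        return width

    def on_fonts_reloaded(self) -> None:
        # id() values can be reused once old font objects are garbage
        # collected, so every entry is dropped when fonts are reloaded.
        self._text_width_cache.clear()


class FakeFont:
    """Hypothetical fixed-width font: every character is char_width pixels."""

    def __init__(self, char_width: int) -> None:
        self.char_width = char_width


measure_calls: List[str] = []


def measure(text: str, font: FakeFont) -> int:
    """Stand-in for the expensive per-character measurement."""
    measure_calls.append(text)
    return len(text) * font.char_width
```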

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(scroll_helper): fix edge-case bug exposed by frombytes switch

The previous commit replaced Image.fromarray with Image.frombytes in
_get_visible_portion_integer.  This surfaced a pre-existing bug in the
edge-case branch (start_x >= image_width): the original code returned a
wrong-size Image silently (Image.fromarray accepts a too-short array);
Image.frombytes raises ValueError instead.

Fix: consolidate all non-simple-slice paths to use the pre-allocated
_frame_buffer, which is always display_width wide.  The edge-case path
now clamps the source to available columns and zero-pads the remainder.
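
The clamp-and-pad idea can be shown without numpy.  The sketch below works
on a single row of single-channel pixels stored as bytes and ignores the
wrap-around path; visible_row() is a hypothetical helper, and the real code
operates on the pre-allocated _frame_buffer instead of a fresh bytearray.

```python
def visible_row(row: bytes, start_x: int, display_width: int) -> bytes:
    """Return exactly display_width bytes of one image row, zero-padded."""
    frame = bytearray(display_width)          # buffer starts out all zeros
    start = min(max(start_x, 0), len(row))    # clamp to the available columns
    chunk = row[start:start + display_width]
    frame[:len(chunk)] = chunk                # the remainder stays zero
    return bytes(frame)
```

Because the result always has display_width entries, a frombytes-style
consumer that validates the buffer length never sees a short frame, even
when start_x is at or past the end of the row.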

Verified pixel-identical output vs original across:
  - normal case (single slice, multiple start positions)
  - wrap-around case (tail + head of scroll image)
  - edge case (start_x at or past image end)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address CodeRabbit review comments on PR #358

1. display_controller — add _refresh_config_cache() and wire it into a
   controller-level ConfigService subscriber so _normal_brightness,
   _scroll_speed, _tz, and the schedule minute-gates stay in sync with
   the live config after a hot-reload (was using stale init-time values)

2. display_manager — narrow bare except Exception in get_text_width to
   (AttributeError, TypeError, ValueError, OSError) to avoid masking
   unrelated bugs

3. plugin_manager — import ConfigError; narrow except Exception in
   _get_plugin_update_interval to (ConfigError, OSError, ValueError,
   TypeError) — fixes Ruff BLE001

4. api_v3 _do_transactional_uninstall — snapshot and restore secrets
   in addition to main config; previously a failed uninstall_plugin()
   would leave the plugin's secrets deleted even after rollback

5. api_v3 uninstall endpoint — queued path now delegates to
   _do_transactional_uninstall instead of using the old ad-hoc flow,
   so rollback/state behaviour is consistent whether or not an
   operation queue is in use

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(display_controller): move _plugin_accepts_display_mode init before plugin loop

Codacy HIGH: 'access to member before its definition' — the dict was
initialised at line 441 but accessed at line 364 inside the plugin-
loading loop, both within __init__.

Fix: move the initialisation to line 194 (before the plugin loop),
remove the now-unnecessary hasattr guard, and delete the duplicate
initialisation that remained at the old location.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 11:58:21 -04:00

947 lines
41 KiB
Python

"""
Cache Manager: multi-tier response cache for the LEDMatrix application.

:class:`CacheManager` provides a unified caching layer used by all plugins
to reduce external API calls and survive network outages gracefully.

Two storage tiers
-----------------
* **Memory tier** (:class:`~src.cache.memory_cache.MemoryCache`): fast LRU
  cache (up to 1 000 entries by default). Lookups hit this tier before
  touching disk.
* **Disk tier** (:class:`~src.cache.disk_cache.DiskCache`): filesystem-backed
  persistent store that survives process restarts.

Data written to cache is serialised as JSON. :class:`DateTimeEncoder` handles
``datetime`` objects transparently so callers don't have to pre-serialise them.

Typical plugin usage::

    data = self.cache_manager.get_cached_data('my_key', max_age=300)
    if data is None:
        data = fetch_from_api()
        self.cache_manager.save_cache('my_key', data)
"""
import json
import os
import time
from datetime import datetime
import pytz
from typing import Any, Dict, List, Optional
import logging
import threading
import tempfile
from src.exceptions import CacheError
from src.cache.memory_cache import MemoryCache
from src.cache.disk_cache import DiskCache
from src.cache.cache_strategy import CacheStrategy
from src.cache.cache_metrics import CacheMetrics
from src.logging_config import get_logger


class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``datetime`` objects as ISO-8601 strings."""

    def default(self, obj):
        """Return ISO-8601 string for datetime; delegate all other types to the base encoder."""
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)


class CacheManager:
    """Manages caching of API responses to reduce API calls."""

    def __init__(self) -> None:
        # Initialize logger first
        self.logger: logging.Logger = get_logger(__name__)

        # Determine the most reliable writable directory
        self.cache_dir: Optional[str] = self._get_writable_cache_dir()
        if self.cache_dir:
            self.logger.info(f"Using cache directory: {self.cache_dir}")
        else:
            # This is a critical failure, as caching is essential.
            self.logger.error("Could not find or create a writable cache directory. Caching will be disabled.")
            self.cache_dir = None

        # Initialize config manager for sport-specific intervals
        try:
            from src.config_manager import ConfigManager
            self.config_manager: Optional[Any] = ConfigManager()
            self.config_manager.load_config()
        except ImportError:
            self.config_manager: Optional[Any] = None
            self.logger.warning("ConfigManager not available, using default cache intervals")

        # Initialize cache components using composition
        self._memory_cache_component = MemoryCache(max_size=1000, cleanup_interval=300.0)
        self._disk_cache_component = DiskCache(cache_dir=self.cache_dir, logger=self.logger)
        self._strategy_component = CacheStrategy(config_manager=self.config_manager, logger=self.logger)
        self._metrics_component = CacheMetrics(logger=self.logger)

        # Keep old attributes for backward compatibility (delegated to components)
        self._memory_cache = self._memory_cache_component._cache
        self._memory_cache_timestamps = self._memory_cache_component._timestamps
        self._cache_lock = self._memory_cache_component._lock
        self._max_memory_cache_size = self._memory_cache_component._max_size
        self._memory_cache_cleanup_interval = self._memory_cache_component._cleanup_interval
        self._last_memory_cache_cleanup = self._memory_cache_component._last_cleanup

        # Disk cleanup configuration
        self._disk_cleanup_interval_hours = 24  # Run cleanup every 24 hours
        self._disk_cleanup_interval = 3600.0  # Minimum interval between cleanups (1 hour) for throttle
        self._last_disk_cleanup = 0.0  # Timestamp of last disk cleanup
        self._cleanup_thread: Optional[threading.Thread] = None
        self._cleanup_stop_event = threading.Event()  # Event to signal thread shutdown
        self._retention_policies = {
            'odds': 2,               # Odds data: 2 days (lines move frequently)
            'odds_live': 2,          # Live odds: 2 days
            'sports_live': 7,        # Live sports: 7 days
            'weather_current': 7,    # Current weather: 7 days
            'sports_recent': 7,      # Recent games: 7 days
            'news': 14,              # News: 14 days
            'sports_upcoming': 60,   # Upcoming games: 60 days (schedules stable)
            'sports_schedules': 60,  # Schedules: 60 days
            'team_info': 60,         # Team info: 60 days
            'stocks': 14,            # Stock data: 14 days
            'crypto': 14,            # Crypto data: 14 days
            'default': 30            # Default: 30 days
        }

        # Start background cleanup thread only if disk caching is enabled
        if self.cache_dir:
            self.start_cleanup_thread()

    def _get_writable_cache_dir(self) -> Optional[str]:
        """Tries to find or create a writable cache directory, preferring a system path when available."""
        # Attempt 1: System-wide persistent cache directory (preferred for services)
        try:
            system_cache_dir = '/var/cache/ledmatrix'
            if os.path.exists(system_cache_dir):
                test_file = os.path.join(system_cache_dir, '.writetest')
                try:
                    with open(test_file, 'w') as f:
                        f.write('test')
                    os.remove(test_file)
                    self.logger.info(f"Using system cache directory: {system_cache_dir}")
                    return system_cache_dir
                except (IOError, OSError):
                    self.logger.debug(f"System cache directory exists but is not writable: {system_cache_dir}")
            else:
                from pathlib import Path
                from src.common.permission_utils import (
                    ensure_directory_permissions,
                    get_cache_dir_mode
                )
                try:
                    ensure_directory_permissions(Path(system_cache_dir), get_cache_dir_mode())
                    if os.access(system_cache_dir, os.W_OK):
                        self.logger.info(f"Using system cache directory: {system_cache_dir}")
                        return system_cache_dir
                except (OSError, IOError, PermissionError):
                    # Permission errors are expected when running as non-root
                    self.logger.debug(f"Could not create system cache directory (permission denied): {system_cache_dir}")
        except (OSError, IOError, PermissionError) as e:
            # Permission errors are expected when running as non-root, log at DEBUG level
            self.logger.debug(f"System cache directory not available: {e}")

        # Attempt 2: User's home directory (handling sudo), but avoid /root preference
        try:
            real_user = os.environ.get('SUDO_USER') or os.environ.get('USER', 'default')
            if real_user and real_user != 'root':
                home_dir = os.path.expanduser(f"~{real_user}")
            else:
                # When running as root and /var/cache/ledmatrix failed, still allow fallback to /root
                home_dir = os.path.expanduser('~')
            user_cache_dir = os.path.join(home_dir, '.ledmatrix_cache')
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            ensure_directory_permissions(Path(user_cache_dir), get_cache_dir_mode())
            test_file = os.path.join(user_cache_dir, '.writetest')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
            self.logger.info(f"Using user cache directory: {user_cache_dir}")
            return user_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use user-specific cache directory: {e}")

        # Attempt 3: /opt/ledmatrix/cache (alternative persistent location)
        try:
            opt_cache_dir = '/opt/ledmatrix/cache'
            # Check if directory exists and we can write to it
            if os.path.exists(opt_cache_dir):
                # Test if we can write to the existing directory
                test_file = os.path.join(opt_cache_dir, '.writetest')
                try:
                    with open(test_file, 'w') as f:
                        f.write('test')
                    os.remove(test_file)
                    return opt_cache_dir
                except (IOError, OSError):
                    self.logger.warning(f"Directory exists but is not writable: {opt_cache_dir}")
            else:
                # Try to create the directory
                from pathlib import Path
                from src.common.permission_utils import (
                    ensure_directory_permissions,
                    get_cache_dir_mode
                )
                ensure_directory_permissions(Path(opt_cache_dir), get_cache_dir_mode())
                if os.access(opt_cache_dir, os.W_OK):
                    return opt_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use /opt/ledmatrix/cache: {e}", exc_info=True)

        # Attempt 4: System-wide temporary directory (fallback, not persistent)
        try:
            temp_cache_dir = os.path.join(tempfile.gettempdir(), 'ledmatrix_cache')
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            ensure_directory_permissions(Path(temp_cache_dir), get_cache_dir_mode())
            if os.access(temp_cache_dir, os.W_OK):
                self.logger.warning("Using temporary cache directory - cache will NOT persist across restarts")
                return temp_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use system-wide temporary cache directory: {e}", exc_info=True)

        # Return None if no directory is writable
        return None

    def _cleanup_memory_cache(self, force: bool = False) -> int:
        """
        Clean up expired entries from memory cache and enforce size limits.

        Args:
            force: If True, perform cleanup regardless of time interval

        Returns:
            Number of entries removed
        """
        now = time.time()
        # Check if cleanup is needed
        if not force and (now - self._last_memory_cache_cleanup) < self._memory_cache_cleanup_interval:
            return 0

        with self._cache_lock:
            removed_count = 0
            current_time = time.time()

            # Remove expired entries (entries older than 1 hour without access are considered expired)
            # We use a conservative TTL of 1 hour for cleanup
            max_age_for_cleanup = 3600  # 1 hour
            expired_keys = []
            for key, timestamp in list(self._memory_cache_timestamps.items()):
                if isinstance(timestamp, str):
                    try:
                        timestamp = float(timestamp)
                    except ValueError:
                        timestamp = None
                if timestamp is None or (current_time - timestamp) > max_age_for_cleanup:
                    expired_keys.append(key)

            # Remove expired entries
            for key in expired_keys:
                self._memory_cache.pop(key, None)
                self._memory_cache_timestamps.pop(key, None)
                removed_count += 1

            # Enforce size limit by removing oldest entries if cache is too large
            if len(self._memory_cache) > self._max_memory_cache_size:
                # Sort by timestamp (oldest first)
                sorted_entries = sorted(
                    self._memory_cache_timestamps.items(),
                    key=lambda x: float(x[1]) if isinstance(x[1], (int, float)) else 0
                )
                # Remove oldest entries until we're under the limit
                excess_count = len(self._memory_cache) - self._max_memory_cache_size
                for i in range(excess_count):
                    if i < len(sorted_entries):
                        key = sorted_entries[i][0]
                        self._memory_cache.pop(key, None)
                        self._memory_cache_timestamps.pop(key, None)
                        removed_count += 1

            self._last_memory_cache_cleanup = current_time
            if removed_count > 0:
                self.logger.debug(f"Memory cache cleanup: removed {removed_count} entries (current size: {len(self._memory_cache)})")
            return removed_count

    def _get_cache_path(self, key: str) -> Optional[str]:
        """Get the path for a cache file."""
        return self._disk_cache_component.get_cache_path(key)

    def get_cached_data(self, key: str, max_age: int = 300, memory_ttl: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """Get data from cache (memory first, then disk) honoring TTLs.

        - memory_ttl: TTL for in-memory entry; defaults to max_age if not provided
        - max_age: TTL for persisted (on-disk) entry based on the stored timestamp
        """
        # Periodic cleanup of memory cache
        self._cleanup_memory_cache()
        in_memory_ttl = memory_ttl if memory_ttl is not None else max_age

        # 1) Memory cache
        cached = self._memory_cache_component.get(key, max_age=in_memory_ttl)
        if cached is not None:
            return cached

        # 2) Disk cache
        record = self._disk_cache_component.get(key, max_age=max_age)
        if record is not None:
            # Hydrate memory cache (use current time to start memory TTL window)
            self._memory_cache_component.set(key, record)
            return record

        # 3) Miss
        return None

    def save_cache(self, key: str, data: Dict[str, Any]) -> None:
        """
        Save data to cache.

        Args:
            key: Cache key
            data: Data to cache
        """
        # Periodic cleanup before adding new entries
        self._cleanup_memory_cache()
        # Update memory cache first
        self._memory_cache_component.set(key, data)
        # Save to disk cache
        try:
            self._disk_cache_component.set(key, data)
        except CacheError:
            # Disk cache errors are already logged and raised by DiskCache
            raise

    def load_cache(self, key: str) -> Optional[Dict[str, Any]]:
        """Load data from cache with memory caching."""
        # Check memory cache first (1 minute TTL)
        cached = self._memory_cache_component.get(key, max_age=60)
        if cached is not None:
            return cached
        # Check disk cache
        data = self._disk_cache_component.get(key, max_age=3600)  # 1 hour for load_cache
        if data is not None:
            # Update memory cache
            self._memory_cache_component.set(key, data)
            return data
        return None

    def clear_cache(self, key: Optional[str] = None) -> None:
        """Clear cache entries.

        Pass a non-empty ``key`` to remove a single entry, or pass
        ``None`` (the default) to clear every cached entry. An empty
        string is rejected to prevent accidental whole-cache wipes
        from callers that pass through unvalidated input.
        """
        if key is None:
            # Clear all keys
            memory_count = self._memory_cache_component.size()
            self._memory_cache_component.clear()
            self._disk_cache_component.clear()
            self.logger.info("Cleared all cache: %d memory entries", memory_count)
            return
        if not isinstance(key, str) or not key:
            raise ValueError(
                "clear_cache(key) requires a non-empty string; "
                "pass key=None to clear all entries"
            )
        # Clear specific key
        self._memory_cache_component.clear(key)
        self._disk_cache_component.clear(key)
        self.logger.info("Cleared cache for key: %s", key)

    def delete(self, key: str) -> None:
        """Remove a single cache entry.

        Thin wrapper around :meth:`clear_cache` that **requires** a
        non-empty string key; unlike ``clear_cache(None)`` it never
        wipes every entry. Raises ``ValueError`` on ``None`` or an
        empty string.
        """
        if key is None or not isinstance(key, str) or not key:
            raise ValueError("delete(key) requires a non-empty string key")
        self.clear_cache(key)

    def list_cache_files(self) -> List[Dict[str, Any]]:
        """List all cache files with metadata (key, age, size, path).

        Returns:
            List of dicts with keys: 'key', 'filename', 'age_seconds', 'age_display',
            'size_bytes', 'size_display', 'path', 'modified_time', 'modified_datetime'
        """
        if not self.cache_dir or not os.path.exists(self.cache_dir):
            return []

        cache_files = []
        current_time = time.time()
        try:
            with self._cache_lock:
                for filename in os.listdir(self.cache_dir):
                    if not filename.endswith('.json'):
                        continue
                    # Extract key from filename (remove .json extension)
                    key = filename[:-5]  # Remove '.json'
                    file_path = os.path.join(self.cache_dir, filename)
                    try:
                        # Get file stats
                        stat_info = os.stat(file_path)
                        size_bytes = stat_info.st_size
                        modified_time = stat_info.st_mtime
                        age_seconds = current_time - modified_time

                        # Format age display
                        if age_seconds < 60:
                            age_display = f"{int(age_seconds)}s"
                        elif age_seconds < 3600:
                            age_display = f"{int(age_seconds / 60)}m"
                        elif age_seconds < 86400:
                            age_display = f"{int(age_seconds / 3600)}h"
                        else:
                            age_display = f"{int(age_seconds / 86400)}d"

                        # Format size display
                        if size_bytes < 1024:
                            size_display = f"{size_bytes}B"
                        elif size_bytes < 1024 * 1024:
                            size_display = f"{size_bytes / 1024:.1f}KB"
                        else:
                            size_display = f"{size_bytes / (1024 * 1024):.1f}MB"

                        cache_files.append({
                            'key': key,
                            'filename': filename,
                            'age_seconds': age_seconds,
                            'age_display': age_display,
                            'size_bytes': size_bytes,
                            'size_display': size_display,
                            'path': file_path,
                            'modified_time': modified_time,
                            'modified_datetime': datetime.fromtimestamp(modified_time).isoformat()
                        })
                    except OSError as e:
                        self.logger.warning(f"Error getting stats for cache file {filename} at {file_path}: {e}", exc_info=True)
                        continue
        except OSError as e:
            self.logger.error(f"Error listing cache directory {self.cache_dir}: {e}", exc_info=True)
            return []

        # Sort by modified time (newest first)
        cache_files.sort(key=lambda x: x['modified_time'], reverse=True)
        return cache_files

    def get_cache_dir(self) -> Optional[str]:
        """Get the cache directory path."""
        return self.cache_dir

    def has_data_changed(self, data_type: str, new_data: Dict[str, Any]) -> bool:
        """Check if data has changed from cached version."""
        cached_data = self.load_cache(data_type)
        if not cached_data:
            return True
        if data_type == 'weather':
            return self._has_weather_changed(cached_data, new_data)
        elif data_type == 'stocks':
            return self._has_stocks_changed(cached_data, new_data)
        elif data_type == 'stock_news':
            return self._has_news_changed(cached_data, new_data)
        elif data_type == 'nhl':
            return self._has_nhl_changed(cached_data, new_data)
        elif data_type == 'mlb':
            return self._has_mlb_changed(cached_data, new_data)
        return True

    def _has_weather_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if weather data has changed."""
        # Handle new cache structure where data is nested under 'data' key
        if 'data' in cached:
            cached = cached['data']
        # Handle case where cached data might be the weather data directly
        if 'current' in cached:
            # This is the new structure with 'current' and 'forecast' keys
            current_weather = cached.get('current', {})
            if current_weather and 'main' in current_weather and 'weather' in current_weather:
                cached_temp = round(current_weather['main']['temp'])
                cached_condition = current_weather['weather'][0]['main']
                return (cached_temp != new.get('temp') or
                        cached_condition != new.get('condition'))
        # Handle old structure where temp and condition are directly accessible
        return (cached.get('temp') != new.get('temp') or
                cached.get('condition') != new.get('condition'))

    def _has_stocks_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if stock data has changed."""
        if not self._is_market_open():
            return False
        return cached.get('price') != new.get('price')

    def _has_news_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if news data has changed."""
        # Handle both dictionary and list formats
        if isinstance(new, list):
            # If new data is a list, cached data should also be a list
            if not isinstance(cached, list):
                return True
            # Compare lengths and content
            if len(cached) != len(new):
                return True
            # Compare titles since they're unique enough for our purposes
            cached_titles = set(item.get('title', '') for item in cached)
            new_titles = set(item.get('title', '') for item in new)
            return cached_titles != new_titles
        else:
            # Original dictionary format handling
            cached_headlines = set(h.get('id') for h in cached.get('headlines', []))
            new_headlines = set(h.get('id') for h in new.get('headlines', []))
            return not cached_headlines.issuperset(new_headlines)

    def _has_nhl_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if NHL data has changed."""
        return (cached.get('game_status') != new.get('game_status') or
                cached.get('score') != new.get('score'))

    def _has_mlb_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if MLB game data has changed."""
        if not cached or not new:
            return True
        # Check if any games have changed status or score
        for game_id, new_game in new.items():
            cached_game = cached.get(game_id)
            if not cached_game:
                return True
            # Check for score changes
            if (new_game['away_score'] != cached_game['away_score'] or
                    new_game['home_score'] != cached_game['home_score']):
                return True
            # Check for status changes
            if new_game['status'] != cached_game['status']:
                return True
            # For live games, check inning and count
            if new_game['status'] == 'in':
                if (new_game['inning'] != cached_game['inning'] or
                        new_game['inning_half'] != cached_game['inning_half'] or
                        new_game['balls'] != cached_game['balls'] or
                        new_game['strikes'] != cached_game['strikes'] or
                        new_game['bases_occupied'] != cached_game['bases_occupied']):
                    return True
        return False

    def _is_market_open(self) -> bool:
        """Check if the US stock market is currently open."""
        return self._strategy_component.is_market_open()

    def update_cache(self, data_type: str, data: Dict[str, Any]) -> bool:
        """Update cache with new data.

        Returns True once the data has been saved; a ``CacheError`` from
        the disk tier propagates to the caller.
        """
        cache_data = {
            'data': data,
            'timestamp': time.time()
        }
        # save_cache() returns None, so report success explicitly to
        # honour the declared bool return type.
        self.save_cache(data_type, cache_data)
        return True

    def get(self, key: str, max_age: int = 300) -> Optional[Dict[str, Any]]:
        """Get data from cache if it exists and is not stale."""
        cached_data = self.get_cached_data(key, max_age)
        if cached_data and 'data' in cached_data:
            return cached_data['data']
        return cached_data

    def set(self, key: str, data: Dict[str, Any], ttl: Optional[int] = None) -> None:
        """
        Store data in cache with current timestamp.

        Args:
            key: Cache key
            data: Data to cache
            ttl: Optional time-to-live in seconds (stored for compatibility but
                expiration is still controlled via max_age when reading)
        """
        cache_data = {
            'data': data,
            'timestamp': time.time()
        }
        if ttl is not None:
            cache_data['ttl'] = ttl
        self.save_cache(key, cache_data)

    def setup_persistent_cache(self) -> bool:
        """
        Set up a persistent cache directory with proper permissions.

        This should be run once with sudo to create the directory.
        """
        try:
            # Try to create /var/cache/ledmatrix with proper permissions
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            cache_dir = '/var/cache/ledmatrix'
            cache_dir_path = Path(cache_dir)
            ensure_directory_permissions(cache_dir_path, get_cache_dir_mode())

            # Set ownership to the real user (not root)
            real_user = os.environ.get('SUDO_USER')
            if real_user:
                import pwd
                try:
                    uid = pwd.getpwnam(real_user).pw_uid
                    gid = pwd.getpwnam(real_user).pw_gid
                    os.chown(cache_dir, uid, gid)
                    self.logger.info(f"Set ownership of {cache_dir} to {real_user}")
                except (OSError, KeyError) as e:
                    self.logger.warning(f"Could not set ownership for {cache_dir}: {e}", exc_info=True)

            self.logger.info(f"Successfully set up persistent cache directory: {cache_dir}")
            return True
        except (OSError, IOError, PermissionError) as e:
            self.logger.error(f"Failed to set up persistent cache directory {cache_dir}: {e}", exc_info=True)
            return False

    def cleanup_disk_cache(self, force: bool = False) -> Dict[str, Any]:
        """
        Clean up expired disk cache files based on retention policies.

        Args:
            force: If True, run cleanup regardless of last cleanup time

        Returns:
            Dictionary with cleanup statistics
        """
        now = time.time()
        # Check if cleanup is needed (throttle to prevent too-frequent cleanups)
        if not force and (now - self._last_disk_cleanup) < self._disk_cleanup_interval:
            return {
                'files_scanned': 0,
                'files_deleted': 0,
                'space_freed_mb': 0.0,
                'errors': 0,
                'duration_sec': 0.0
            }

        start_time = time.time()
        try:
            # Perform cleanup
            stats = self._disk_cache_component.cleanup_expired_files(
                cache_strategy=self._strategy_component,
                retention_policies=self._retention_policies
            )
            duration = time.time() - start_time
            space_freed_mb = stats['space_freed_bytes'] / (1024 * 1024)

            # Record metrics
            self._metrics_component.record_disk_cleanup(
                files_cleaned=stats['files_deleted'],
                space_freed_mb=space_freed_mb,
                duration_sec=duration
            )

            # Log summary
            if stats['files_deleted'] > 0:
                self.logger.info(
                    "Disk cache cleanup completed: %d/%d files deleted, %.2f MB freed, %d errors, took %.2fs",
                    stats['files_deleted'], stats['files_scanned'], space_freed_mb,
                    stats['errors'], duration
                )
            else:
                self.logger.debug(
                    "Disk cache cleanup completed: no files to delete (%d files scanned)",
                    stats['files_scanned']
                )

            # Update last cleanup time
            self._last_disk_cleanup = time.time()
            return {
                'files_scanned': stats['files_scanned'],
                'files_deleted': stats['files_deleted'],
                'space_freed_mb': space_freed_mb,
                'errors': stats['errors'],
                'duration_sec': duration
            }
        except Exception as e:
            self.logger.error("Error during disk cache cleanup: %s", e, exc_info=True)
            return {
                'files_scanned': 0,
                'files_deleted': 0,
                'space_freed_mb': 0.0,
                'errors': 1,
                'duration_sec': time.time() - start_time
            }
    def start_cleanup_thread(self) -> None:
        """Start background thread for periodic disk cache cleanup."""
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            self.logger.debug("Cleanup thread already running")
            return

        def cleanup_loop():
            """Background loop that runs cleanup periodically."""
            self.logger.info("Disk cache cleanup thread started (interval: %d hours)",
                             self._disk_cleanup_interval_hours)

            # Run initial cleanup on startup (deferred from __init__ to avoid blocking)
            try:
                self.logger.debug("Running initial disk cache cleanup")
                self.cleanup_disk_cache()
            except Exception as e:
                self.logger.error("Error in initial cleanup: %s", e, exc_info=True)

            # Main cleanup loop
            while not self._cleanup_stop_event.is_set():
                try:
                    # Sleep for the configured interval (interruptible)
                    sleep_seconds = self._disk_cleanup_interval_hours * 3600
                    if self._cleanup_stop_event.wait(timeout=sleep_seconds):
                        # Event was set, exit loop
                        break

                    # Run cleanup if not stopped
                    if not self._cleanup_stop_event.is_set():
                        self.logger.debug("Running scheduled disk cache cleanup")
                        self.cleanup_disk_cache()
                except Exception as e:
                    self.logger.error("Error in cleanup thread: %s", e, exc_info=True)
                    # Continue running despite errors, but use interruptible sleep
                    if self._cleanup_stop_event.wait(timeout=60):
                        # Event was set during error recovery sleep, exit loop
                        break

            self.logger.info("Disk cache cleanup thread stopped")

        self._cleanup_stop_event.clear()  # Reset event before starting thread
        self._cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="DiskCacheCleanup")
        self._cleanup_thread.start()
        self.logger.info("Started disk cache cleanup background thread")

    def stop_cleanup_thread(self) -> None:
        """
        Stop the background cleanup thread gracefully.

        Signals the thread to stop and waits for it to finish (with timeout).
        This allows for clean shutdown during testing or application termination.
        """
        if not self._cleanup_thread or not self._cleanup_thread.is_alive():
            self.logger.debug("Cleanup thread not running")
            return

        self.logger.info("Stopping disk cache cleanup thread...")
        self._cleanup_stop_event.set()  # Signal thread to stop

        # Wait for thread to finish (with timeout to avoid hanging)
        self._cleanup_thread.join(timeout=5.0)
        if self._cleanup_thread.is_alive():
            self.logger.warning("Cleanup thread did not stop within timeout, thread may still be running")
        else:
            self.logger.info("Disk cache cleanup thread stopped successfully")

    def get_sport_live_interval(self, sport_key: str) -> int:
        """
        Get the live_update_interval for a specific sport from config.

        Falls back to default values if config is not available.
        """
        return self._strategy_component.get_sport_live_interval(sport_key)

    def get_cache_strategy(self, data_type: str, sport_key: Optional[str] = None) -> Dict[str, Any]:
        """
        Get the cache strategy for a given data type.

        Respects sport-specific live_update_interval configurations.
        """
        return self._strategy_component.get_cache_strategy(data_type, sport_key)

    def get_data_type_from_key(self, key: str) -> str:
        """
        Determine the data type (and thus the cache strategy) from the cache key.

        This helps automatically select the right cache duration.
        """
        return self._strategy_component.get_data_type_from_key(key)

    def get_sport_key_from_cache_key(self, key: str) -> Optional[str]:
        """
        Extract the sport key from a cache key to determine the appropriate live_update_interval.
        """
        return self._strategy_component.get_sport_key_from_cache_key(key)

    def get_cached_data_with_strategy(self, key: str, data_type: str = 'default') -> Optional[Dict[str, Any]]:
        """
        Get data from cache using a data-type-specific strategy.

        Respects sport-specific live_update_interval configurations.
        """
        # Extract sport key for live sports data
        sport_key = None
        if data_type in ('sports_live', 'live_scores'):
            sport_key = self._strategy_component.get_sport_key_from_cache_key(key)

        strategy = self._strategy_component.get_cache_strategy(data_type, sport_key)
        max_age = strategy['max_age']
        memory_ttl = strategy.get('memory_ttl', max_age)

        # For market data, check if market is open
        if strategy.get('market_hours_only', False) and not self._strategy_component.is_market_open():
            # During off-hours, extend the disk cache duration (memory_ttl is unchanged)
            max_age *= 4  # 4x longer cache during off-hours

        record = self.get_cached_data(key, max_age, memory_ttl)
        # Unwrap if stored in { 'data': ..., 'timestamp': ... }
        if isinstance(record, dict) and 'data' in record:
            return record['data']
        return record

    def get_with_auto_strategy(self, key: str) -> Optional[Dict[str, Any]]:
        """
        Get cached data using a strategy determined automatically from the key.

        Respects sport-specific live_update_interval configurations.
        """
        data_type = self.get_data_type_from_key(key)
        return self.get_cached_data_with_strategy(key, data_type)

    def get_background_cached_data(self, key: str, sport_key: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Get data from background service cache with appropriate strategy.

        This method is specifically designed for Recent/Upcoming managers
        to use data cached by the background service.

        Args:
            key: Cache key to retrieve
            sport_key: Sport key for determining appropriate cache strategy

        Returns:
            Cached data if available and fresh, None otherwise
        """
        # Determine the appropriate cache strategy
        data_type = self.get_data_type_from_key(key)
        strategy = self.get_cache_strategy(data_type, sport_key)

        # For Recent/Upcoming managers, we want to use the background service cache
        # which should have longer TTLs than the individual manager caches
        max_age = strategy['max_age']
        memory_ttl = strategy.get('memory_ttl', max_age)

        # Get the cached data
        cached_data = self.get_cached_data(key, max_age, memory_ttl)
        if cached_data:
            # Record cache hit for performance monitoring
            self.record_cache_hit('background')
            # Unwrap if stored in { 'data': ..., 'timestamp': ... } format
            if isinstance(cached_data, dict) and 'data' in cached_data:
                return cached_data['data']
            return cached_data

        # Record cache miss for performance monitoring
        self.record_cache_miss('background')
        return None

    def is_background_data_available(self, key: str, sport_key: Optional[str] = None) -> bool:
        """
        Check if background service has fresh data available.

        This helps Recent/Upcoming managers determine if they should
        wait for background data or fetch immediately.
        """
        data_type = self.get_data_type_from_key(key)
        strategy = self.get_cache_strategy(data_type, sport_key)

        # Use the same max_age / memory_ttl pair as get_background_cached_data()
        # so both methods agree on what counts as fresh
        max_age = strategy['max_age']
        memory_ttl = strategy.get('memory_ttl', max_age)
        cached_data = self.get_cached_data(key, max_age, memory_ttl)
        return cached_data is not None

    def generate_sport_cache_key(self, sport: str, date_str: Optional[str] = None) -> str:
        """
        Centralized cache key generation for sports data.

        This ensures consistent cache keys across background service and managers.

        Args:
            sport: Sport identifier (e.g., 'nba', 'nfl', 'ncaa_fb')
            date_str: Date string in YYYYMMDD format. If None, uses current UTC date.

        Returns:
            Cache key in format: {sport}_{date}
        """
        if date_str is None:
            date_str = datetime.now(pytz.utc).strftime('%Y%m%d')
        return f"{sport}_{date_str}"

    def record_cache_hit(self, cache_type: str = 'regular') -> None:
        """Record a cache hit for performance monitoring."""
        self._metrics_component.record_hit(cache_type)

    def record_cache_miss(self, cache_type: str = 'regular') -> None:
        """Record a cache miss for performance monitoring."""
        self._metrics_component.record_miss(cache_type)

    def record_fetch_time(self, duration: float) -> None:
        """Record fetch operation duration for performance monitoring."""
        self._metrics_component.record_fetch_time(duration)

    def get_cache_metrics(self) -> Dict[str, Any]:
        """Get current cache performance metrics."""
        return self._metrics_component.get_metrics()

    def log_cache_metrics(self) -> None:
        """Log current cache performance metrics."""
        self._metrics_component.log_metrics()

    def get_memory_cache_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the memory cache.

        Returns:
            Dictionary with memory cache statistics
        """
        with self._cache_lock:
            size = len(self._memory_cache)
            max_size = self._max_memory_cache_size
            return {
                'size': size,
                'max_size': max_size,
                # Guard against division by zero when the cache is disabled
                'usage_percent': (size / max_size * 100) if max_size > 0 else 0,
                'last_cleanup': self._last_memory_cache_cleanup,
                'cleanup_interval': self._memory_cache_cleanup_interval
            }

    def log_memory_cache_stats(self) -> None:
        """Log current memory cache statistics."""
        stats = self.get_memory_cache_stats()
        # Lazy %-style formatting, matching the other log calls in this class
        self.logger.info(
            "Memory Cache - Size: %s/%s (%.1f%%), Last cleanup: %.1fs ago",
            stats['size'], stats['max_size'], stats['usage_percent'],
            time.time() - stats['last_cleanup']
        )