LEDMatrix/src/cache_manager.py
Chuck 781224591f fix: post-audit follow-up code fixes (cache, fonts, icons, dev script) (#307)
* fix: post-audit follow-up code fixes (cache, fonts, icons, dev script, CI)

The docs refresh effort (#306, ledmatrix-plugins#92) surfaced seven
code bugs that were intentionally left out of the docs PRs because
they required code changes rather than doc fixes. This PR addresses
the six that belong in LEDMatrix; the remaining one (Bug 6, a
lacrosse-scoreboard mode rename) lives in the plugins repo, which is
why the numbering below skips from Bug 5 to Bug 7.

Bug 1: cache_manager.delete() AttributeError
  src/common/api_helper.py:287 and
  src/plugin_system/resource_monitor.py:343 both call
  cache_manager.delete(key), which doesn't exist — only
  clear_cache(key=None). Added a delete() alias method on
  CacheManager that forwards to clear_cache(key). Reverts the
  "There is no delete() method" wording in DEVELOPER_QUICK_REFERENCE,
  .cursorrules so the docs match the new shim.
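
  A minimal sketch of the intended round-trip (key and payload are
  illustrative, not from the audit):

    cm = CacheManager()
    cm.set("nba_scores", {"home": 101})
    assert cm.get("nba_scores", max_age=60) == {"home": 101}
    cm.delete("nba_scores")               # forwards to clear_cache("nba_scores")
    assert cm.get("nba_scores") is None   # only this entry is removed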

Bug 2: dev_plugin_setup.sh PROJECT_ROOT resolution
  scripts/dev/dev_plugin_setup.sh:9 set PROJECT_ROOT to SCRIPT_DIR
  instead of walking up two levels to the repo root, so PLUGINS_DIR
  resolved to scripts/dev/plugins/ and created symlinks under the
  script's own directory. Fixed the path and removed the stray
  scripts/dev/plugins/of-the-day symlink left by earlier runs.
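
  The script itself is bash, but the corrected resolution is the
  standard two-levels-up pattern; the Python equivalent, for
  illustration only:

    from pathlib import Path
    PROJECT_ROOT = Path(__file__).resolve().parents[2]  # scripts/dev/ -> repo root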

Bug 3: plugin custom icons regressed in the v2-to-v3 migration
  web_interface/blueprints/api_v3.py built the /plugins/installed
  response without including the manifest's "icon" field, and
  web_interface/templates/v3/base.html hardcoded
  fas fa-puzzle-piece in all three plugin-tab render sites. Pass
  the icon through the API and read it from the templates with a
  puzzle-piece fallback. Reverts the "currently broken" banners in
  docs/PLUGIN_CUSTOM_ICONS.md and docs/PLUGIN_CUSTOM_ICONS_FEATURE.md.
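
  On the API side the fix amounts to passing the manifest field
  through with the same fallback the templates use; a sketch (dict
  and variable names assumed):

    plugin_entry["icon"] = manifest.get("icon", "fas fa-puzzle-piece")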

Bug 4: register_plugin_fonts was never wired up
  src/font_manager.py:150 defines register_plugin_fonts(plugin_id,
  font_manifest) but nothing called it, so plugin manifests with a
  "fonts" block were silently no-ops. Wired the call into
  PluginManager.load_plugin() right after plugin_loader.load_plugin
  returns. Reverts the "not currently wired" warning in
  docs/FONT_MANAGER.md's "For Plugin Developers" section.
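
  A sketch of the wiring's shape (manifest key and error handling
  assumed, not verbatim from the diff):

    fonts = manifest.get("fonts")
    if fonts and self.font_manager is not None:
        try:
            self.font_manager.register_plugin_fonts(plugin_id, fonts)
        except Exception:
            self.logger.warning("Font registration failed for %s", plugin_id)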

Bug 5: dead web_interface_v2 import pattern (LEDMatrix half)
  src/base_odds_manager.py had a try/except importing
  web_interface_v2.increment_api_counter, falling back to a no-op
  stub. The module doesn't exist anywhere in the v3 codebase and
  no API metrics dashboard reads it. Deleted the import block and
  the single call site; the plugins-repo half of this cleanup lands
  in ledmatrix-plugins#<next>.
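
  For reference, the removed pattern had this shape (reconstructed,
  not verbatim):

    try:
        from web_interface_v2 import increment_api_counter
    except ImportError:
        def increment_api_counter(*args, **kwargs):
            pass  # no-op stub; nothing in v3 ever read the counter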

Bug 7: no CI test workflow
  .github/workflows/ only contained security-audit.yml; pytest ran
  locally but was not gated on PRs. Added
  .github/workflows/tests.yml running pytest against Python 3.10,
  3.11, 3.12 in EMULATOR=true mode, skipping tests marked hardware
  or slow. Updated docs/HOW_TO_RUN_TESTS.md to reflect that the
  workflow now exists.
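
  Locally the same subset can be reproduced with pytest's marker
  filter (the exact expression used in the workflow is assumed):

    EMULATOR=true pytest -m "not hardware and not slow"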

Verification done locally:
  - CacheManager.delete(key) round-trips with set/get
  - base_odds_manager imports without the v2 module present
  - dev_plugin_setup.sh PROJECT_ROOT resolves to repo root
  - api_v3 and plugin_manager compile clean
  - tests.yml YAML parses
  - Script syntax check on dev_plugin_setup.sh

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address CodeRabbit review comments on #307

- src/cache_manager.py: clear_cache(key) treated empty string as
  "wipe all" because of `if key:`. Switched to `key is None`
  branching, made delete(key) and clear_cache(key) reject empty
  strings and None outright with ValueError, and updated both
  docstrings to make the contract explicit. Verified locally
  with a round-trip test that clear_cache() (no arg) still
  wipes everything but clear_cache("") and delete("") raise.
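
  The resulting contract, in miniature (keys illustrative):

    cm.clear_cache()        # key omitted/None: wipes every entry
    cm.clear_cache("nba")   # removes exactly one entry
    cm.clear_cache("")      # ValueError instead of a silent full wipe
    cm.delete("")           # ValueError as well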

- src/plugin_system/plugin_manager.py: was reaching for the
  font manager via getattr(self.display_manager, 'font_manager',
  None). PluginManager already takes a dedicated font_manager
  parameter (line 54) and stores it as self.font_manager
  (line 69), so the old path was wrong: it could miss the font
  manager entirely when the host injects the two separately.
  Switched to self.font_manager directly with the same try/except
  warning behavior.

- web_interface/templates/v3/base.html: in the full plugin-tab
  renderer, the icon was injected with
  `<i class="${escapeHtml(plugin.icon)}">` — but escapeHtml only
  escapes <, >, and &, not double quotes, so a manifest with a
  quote in its icon string could break out of the class
  attribute. Replaced the innerHTML template with createElement
  for the <i> tag, set className from plugin.icon directly
  (no string interpolation), and used a text node for the
  label. Same fix shape would also harden the two stub-renderer
  sites at line 515 / 774, but those already escape `"` to
  &quot; and CodeRabbit only flagged this site, so leaving them
  for now.

- docs/FONT_MANAGER.md: clarified that the Manual Font Overrides
  *workflow* (set_override / remove_override / font_overrides.json)
  is the supported override path today, and only the Fonts tab
  in the web UI is the placeholder. Previous wording conflated
  the two and made it sound like overrides themselves were
  broken.

- docs/HOW_TO_RUN_TESTS.md: replaced the vague "see the PR
  adding it" with a concrete link to #307 and a note that the
  workflow file itself is held back pending a decision on its scope.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 09:25:12 -04:00


import json
import os
import time
from datetime import datetime
import pytz
from typing import Any, Dict, List, Optional
import logging
import threading
import tempfile
from pathlib import Path
from src.exceptions import CacheError
from src.cache.memory_cache import MemoryCache
from src.cache.disk_cache import DiskCache
from src.cache.cache_strategy import CacheStrategy
from src.cache.cache_metrics import CacheMetrics
from src.logging_config import get_logger


class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)


class CacheManager:
    """Manages caching of API responses to reduce API calls."""

    def __init__(self) -> None:
        # Initialize logger first
        self.logger: logging.Logger = get_logger(__name__)
        # Determine the most reliable writable directory
        self.cache_dir: Optional[str] = self._get_writable_cache_dir()
        if self.cache_dir:
            self.logger.info(f"Using cache directory: {self.cache_dir}")
        else:
            # This is a critical failure, as caching is essential.
            self.logger.error("Could not find or create a writable cache directory. Caching will be disabled.")
            self.cache_dir = None
        # Initialize config manager for sport-specific intervals
        try:
            from src.config_manager import ConfigManager
            self.config_manager: Optional[Any] = ConfigManager()
            self.config_manager.load_config()
        except ImportError:
            self.config_manager: Optional[Any] = None
            self.logger.warning("ConfigManager not available, using default cache intervals")
        # Initialize cache components using composition
        self._memory_cache_component = MemoryCache(max_size=1000, cleanup_interval=300.0)
        self._disk_cache_component = DiskCache(cache_dir=self.cache_dir, logger=self.logger)
        self._strategy_component = CacheStrategy(config_manager=self.config_manager, logger=self.logger)
        self._metrics_component = CacheMetrics(logger=self.logger)
        # Keep old attributes for backward compatibility (delegated to components)
        self._memory_cache = self._memory_cache_component._cache
        self._memory_cache_timestamps = self._memory_cache_component._timestamps
        self._cache_lock = self._memory_cache_component._lock
        self._max_memory_cache_size = self._memory_cache_component._max_size
        self._memory_cache_cleanup_interval = self._memory_cache_component._cleanup_interval
        self._last_memory_cache_cleanup = self._memory_cache_component._last_cleanup
        # Disk cleanup configuration
        self._disk_cleanup_interval_hours = 24  # Run cleanup every 24 hours
        self._disk_cleanup_interval = 3600.0  # Minimum interval between cleanups (1 hour) for throttle
        self._last_disk_cleanup = 0.0  # Timestamp of last disk cleanup
        self._cleanup_thread: Optional[threading.Thread] = None
        self._cleanup_stop_event = threading.Event()  # Event to signal thread shutdown
        self._retention_policies = {
            'odds': 2,               # Odds data: 2 days (lines move frequently)
            'odds_live': 2,          # Live odds: 2 days
            'sports_live': 7,        # Live sports: 7 days
            'weather_current': 7,    # Current weather: 7 days
            'sports_recent': 7,      # Recent games: 7 days
            'news': 14,              # News: 14 days
            'sports_upcoming': 60,   # Upcoming games: 60 days (schedules stable)
            'sports_schedules': 60,  # Schedules: 60 days
            'team_info': 60,         # Team info: 60 days
            'stocks': 14,            # Stock data: 14 days
            'crypto': 14,            # Crypto data: 14 days
            'default': 30            # Default: 30 days
        }
        # Start background cleanup thread only if disk caching is enabled
        if self.cache_dir:
            self.start_cleanup_thread()

    def _get_writable_cache_dir(self) -> Optional[str]:
        """Tries to find or create a writable cache directory, preferring a system path when available."""
        # Attempt 1: System-wide persistent cache directory (preferred for services)
        try:
            system_cache_dir = '/var/cache/ledmatrix'
            if os.path.exists(system_cache_dir):
                test_file = os.path.join(system_cache_dir, '.writetest')
                try:
                    with open(test_file, 'w') as f:
                        f.write('test')
                    os.remove(test_file)
                    self.logger.info(f"Using system cache directory: {system_cache_dir}")
                    return system_cache_dir
                except (IOError, OSError):
                    self.logger.debug(f"System cache directory exists but is not writable: {system_cache_dir}")
            else:
                from pathlib import Path
                from src.common.permission_utils import (
                    ensure_directory_permissions,
                    get_cache_dir_mode
                )
                try:
                    ensure_directory_permissions(Path(system_cache_dir), get_cache_dir_mode())
                    if os.access(system_cache_dir, os.W_OK):
                        self.logger.info(f"Using system cache directory: {system_cache_dir}")
                        return system_cache_dir
                except (OSError, IOError, PermissionError) as perm_error:
                    # Permission errors are expected when running as non-root
                    self.logger.debug(f"Could not create system cache directory (permission denied): {system_cache_dir}")
        except (OSError, IOError, PermissionError) as e:
            # Permission errors are expected when running as non-root, log at DEBUG level
            self.logger.debug(f"System cache directory not available: {e}")
        # Attempt 2: User's home directory (handling sudo), but avoid /root preference
        try:
            real_user = os.environ.get('SUDO_USER') or os.environ.get('USER', 'default')
            if real_user and real_user != 'root':
                home_dir = os.path.expanduser(f"~{real_user}")
            else:
                # When running as root and /var/cache/ledmatrix failed, still allow fallback to /root
                home_dir = os.path.expanduser('~')
            user_cache_dir = os.path.join(home_dir, '.ledmatrix_cache')
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            ensure_directory_permissions(Path(user_cache_dir), get_cache_dir_mode())
            test_file = os.path.join(user_cache_dir, '.writetest')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
            self.logger.info(f"Using user cache directory: {user_cache_dir}")
            return user_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use user-specific cache directory: {e}")
        # Attempt 3: /opt/ledmatrix/cache (alternative persistent location)
        try:
            opt_cache_dir = '/opt/ledmatrix/cache'
            # Check if directory exists and we can write to it
            if os.path.exists(opt_cache_dir):
                # Test if we can write to the existing directory
                test_file = os.path.join(opt_cache_dir, '.writetest')
                try:
                    with open(test_file, 'w') as f:
                        f.write('test')
                    os.remove(test_file)
                    return opt_cache_dir
                except (IOError, OSError):
                    self.logger.warning(f"Directory exists but is not writable: {opt_cache_dir}")
            else:
                # Try to create the directory
                from pathlib import Path
                from src.common.permission_utils import (
                    ensure_directory_permissions,
                    get_cache_dir_mode
                )
                ensure_directory_permissions(Path(opt_cache_dir), get_cache_dir_mode())
                if os.access(opt_cache_dir, os.W_OK):
                    return opt_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use /opt/ledmatrix/cache: {e}", exc_info=True)
        # Attempt 4: System-wide temporary directory (fallback, not persistent)
        try:
            temp_cache_dir = os.path.join(tempfile.gettempdir(), 'ledmatrix_cache')
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            ensure_directory_permissions(Path(temp_cache_dir), get_cache_dir_mode())
            if os.access(temp_cache_dir, os.W_OK):
                self.logger.warning("Using temporary cache directory - cache will NOT persist across restarts")
                return temp_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use system-wide temporary cache directory: {e}", exc_info=True)
        # Return None if no directory is writable
        return None

    def _cleanup_memory_cache(self, force: bool = False) -> int:
        """
        Clean up expired entries from memory cache and enforce size limits.

        Args:
            force: If True, perform cleanup regardless of time interval

        Returns:
            Number of entries removed
        """
        now = time.time()
        # Check if cleanup is needed
        if not force and (now - self._last_memory_cache_cleanup) < self._memory_cache_cleanup_interval:
            return 0
        with self._cache_lock:
            removed_count = 0
            current_time = time.time()
            # Remove expired entries (entries older than 1 hour without access are considered expired)
            # We use a conservative TTL of 1 hour for cleanup
            max_age_for_cleanup = 3600  # 1 hour
            expired_keys = []
            for key, timestamp in list(self._memory_cache_timestamps.items()):
                if isinstance(timestamp, str):
                    try:
                        timestamp = float(timestamp)
                    except ValueError:
                        timestamp = None
                if timestamp is None or (current_time - timestamp) > max_age_for_cleanup:
                    expired_keys.append(key)
            # Remove expired entries
            for key in expired_keys:
                self._memory_cache.pop(key, None)
                self._memory_cache_timestamps.pop(key, None)
                removed_count += 1
            # Enforce size limit by removing oldest entries if cache is too large
            if len(self._memory_cache) > self._max_memory_cache_size:
                # Sort by timestamp (oldest first)
                sorted_entries = sorted(
                    self._memory_cache_timestamps.items(),
                    key=lambda x: float(x[1]) if isinstance(x[1], (int, float)) else 0
                )
                # Remove oldest entries until we're under the limit
                excess_count = len(self._memory_cache) - self._max_memory_cache_size
                for i in range(excess_count):
                    if i < len(sorted_entries):
                        key = sorted_entries[i][0]
                        self._memory_cache.pop(key, None)
                        self._memory_cache_timestamps.pop(key, None)
                        removed_count += 1
            self._last_memory_cache_cleanup = current_time
            if removed_count > 0:
                self.logger.debug(f"Memory cache cleanup: removed {removed_count} entries (current size: {len(self._memory_cache)})")
            return removed_count

    def _get_cache_path(self, key: str) -> Optional[str]:
        """Get the path for a cache file."""
        return self._disk_cache_component.get_cache_path(key)

    def get_cached_data(self, key: str, max_age: int = 300, memory_ttl: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """Get data from cache (memory first, then disk) honoring TTLs.

        - memory_ttl: TTL for in-memory entry; defaults to max_age if not provided
        - max_age: TTL for persisted (on-disk) entry based on the stored timestamp
        """
        # Periodic cleanup of memory cache
        self._cleanup_memory_cache()
        in_memory_ttl = memory_ttl if memory_ttl is not None else max_age
        # 1) Memory cache
        cached = self._memory_cache_component.get(key, max_age=in_memory_ttl)
        if cached is not None:
            return cached
        # 2) Disk cache
        record = self._disk_cache_component.get(key, max_age=max_age)
        if record is not None:
            # Hydrate memory cache (use current time to start memory TTL window)
            self._memory_cache_component.set(key, record)
            return record
        # 3) Miss
        return None
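
    # Example (illustrative key and values): serve a live score from memory
    # for up to 60s while still accepting a disk copy up to 5 minutes old:
    #   data = cm.get_cached_data('nba_scores', max_age=300, memory_ttl=60)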

    def save_cache(self, key: str, data: Dict[str, Any]) -> None:
        """
        Save data to cache.

        Args:
            key: Cache key
            data: Data to cache
        """
        # Periodic cleanup before adding new entries
        self._cleanup_memory_cache()
        # Update memory cache first
        self._memory_cache_component.set(key, data)
        # Save to disk cache
        try:
            self._disk_cache_component.set(key, data)
        except CacheError:
            # Disk cache errors are already logged and raised by DiskCache
            raise

    def load_cache(self, key: str) -> Optional[Dict[str, Any]]:
        """Load data from cache with memory caching."""
        # Check memory cache first (1 minute TTL)
        cached = self._memory_cache_component.get(key, max_age=60)
        if cached is not None:
            return cached
        # Check disk cache
        data = self._disk_cache_component.get(key, max_age=3600)  # 1 hour for load_cache
        if data is not None:
            # Update memory cache
            self._memory_cache_component.set(key, data)
            return data
        return None

    def clear_cache(self, key: Optional[str] = None) -> None:
        """Clear cache entries.

        Pass a non-empty ``key`` to remove a single entry, or pass
        ``None`` (the default) to clear every cached entry. An empty
        string is rejected to prevent accidental whole-cache wipes
        from callers that pass through unvalidated input.
        """
        if key is None:
            # Clear all keys
            memory_count = self._memory_cache_component.size()
            self._memory_cache_component.clear()
            self._disk_cache_component.clear()
            self.logger.info("Cleared all cache: %d memory entries", memory_count)
            return
        if not isinstance(key, str) or not key:
            raise ValueError(
                "clear_cache(key) requires a non-empty string; "
                "pass key=None to clear all entries"
            )
        # Clear specific key
        self._memory_cache_component.clear(key)
        self._disk_cache_component.clear(key)
        self.logger.info("Cleared cache for key: %s", key)

    def delete(self, key: str) -> None:
        """Remove a single cache entry.

        Thin wrapper around :meth:`clear_cache` that **requires** a
        non-empty string key — unlike ``clear_cache(None)`` it never
        wipes every entry. Raises ``ValueError`` on ``None`` or an
        empty string.
        """
        if key is None or not isinstance(key, str) or not key:
            raise ValueError("delete(key) requires a non-empty string key")
        self.clear_cache(key)

    def list_cache_files(self) -> List[Dict[str, Any]]:
        """List all cache files with metadata (key, age, size, path).

        Returns:
            List of dicts with keys: 'key', 'filename', 'age_seconds', 'age_display',
            'size_bytes', 'size_display', 'path', 'modified_time'
        """
        if not self.cache_dir or not os.path.exists(self.cache_dir):
            return []
        cache_files = []
        current_time = time.time()
        try:
            with self._cache_lock:
                for filename in os.listdir(self.cache_dir):
                    if not filename.endswith('.json'):
                        continue
                    # Extract key from filename (remove .json extension)
                    key = filename[:-5]  # Remove '.json'
                    file_path = os.path.join(self.cache_dir, filename)
                    try:
                        # Get file stats
                        stat_info = os.stat(file_path)
                        size_bytes = stat_info.st_size
                        modified_time = stat_info.st_mtime
                        age_seconds = current_time - modified_time
                        # Format age display
                        if age_seconds < 60:
                            age_display = f"{int(age_seconds)}s"
                        elif age_seconds < 3600:
                            age_display = f"{int(age_seconds / 60)}m"
                        elif age_seconds < 86400:
                            age_display = f"{int(age_seconds / 3600)}h"
                        else:
                            age_display = f"{int(age_seconds / 86400)}d"
                        # Format size display
                        if size_bytes < 1024:
                            size_display = f"{size_bytes}B"
                        elif size_bytes < 1024 * 1024:
                            size_display = f"{size_bytes / 1024:.1f}KB"
                        else:
                            size_display = f"{size_bytes / (1024 * 1024):.1f}MB"
                        cache_files.append({
                            'key': key,
                            'filename': filename,
                            'age_seconds': age_seconds,
                            'age_display': age_display,
                            'size_bytes': size_bytes,
                            'size_display': size_display,
                            'path': file_path,
                            'modified_time': modified_time,
                            'modified_datetime': datetime.fromtimestamp(modified_time).isoformat()
                        })
                    except OSError as e:
                        self.logger.warning(f"Error getting stats for cache file {filename} at {file_path}: {e}", exc_info=True)
                        continue
        except OSError as e:
            self.logger.error(f"Error listing cache directory {self.cache_dir}: {e}", exc_info=True)
            return []
        # Sort by modified time (newest first)
        cache_files.sort(key=lambda x: x['modified_time'], reverse=True)
        return cache_files

    def get_cache_dir(self) -> Optional[str]:
        """Get the cache directory path."""
        return self.cache_dir

    def has_data_changed(self, data_type: str, new_data: Dict[str, Any]) -> bool:
        """Check if data has changed from cached version."""
        cached_data = self.load_cache(data_type)
        if not cached_data:
            return True
        if data_type == 'weather':
            return self._has_weather_changed(cached_data, new_data)
        elif data_type == 'stocks':
            return self._has_stocks_changed(cached_data, new_data)
        elif data_type == 'stock_news':
            return self._has_news_changed(cached_data, new_data)
        elif data_type == 'nhl':
            return self._has_nhl_changed(cached_data, new_data)
        elif data_type == 'mlb':
            return self._has_mlb_changed(cached_data, new_data)
        return True

    def _has_weather_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if weather data has changed."""
        # Handle new cache structure where data is nested under 'data' key
        if 'data' in cached:
            cached = cached['data']
        # Handle case where cached data might be the weather data directly
        if 'current' in cached:
            # This is the new structure with 'current' and 'forecast' keys
            current_weather = cached.get('current', {})
            if current_weather and 'main' in current_weather and 'weather' in current_weather:
                cached_temp = round(current_weather['main']['temp'])
                cached_condition = current_weather['weather'][0]['main']
                return (cached_temp != new.get('temp') or
                        cached_condition != new.get('condition'))
        # Handle old structure where temp and condition are directly accessible
        return (cached.get('temp') != new.get('temp') or
                cached.get('condition') != new.get('condition'))

    def _has_stocks_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if stock data has changed."""
        if not self._is_market_open():
            return False
        return cached.get('price') != new.get('price')

    def _has_news_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if news data has changed."""
        # Handle both dictionary and list formats
        if isinstance(new, list):
            # If new data is a list, cached data should also be a list
            if not isinstance(cached, list):
                return True
            # Compare lengths and content
            if len(cached) != len(new):
                return True
            # Compare titles since they're unique enough for our purposes
            cached_titles = set(item.get('title', '') for item in cached)
            new_titles = set(item.get('title', '') for item in new)
            return cached_titles != new_titles
        else:
            # Original dictionary format handling
            cached_headlines = set(h.get('id') for h in cached.get('headlines', []))
            new_headlines = set(h.get('id') for h in new.get('headlines', []))
            return not cached_headlines.issuperset(new_headlines)

    def _has_nhl_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if NHL data has changed."""
        return (cached.get('game_status') != new.get('game_status') or
                cached.get('score') != new.get('score'))

    def _has_mlb_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if MLB game data has changed."""
        if not cached or not new:
            return True
        # Check if any games have changed status or score
        for game_id, new_game in new.items():
            cached_game = cached.get(game_id)
            if not cached_game:
                return True
            # Check for score changes
            if (new_game['away_score'] != cached_game['away_score'] or
                    new_game['home_score'] != cached_game['home_score']):
                return True
            # Check for status changes
            if new_game['status'] != cached_game['status']:
                return True
            # For live games, check inning and count
            if new_game['status'] == 'in':
                if (new_game['inning'] != cached_game['inning'] or
                        new_game['inning_half'] != cached_game['inning_half'] or
                        new_game['balls'] != cached_game['balls'] or
                        new_game['strikes'] != cached_game['strikes'] or
                        new_game['bases_occupied'] != cached_game['bases_occupied']):
                    return True
        return False

    def _is_market_open(self) -> bool:
        """Check if the US stock market is currently open."""
        return self._strategy_component.is_market_open()

    def update_cache(self, data_type: str, data: Dict[str, Any]) -> bool:
        """Update cache with new data."""
        cache_data = {
            'data': data,
            'timestamp': time.time()
        }
        # save_cache() returns None and raises on failure, so report success
        # explicitly to honor the bool return annotation
        self.save_cache(data_type, cache_data)
        return True

    def get(self, key: str, max_age: int = 300) -> Optional[Dict[str, Any]]:
        """Get data from cache if it exists and is not stale."""
        cached_data = self.get_cached_data(key, max_age)
        if cached_data and 'data' in cached_data:
            return cached_data['data']
        return cached_data

    def set(self, key: str, data: Dict[str, Any], ttl: Optional[int] = None) -> None:
        """
        Store data in cache with current timestamp.

        Args:
            key: Cache key
            data: Data to cache
            ttl: Optional time-to-live in seconds (stored for compatibility but
                expiration is still controlled via max_age when reading)
        """
        cache_data = {
            'data': data,
            'timestamp': time.time()
        }
        if ttl is not None:
            cache_data['ttl'] = ttl
        self.save_cache(key, cache_data)
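
    # get()/set() are the convenience pair most callers use: set() wraps the
    # payload as {'data': ..., 'timestamp': ...} and get() unwraps it, so a
    # round-trip like cm.set('k', {'a': 1}) followed by cm.get('k') yields
    # {'a': 1} while the entry is younger than max_age.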

    def setup_persistent_cache(self) -> bool:
        """
        Set up a persistent cache directory with proper permissions.
        This should be run once with sudo to create the directory.
        """
        # Bind the path before the try block so the except clause can
        # reference it even if an early statement raises
        cache_dir = '/var/cache/ledmatrix'
        try:
            # Try to create /var/cache/ledmatrix with proper permissions
            from pathlib import Path
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            cache_dir_path = Path(cache_dir)
            ensure_directory_permissions(cache_dir_path, get_cache_dir_mode())
            # Set ownership to the real user (not root)
            real_user = os.environ.get('SUDO_USER')
            if real_user:
                import pwd
                try:
                    uid = pwd.getpwnam(real_user).pw_uid
                    gid = pwd.getpwnam(real_user).pw_gid
                    os.chown(cache_dir, uid, gid)
                    self.logger.info(f"Set ownership of {cache_dir} to {real_user}")
                except (OSError, KeyError) as e:
                    self.logger.warning(f"Could not set ownership for {cache_dir}: {e}", exc_info=True)
            self.logger.info(f"Successfully set up persistent cache directory: {cache_dir}")
            return True
        except (OSError, IOError, PermissionError) as e:
            self.logger.error(f"Failed to set up persistent cache directory {cache_dir}: {e}", exc_info=True)
            return False

    def cleanup_disk_cache(self, force: bool = False) -> Dict[str, Any]:
        """
        Clean up expired disk cache files based on retention policies.

        Args:
            force: If True, run cleanup regardless of last cleanup time

        Returns:
            Dictionary with cleanup statistics
        """
        now = time.time()
        # Check if cleanup is needed (throttle to prevent too-frequent cleanups)
        if not force and (now - self._last_disk_cleanup) < self._disk_cleanup_interval:
            return {
                'files_scanned': 0,
                'files_deleted': 0,
                'space_freed_mb': 0.0,
                'errors': 0,
                'duration_sec': 0.0
            }
        start_time = time.time()
        try:
            # Perform cleanup
            stats = self._disk_cache_component.cleanup_expired_files(
                cache_strategy=self._strategy_component,
                retention_policies=self._retention_policies
            )
            duration = time.time() - start_time
            space_freed_mb = stats['space_freed_bytes'] / (1024 * 1024)
            # Record metrics
            self._metrics_component.record_disk_cleanup(
                files_cleaned=stats['files_deleted'],
                space_freed_mb=space_freed_mb,
                duration_sec=duration
            )
            # Log summary
            if stats['files_deleted'] > 0:
                self.logger.info(
                    "Disk cache cleanup completed: %d/%d files deleted, %.2f MB freed, %d errors, took %.2fs",
                    stats['files_deleted'], stats['files_scanned'], space_freed_mb,
                    stats['errors'], duration
                )
            else:
                self.logger.debug(
                    "Disk cache cleanup completed: no files to delete (%d files scanned)",
                    stats['files_scanned']
                )
            # Update last cleanup time
            self._last_disk_cleanup = time.time()
            return {
                'files_scanned': stats['files_scanned'],
                'files_deleted': stats['files_deleted'],
                'space_freed_mb': space_freed_mb,
                'errors': stats['errors'],
                'duration_sec': duration
            }
        except Exception as e:
            self.logger.error("Error during disk cache cleanup: %s", e, exc_info=True)
            return {
                'files_scanned': 0,
                'files_deleted': 0,
                'space_freed_mb': 0.0,
                'errors': 1,
                'duration_sec': time.time() - start_time
            }

    def start_cleanup_thread(self) -> None:
        """Start background thread for periodic disk cache cleanup."""
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            self.logger.debug("Cleanup thread already running")
            return

        def cleanup_loop():
            """Background loop that runs cleanup periodically."""
            self.logger.info("Disk cache cleanup thread started (interval: %d hours)",
                             self._disk_cleanup_interval_hours)
            # Run initial cleanup on startup (deferred from __init__ to avoid blocking)
            try:
                self.logger.debug("Running initial disk cache cleanup")
                self.cleanup_disk_cache()
            except Exception as e:
                self.logger.error("Error in initial cleanup: %s", e, exc_info=True)
            # Main cleanup loop
            while not self._cleanup_stop_event.is_set():
                try:
                    # Sleep for the configured interval (interruptible)
                    sleep_seconds = self._disk_cleanup_interval_hours * 3600
                    if self._cleanup_stop_event.wait(timeout=sleep_seconds):
                        # Event was set, exit loop
                        break
                    # Run cleanup if not stopped
                    if not self._cleanup_stop_event.is_set():
                        self.logger.debug("Running scheduled disk cache cleanup")
                        self.cleanup_disk_cache()
                except Exception as e:
                    self.logger.error("Error in cleanup thread: %s", e, exc_info=True)
                    # Continue running despite errors, but use interruptible sleep
                    if self._cleanup_stop_event.wait(timeout=60):
                        # Event was set during error recovery sleep, exit loop
                        break
            self.logger.info("Disk cache cleanup thread stopped")

        self._cleanup_stop_event.clear()  # Reset event before starting thread
        self._cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="DiskCacheCleanup")
        self._cleanup_thread.start()
        self.logger.info("Started disk cache cleanup background thread")

    def stop_cleanup_thread(self) -> None:
        """
        Stop the background cleanup thread gracefully.

        Signals the thread to stop and waits for it to finish (with timeout).
        This allows for clean shutdown during testing or application termination.
        """
        if not self._cleanup_thread or not self._cleanup_thread.is_alive():
            self.logger.debug("Cleanup thread not running")
            return
        self.logger.info("Stopping disk cache cleanup thread...")
        self._cleanup_stop_event.set()  # Signal thread to stop
        # Wait for thread to finish (with timeout to avoid hanging)
        self._cleanup_thread.join(timeout=5.0)
        if self._cleanup_thread.is_alive():
            self.logger.warning("Cleanup thread did not stop within timeout, thread may still be running")
        else:
            self.logger.info("Disk cache cleanup thread stopped successfully")

    def get_sport_live_interval(self, sport_key: str) -> int:
        """
        Get the live_update_interval for a specific sport from config.
        Falls back to default values if config is not available.
        """
        return self._strategy_component.get_sport_live_interval(sport_key)

    def get_cache_strategy(self, data_type: str, sport_key: Optional[str] = None) -> Dict[str, Any]:
        """
        Get cache strategy for different data types.
        Now respects sport-specific live_update_interval configurations.
        """
        return self._strategy_component.get_cache_strategy(data_type, sport_key)

    def get_data_type_from_key(self, key: str) -> str:
        """
        Determine the appropriate cache strategy based on the cache key.
        This helps automatically select the right cache duration.
        """
        return self._strategy_component.get_data_type_from_key(key)

    def get_sport_key_from_cache_key(self, key: str) -> Optional[str]:
        """
        Extract sport key from cache key to determine appropriate live_update_interval.
        """
        return self._strategy_component.get_sport_key_from_cache_key(key)

    def get_cached_data_with_strategy(self, key: str, data_type: str = 'default') -> Optional[Dict[str, Any]]:
        """
        Get data from cache using data-type-specific strategy.
        Now respects sport-specific live_update_interval configurations.
        """
        # Extract sport key for live sports data
        sport_key = None
        if data_type in ['sports_live', 'live_scores']:
            sport_key = self._strategy_component.get_sport_key_from_cache_key(key)
        strategy = self._strategy_component.get_cache_strategy(data_type, sport_key)
        max_age = strategy['max_age']
        memory_ttl = strategy.get('memory_ttl', max_age)
        # For market data, check if market is open
        if strategy.get('market_hours_only', False) and not self._strategy_component.is_market_open():
            # During off-hours, extend cache duration
            max_age *= 4  # 4x longer cache during off-hours
        record = self.get_cached_data(key, max_age, memory_ttl)
        # Unwrap if stored in { 'data': ..., 'timestamp': ... }
        if isinstance(record, dict) and 'data' in record:
            return record['data']
        return record

    def get_with_auto_strategy(self, key: str) -> Optional[Dict[str, Any]]:
        """
        Get cached data using automatically determined strategy.
        Now respects sport-specific live_update_interval configurations.
        """
        data_type = self.get_data_type_from_key(key)
        return self.get_cached_data_with_strategy(key, data_type)

    def get_background_cached_data(self, key: str, sport_key: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Get data from background service cache with appropriate strategy.

        This method is specifically designed for Recent/Upcoming managers
        to use data cached by the background service.

        Args:
            key: Cache key to retrieve
            sport_key: Sport key for determining appropriate cache strategy

        Returns:
            Cached data if available and fresh, None otherwise
        """
        # Determine the appropriate cache strategy
        data_type = self.get_data_type_from_key(key)
        strategy = self.get_cache_strategy(data_type, sport_key)
        # For Recent/Upcoming managers, we want to use the background service cache
        # which should have longer TTLs than the individual manager caches
        max_age = strategy['max_age']
        memory_ttl = strategy.get('memory_ttl', max_age)
        # Get the cached data
        cached_data = self.get_cached_data(key, max_age, memory_ttl)
        if cached_data:
            # Record cache hit for performance monitoring
            self.record_cache_hit('background')
            # Unwrap if stored in { 'data': ..., 'timestamp': ... } format
            if isinstance(cached_data, dict) and 'data' in cached_data:
                return cached_data['data']
            return cached_data
        # Record cache miss for performance monitoring
        self.record_cache_miss('background')
        return None

    def is_background_data_available(self, key: str, sport_key: Optional[str] = None) -> bool:
        """
        Check if background service has fresh data available.
        This helps Recent/Upcoming managers determine if they should
        wait for background data or fetch immediately.
        """
        data_type = self.get_data_type_from_key(key)
        strategy = self.get_cache_strategy(data_type, sport_key)
        # Check if we have data that's still fresh according to background service TTL
        cached_data = self.get_cached_data(key, strategy['max_age'])
        return cached_data is not None

    def generate_sport_cache_key(self, sport: str, date_str: Optional[str] = None) -> str:
        """
        Centralized cache key generation for sports data.
        This ensures consistent cache keys across background service and managers.

        Args:
            sport: Sport identifier (e.g., 'nba', 'nfl', 'ncaa_fb')
            date_str: Date string in YYYYMMDD format. If None, uses current UTC date.

        Returns:
            Cache key in format: {sport}_{date}
        """
        if date_str is None:
            date_str = datetime.now(pytz.utc).strftime('%Y%m%d')
        return f"{sport}_{date_str}"

    def record_cache_hit(self, cache_type: str = 'regular') -> None:
        """Record a cache hit for performance monitoring."""
        self._metrics_component.record_hit(cache_type)

    def record_cache_miss(self, cache_type: str = 'regular') -> None:
        """Record a cache miss for performance monitoring."""
        self._metrics_component.record_miss(cache_type)

    def record_fetch_time(self, duration: float) -> None:
        """Record fetch operation duration for performance monitoring."""
        self._metrics_component.record_fetch_time(duration)

    def get_cache_metrics(self) -> Dict[str, Any]:
        """Get current cache performance metrics."""
        return self._metrics_component.get_metrics()

    def log_cache_metrics(self) -> None:
        """Log current cache performance metrics."""
        self._metrics_component.log_metrics()

    def get_memory_cache_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the memory cache.

        Returns:
            Dictionary with memory cache statistics
        """
        with self._cache_lock:
            return {
                'size': len(self._memory_cache),
                'max_size': self._max_memory_cache_size,
                'usage_percent': (len(self._memory_cache) / self._max_memory_cache_size * 100) if self._max_memory_cache_size > 0 else 0,
                'last_cleanup': self._last_memory_cache_cleanup,
                'cleanup_interval': self._memory_cache_cleanup_interval
            }

    def log_memory_cache_stats(self) -> None:
        """Log current memory cache statistics."""
        stats = self.get_memory_cache_stats()
        self.logger.info(f"Memory Cache - Size: {stats['size']}/{stats['max_size']} "
                         f"({stats['usage_percent']:.1f}%), "
                         f"Last cleanup: {time.time() - stats['last_cleanup']:.1f}s ago")