mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-04-10 21:03:01 +00:00
* feat(cache): Add intelligent disk cache cleanup with retention policies - Add cleanup_expired_files() method to DiskCache class - Implement retention policies based on cache data types: * Odds data: 2 days (lines move frequently) * Live/recent/leaderboard: 7 days (weekly updates) * News/stocks: 14 days * Upcoming/schedules/team_info/logos: 60 days (stable data) - Add cleanup_disk_cache() orchestration in CacheManager - Start background cleanup thread running every 24 hours - Run cleanup on application startup - Add disk cleanup metrics tracking - Comprehensive logging with cleanup statistics This prevents disk cache from accumulating indefinitely while preserving important season data longer than volatile live game data. * refactor(cache): improve disk cache cleanup implementation - Implement force parameter throttle mechanism in cleanup_disk_cache - Fix TOCTOU race condition in disk cache cleanup (getsize/remove) - Reduce lock contention by processing files outside lock where possible - Add CacheStrategyProtocol for better type safety (replaces Any) - Move time import to module level in cache_metrics - Defer initial cleanup to background thread for non-blocking startup - Add graceful shutdown mechanism with threading.Event for cleanup thread - Add stop_cleanup_thread() method for controlled thread termination * fix(cache): improve disk cache cleanup initialization and error handling - Only start cleanup thread when disk caching is enabled (cache_dir is set) - Remove unused retention policy keys (leaderboard, live_scores, logos) - Handle FileNotFoundError as benign race condition in cleanup - Preserve existing OSError handling for actual file system errors --------- Co-authored-by: Chuck <chuck@example.com>
895 lines
39 KiB
Python
895 lines
39 KiB
Python
import json
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
import pytz
|
|
from typing import Any, Dict, List, Optional
|
|
import logging
|
|
import threading
|
|
import tempfile
|
|
from pathlib import Path
|
|
from src.exceptions import CacheError
|
|
from src.cache.memory_cache import MemoryCache
|
|
from src.cache.disk_cache import DiskCache
|
|
from src.cache.cache_strategy import CacheStrategy
|
|
from src.cache.cache_metrics import CacheMetrics
|
|
from src.logging_config import get_logger
|
|
|
|
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that renders ``datetime`` objects as ISO-8601 strings."""

    def default(self, obj):
        """Serialise datetimes via ``isoformat()``; anything else goes to the base encoder.

        Raises:
            TypeError: (from ``json.JSONEncoder.default``) for unsupported objects.
        """
        if not isinstance(obj, datetime):
            return super().default(obj)
        return obj.isoformat()
|
|
|
|
class CacheManager:
    """Manages caching of API responses to reduce API calls.

    Reads consult an in-memory cache first and fall back to a disk cache of
    ``.json`` files. The work is delegated to four components (MemoryCache,
    DiskCache, CacheStrategy and CacheMetrics). On top of them this class adds
    cache-directory discovery, change-detection helpers and a daemon thread
    that prunes old disk cache files according to per-data-type retention
    policies.
    """

    def __init__(self) -> None:
        """Resolve the cache directory, build the cache components and start disk cleanup.

        The background cleanup thread is only started when a writable cache
        directory was found, i.e. when disk caching is enabled.
        """
        # Initialize logger first: the directory probe below already logs.
        self.logger: logging.Logger = get_logger(__name__)

        # Determine the most reliable writable directory
        self.cache_dir: Optional[str] = self._get_writable_cache_dir()
        if self.cache_dir:
            self.logger.info(f"Using cache directory: {self.cache_dir}")
        else:
            # This is a critical failure, as caching is essential.
            self.logger.error("Could not find or create a writable cache directory. Caching will be disabled.")
            self.cache_dir = None

        # Initialize config manager for sport-specific intervals
        self.config_manager: Optional[Any] = None
        try:
            from src.config_manager import ConfigManager
            self.config_manager = ConfigManager()
            self.config_manager.load_config()
        except ImportError:
            self.config_manager = None
            self.logger.warning("ConfigManager not available, using default cache intervals")

        # Initialize cache components using composition
        self._memory_cache_component = MemoryCache(max_size=1000, cleanup_interval=300.0)
        self._disk_cache_component = DiskCache(cache_dir=self.cache_dir, logger=self.logger)
        self._strategy_component = CacheStrategy(config_manager=self.config_manager, logger=self.logger)
        self._metrics_component = CacheMetrics(logger=self.logger)

        # Keep old attributes for backward compatibility (delegated to components).
        # The dicts and the lock are shared with the memory component. The numeric
        # settings are presumably plain numbers, so they are copies: updates made here
        # (e.g. _last_memory_cache_cleanup) are not seen by the component -- TODO confirm.
        self._memory_cache = self._memory_cache_component._cache
        self._memory_cache_timestamps = self._memory_cache_component._timestamps
        self._cache_lock = self._memory_cache_component._lock
        self._max_memory_cache_size = self._memory_cache_component._max_size
        self._memory_cache_cleanup_interval = self._memory_cache_component._cleanup_interval
        self._last_memory_cache_cleanup = self._memory_cache_component._last_cleanup

        # Disk cleanup configuration
        self._disk_cleanup_interval_hours = 24  # Run cleanup every 24 hours
        self._disk_cleanup_interval = 3600.0  # Minimum interval between cleanups (1 hour) for throttle
        self._last_disk_cleanup = 0.0  # Timestamp of last disk cleanup
        self._cleanup_thread: Optional[threading.Thread] = None
        self._cleanup_stop_event = threading.Event()  # Event to signal thread shutdown
        # Retention in days per data type, passed to DiskCache.cleanup_expired_files().
        self._retention_policies = {
            'odds': 2,  # Odds data: 2 days (lines move frequently)
            'odds_live': 2,  # Live odds: 2 days
            'sports_live': 7,  # Live sports: 7 days
            'weather_current': 7,  # Current weather: 7 days
            'sports_recent': 7,  # Recent games: 7 days
            'news': 14,  # News: 14 days
            'sports_upcoming': 60,  # Upcoming games: 60 days (schedules stable)
            'sports_schedules': 60,  # Schedules: 60 days
            'team_info': 60,  # Team info: 60 days
            'stocks': 14,  # Stock data: 14 days
            'crypto': 14,  # Crypto data: 14 days
            'default': 30  # Default: 30 days
        }

        # Start background cleanup thread only if disk caching is enabled
        if self.cache_dir:
            self.start_cleanup_thread()

    # ------------------------------------------------------------------
    # Cache directory discovery
    # ------------------------------------------------------------------

    def _get_writable_cache_dir(self) -> Optional[str]:
        """Find (or create) a writable cache directory, preferring a system path.

        Candidates are tried in order and the first usable one wins:

        1. ``/var/cache/ledmatrix``: system-wide and persistent (preferred for services).
        2. ``<home>/.ledmatrix_cache``: home of ``SUDO_USER`` (else ``USER``), so a
           sudo run does not default to /root. Root itself uses ``~``.
        3. ``/opt/ledmatrix/cache``: alternative persistent location.
        4. ``<tempdir>/ledmatrix_cache``: last resort; does NOT persist across restarts.

        Returns:
            The chosen directory path, or None if no candidate is writable.
        """
        # Attempt 1: System-wide persistent cache directory (preferred for services)
        system_cache_dir = '/var/cache/ledmatrix'
        try:
            if os.path.exists(system_cache_dir):
                try:
                    self._write_probe(system_cache_dir)
                    self.logger.info(f"Using system cache directory: {system_cache_dir}")
                    return system_cache_dir
                except (IOError, OSError):
                    self.logger.debug(f"System cache directory exists but is not writable: {system_cache_dir}")
            else:
                try:
                    self._ensure_cache_dir(system_cache_dir)
                    if os.access(system_cache_dir, os.W_OK):
                        self.logger.info(f"Using system cache directory: {system_cache_dir}")
                        return system_cache_dir
                except (OSError, IOError, PermissionError):
                    # Permission errors are expected when running as non-root
                    self.logger.debug(f"Could not create system cache directory (permission denied): {system_cache_dir}")
        except (OSError, IOError, PermissionError) as e:
            # Permission errors are expected when running as non-root, log at DEBUG level
            self.logger.debug(f"System cache directory not available: {e}")

        # Attempt 2: User's home directory (handling sudo), but avoid /root preference
        try:
            home_dir = self._resolve_home_dir()
            if home_dir is None:
                self.logger.warning("Could not use user-specific cache directory: home directory could not be resolved")
            else:
                user_cache_dir = os.path.join(home_dir, '.ledmatrix_cache')
                self._ensure_cache_dir(user_cache_dir)
                self._write_probe(user_cache_dir)
                self.logger.info(f"Using user cache directory: {user_cache_dir}")
                return user_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use user-specific cache directory: {e}")

        # Attempt 3: /opt/ledmatrix/cache (alternative persistent location)
        opt_cache_dir = '/opt/ledmatrix/cache'
        try:
            if os.path.exists(opt_cache_dir):
                # Test if we can write to the existing directory
                try:
                    self._write_probe(opt_cache_dir)
                    return opt_cache_dir
                except (IOError, OSError):
                    self.logger.warning(f"Directory exists but is not writable: {opt_cache_dir}")
            else:
                # Try to create the directory
                self._ensure_cache_dir(opt_cache_dir)
                if os.access(opt_cache_dir, os.W_OK):
                    return opt_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use /opt/ledmatrix/cache: {e}", exc_info=True)

        # Attempt 4: System-wide temporary directory (fallback, not persistent)
        try:
            temp_cache_dir = os.path.join(tempfile.gettempdir(), 'ledmatrix_cache')
            self._ensure_cache_dir(temp_cache_dir)
            if os.access(temp_cache_dir, os.W_OK):
                self.logger.warning("Using temporary cache directory - cache will NOT persist across restarts")
                return temp_cache_dir
        except (OSError, IOError, PermissionError) as e:
            self.logger.warning(f"Could not use system-wide temporary cache directory: {e}", exc_info=True)

        # Return None if no directory is writable
        return None

    @staticmethod
    def _resolve_home_dir() -> Optional[str]:
        """Return the home directory to host the per-user cache, or None if unresolvable.

        Prefers ``SUDO_USER`` and then ``USER``, so a sudo invocation resolves to the
        invoking user's home rather than /root. Root itself uses ``~``.
        """
        real_user = os.environ.get('SUDO_USER') or os.environ.get('USER', 'default')
        if real_user and real_user != 'root':
            home_dir = os.path.expanduser(f"~{real_user}")
        else:
            # When running as root and /var/cache/ledmatrix failed, still allow fallback to /root
            home_dir = os.path.expanduser('~')
        if home_dir.startswith('~'):
            # expanduser() returns its argument unchanged when it cannot resolve the
            # user (e.g. USER unset -> placeholder 'default'). Joining onto that would
            # silently create a cwd-relative "~default/.ledmatrix_cache" directory.
            home_dir = os.path.expanduser('~')
        return None if home_dir.startswith('~') else home_dir

    @staticmethod
    def _write_probe(directory: str) -> None:
        """Create and delete a ``.writetest`` file in ``directory``.

        Raises:
            OSError: if the probe file cannot be written or removed.
        """
        test_file = os.path.join(directory, '.writetest')
        with open(test_file, 'w') as f:
            f.write('test')
        os.remove(test_file)

    @staticmethod
    def _ensure_cache_dir(directory: str) -> None:
        """Create ``directory`` if needed and apply the project's cache-directory mode.

        Delegates to ``src.common.permission_utils``. Any OSError propagates to the caller.
        """
        from src.common.permission_utils import (
            ensure_directory_permissions,
            get_cache_dir_mode
        )
        ensure_directory_permissions(Path(directory), get_cache_dir_mode())

    # ------------------------------------------------------------------
    # Memory cache maintenance
    # ------------------------------------------------------------------

    def _cleanup_memory_cache(self, force: bool = False) -> int:
        """
        Clean up expired entries from memory cache and enforce size limits.

        Entries older than one hour, or whose timestamp cannot be interpreted as
        epoch seconds, are dropped first. If the cache is still larger than
        ``_max_memory_cache_size``, the oldest remaining entries are evicted.
        Numeric-string timestamps are honoured by both passes.

        Args:
            force: If True, perform cleanup regardless of time interval

        Returns:
            Number of entries removed
        """
        now = time.time()

        # Throttle: run at most once per _memory_cache_cleanup_interval unless forced.
        if not force and (now - self._last_memory_cache_cleanup) < self._memory_cache_cleanup_interval:
            return 0

        # Conservative TTL: entries not refreshed within an hour are considered expired.
        max_age_for_cleanup = 3600  # 1 hour

        def to_epoch(raw: Any) -> Optional[float]:
            """Interpret a stored timestamp (number or numeric string) as epoch seconds."""
            if isinstance(raw, (int, float)):
                return float(raw)
            if isinstance(raw, str):
                try:
                    return float(raw)
                except ValueError:
                    return None
            return None

        with self._cache_lock:
            removed_count = 0
            current_time = time.time()

            # Normalise once so the expiry pass and the eviction order see the same
            # value for every entry.
            epochs = {key: to_epoch(ts) for key, ts in self._memory_cache_timestamps.items()}

            for key, ts in epochs.items():
                if ts is None or (current_time - ts) > max_age_for_cleanup:
                    self._memory_cache.pop(key, None)
                    self._memory_cache_timestamps.pop(key, None)
                    removed_count += 1

            # Enforce size limit by removing the oldest entries if the cache is too large
            excess_count = len(self._memory_cache) - self._max_memory_cache_size
            if excess_count > 0:
                oldest_first = sorted(
                    self._memory_cache_timestamps,
                    key=lambda k: epochs.get(k) or 0.0
                )
                for key in oldest_first[:excess_count]:
                    self._memory_cache.pop(key, None)
                    self._memory_cache_timestamps.pop(key, None)
                    removed_count += 1

            self._last_memory_cache_cleanup = current_time
            remaining = len(self._memory_cache)

        if removed_count > 0:
            self.logger.debug(f"Memory cache cleanup: removed {removed_count} entries (current size: {remaining})")

        return removed_count

    # ------------------------------------------------------------------
    # Core get / set
    # ------------------------------------------------------------------

    def _get_cache_path(self, key: str) -> Optional[str]:
        """Get the path for a cache file (as computed by the disk cache component)."""
        return self._disk_cache_component.get_cache_path(key)

    def get_cached_data(self, key: str, max_age: int = 300, memory_ttl: Optional[int] = None) -> Optional[Dict[str, Any]]:
        """Get data from cache (memory first, then disk) honoring TTLs.

        - memory_ttl: TTL for in-memory entry; defaults to max_age if not provided
        - max_age: TTL for persisted (on-disk) entry based on the stored timestamp

        A disk hit is copied back into the memory cache. Returns None on a miss.
        """
        # Periodic cleanup of memory cache
        self._cleanup_memory_cache()

        in_memory_ttl = memory_ttl if memory_ttl is not None else max_age

        # 1) Memory cache
        cached = self._memory_cache_component.get(key, max_age=in_memory_ttl)
        if cached is not None:
            return cached

        # 2) Disk cache
        record = self._disk_cache_component.get(key, max_age=max_age)
        if record is not None:
            # Hydrate memory cache (use current time to start memory TTL window)
            self._memory_cache_component.set(key, record)
            return record

        # 3) Miss
        return None

    def save_cache(self, key: str, data: Dict[str, Any]) -> None:
        """
        Save data to cache (memory first, then disk).

        Args:
            key: Cache key
            data: Data to cache

        Raises:
            CacheError: propagated from DiskCache, which logs it before raising.
        """
        # Periodic cleanup before adding new entries
        self._cleanup_memory_cache()

        # Update memory cache first
        self._memory_cache_component.set(key, data)

        # Save to disk cache; DiskCache logs and raises CacheError itself.
        self._disk_cache_component.set(key, data)

    def load_cache(self, key: str) -> Optional[Dict[str, Any]]:
        """Load data from cache with memory caching.

        Memory entries are honoured for 60 seconds and disk entries for one hour.
        A disk hit refreshes the memory copy.
        """
        # Check memory cache first (1 minute TTL)
        cached = self._memory_cache_component.get(key, max_age=60)
        if cached is not None:
            return cached

        # Check disk cache
        data = self._disk_cache_component.get(key, max_age=3600)  # 1 hour for load_cache
        if data is not None:
            # Update memory cache
            self._memory_cache_component.set(key, data)
            return data

        return None

    def clear_cache(self, key: Optional[str] = None) -> None:
        """Clear cache for a specific key or all keys.

        Any falsy ``key`` (None or an empty string) clears everything.
        """
        if key:
            # Clear specific key
            self._memory_cache_component.clear(key)
            self._disk_cache_component.clear(key)
            self.logger.info("Cleared cache for key: %s", key)
        else:
            # Clear all keys
            memory_count = self._memory_cache_component.size()
            self._memory_cache_component.clear()
            self._disk_cache_component.clear()
            self.logger.info("Cleared all cache: %d memory entries", memory_count)

    # ------------------------------------------------------------------
    # Disk inspection
    # ------------------------------------------------------------------

    def list_cache_files(self) -> List[Dict[str, Any]]:
        """List all cache files with metadata (key, age, size, path).

        Only ``*.json`` entries of the cache directory are reported. The scan is
        pure disk I/O, so it does not take the memory-cache lock; holding that
        lock would stall memory-cache reads and writes for the whole scan.

        Returns:
            List of dicts with keys: 'key', 'filename', 'age_seconds', 'age_display',
            'size_bytes', 'size_display', 'path', 'modified_time',
            'modified_datetime' (local-time ISO-8601 string), sorted newest first.
            Empty if caching is disabled or the directory cannot be listed.
        """
        if not self.cache_dir or not os.path.exists(self.cache_dir):
            return []

        current_time = time.time()

        try:
            filenames = os.listdir(self.cache_dir)
        except OSError as e:
            self.logger.error(f"Error listing cache directory {self.cache_dir}: {e}", exc_info=True)
            return []

        cache_files: List[Dict[str, Any]] = []
        for filename in filenames:
            if not filename.endswith('.json'):
                continue

            # Extract key from filename (remove .json extension)
            key = filename[:-len('.json')]
            file_path = os.path.join(self.cache_dir, filename)

            try:
                stat_info = os.stat(file_path)
                size_bytes = stat_info.st_size
                modified_time = stat_info.st_mtime
                age_seconds = current_time - modified_time
                cache_files.append({
                    'key': key,
                    'filename': filename,
                    'age_seconds': age_seconds,
                    'age_display': self._format_age(age_seconds),
                    'size_bytes': size_bytes,
                    'size_display': self._format_size(size_bytes),
                    'path': file_path,
                    'modified_time': modified_time,
                    'modified_datetime': datetime.fromtimestamp(modified_time).isoformat()
                })
            except FileNotFoundError:
                # Benign race: the file was removed (e.g. by the background disk
                # cleanup) between listdir() and stat().
                continue
            except OSError as e:
                self.logger.warning(f"Error getting stats for cache file {filename} at {file_path}: {e}", exc_info=True)
                continue

        # Sort by modified time (newest first)
        cache_files.sort(key=lambda entry: entry['modified_time'], reverse=True)
        return cache_files

    @staticmethod
    def _format_age(age_seconds: float) -> str:
        """Render an age in seconds as a compact '<n>s' / '<n>m' / '<n>h' / '<n>d' string."""
        if age_seconds < 60:
            return f"{int(age_seconds)}s"
        if age_seconds < 3600:
            return f"{int(age_seconds / 60)}m"
        if age_seconds < 86400:
            return f"{int(age_seconds / 3600)}h"
        return f"{int(age_seconds / 86400)}d"

    @staticmethod
    def _format_size(size_bytes: int) -> str:
        """Render a byte count as '<n>B', '<x.x>KB' or '<x.x>MB'."""
        if size_bytes < 1024:
            return f"{size_bytes}B"
        if size_bytes < 1024 * 1024:
            return f"{size_bytes / 1024:.1f}KB"
        return f"{size_bytes / (1024 * 1024):.1f}MB"

    def get_cache_dir(self) -> Optional[str]:
        """Get the cache directory path (None when disk caching is disabled)."""
        return self.cache_dir

    # ------------------------------------------------------------------
    # Change detection
    # ------------------------------------------------------------------

    def has_data_changed(self, data_type: str, new_data: Dict[str, Any]) -> bool:
        """Check if data has changed from cached version.

        Records written by ``update_cache``/``set`` are wrapped as
        ``{'data': ..., 'timestamp': ...}``. That envelope is removed first so
        the type-specific comparators see the same payload shape as ``new_data``.

        Returns:
            True if nothing usable is cached, the data type has no dedicated
            comparator, or the comparator reports a change; otherwise False.
        """
        cached_data = self.load_cache(data_type)
        if not cached_data:
            return True

        if isinstance(cached_data, dict) and 'data' in cached_data and 'timestamp' in cached_data:
            cached_data = cached_data['data']
            if not cached_data:
                return True

        comparators = {
            'weather': self._has_weather_changed,
            'stocks': self._has_stocks_changed,
            'stock_news': self._has_news_changed,
            'nhl': self._has_nhl_changed,
            'mlb': self._has_mlb_changed,
        }
        comparator = comparators.get(data_type)
        if comparator is None:
            return True
        return comparator(cached_data, new_data)

    def _has_weather_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if weather data has changed.

        Compares the rounded cached temperature and the primary condition with
        ``new['temp']`` / ``new['condition']``. Accepts three cached shapes: the
        ``{'data': ...}`` envelope, the ``{'current': ..., 'forecast': ...}`` layout,
        and the older flat ``temp``/``condition`` layout. A non-dict payload, or a
        ``current`` snapshot without conditions or temperature, counts as changed.
        """
        # Handle new cache structure where data is nested under 'data' key
        if isinstance(cached, dict) and 'data' in cached:
            cached = cached['data']
        if not isinstance(cached, dict):
            return True

        # Handle case where cached data might be the weather data directly
        if 'current' in cached:
            # This is the new structure with 'current' and 'forecast' keys
            current_weather = cached.get('current', {})
            if current_weather and 'main' in current_weather and 'weather' in current_weather:
                conditions = current_weather['weather']
                cached_temp = current_weather['main'].get('temp')
                if not conditions or cached_temp is None:
                    # Incomplete snapshot: nothing reliable to compare against.
                    return True
                return (round(cached_temp) != new.get('temp') or
                        conditions[0].get('main') != new.get('condition'))

        # Handle old structure where temp and condition are directly accessible
        return (cached.get('temp') != new.get('temp') or
                cached.get('condition') != new.get('condition'))

    def _has_stocks_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if stock data has changed; always False while the market is closed."""
        if not self._is_market_open():
            return False
        return cached.get('price') != new.get('price')

    def _has_news_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if news data has changed.

        - List payloads: changed if the lengths differ or the set of titles differs.
        - Dict payloads: changed if ``new`` has a headline id missing from ``cached``.
        - A cached value whose shape differs from ``new`` counts as changed.
        """
        # Handle both dictionary and list formats
        if isinstance(new, list):
            # If new data is a list, cached data should also be a list
            if not isinstance(cached, list):
                return True
            # Compare lengths and content
            if len(cached) != len(new):
                return True
            # Compare titles since they're unique enough for our purposes
            cached_titles = {item.get('title', '') for item in cached}
            new_titles = {item.get('title', '') for item in new}
            return cached_titles != new_titles

        # Dictionary format handling
        if not isinstance(cached, dict):
            return True
        cached_headlines = {h.get('id') for h in cached.get('headlines', [])}
        new_headlines = {h.get('id') for h in new.get('headlines', [])}
        return not cached_headlines.issuperset(new_headlines)

    def _has_nhl_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if NHL data has changed (game status or score)."""
        return (cached.get('game_status') != new.get('game_status') or
                cached.get('score') != new.get('score'))

    def _has_mlb_changed(self, cached: Dict[str, Any], new: Dict[str, Any]) -> bool:
        """Check if MLB game data has changed.

        ``new`` and ``cached`` map game ids to game dicts. A game counts as changed
        if it is absent from ``cached`` or its score or status differs. For live
        games (status ``'in'``) the inning, half, count and base runners are
        compared as well. Games present only in ``cached`` are not considered.
        """
        if not cached or not new:
            return True

        # Check if any games have changed status or score
        for game_id, new_game in new.items():
            cached_game = cached.get(game_id)
            if not cached_game:
                return True

            # Check for score changes
            if (new_game['away_score'] != cached_game['away_score'] or
                    new_game['home_score'] != cached_game['home_score']):
                return True

            # Check for status changes
            if new_game['status'] != cached_game['status']:
                return True

            # For live games, check inning and count
            if new_game['status'] == 'in':
                if (new_game['inning'] != cached_game['inning'] or
                        new_game['inning_half'] != cached_game['inning_half'] or
                        new_game['balls'] != cached_game['balls'] or
                        new_game['strikes'] != cached_game['strikes'] or
                        new_game['bases_occupied'] != cached_game['bases_occupied']):
                    return True

        return False

    def _is_market_open(self) -> bool:
        """Check if the US stock market is currently open (delegates to the strategy component)."""
        return self._strategy_component.is_market_open()

    # ------------------------------------------------------------------
    # Envelope helpers: {'data': ..., 'timestamp': ...}
    # ------------------------------------------------------------------

    def update_cache(self, data_type: str, data: Dict[str, Any]) -> bool:
        """Store ``data`` under ``data_type`` wrapped with the current timestamp.

        The record is saved as ``{'data': data, 'timestamp': <epoch seconds>}``.

        Returns:
            True once the record has been saved. ``save_cache`` returns nothing
            and signals failure by raising, so a disk failure propagates as
            ``CacheError`` rather than producing False.
        """
        cache_data = {
            'data': data,
            'timestamp': time.time()
        }
        self.save_cache(data_type, cache_data)
        return True

    def get(self, key: str, max_age: int = 300) -> Optional[Dict[str, Any]]:
        """Get data from cache if it exists and is not stale.

        Records stored in the ``{'data': ...}`` envelope are unwrapped.
        """
        cached_data = self.get_cached_data(key, max_age)
        if cached_data and 'data' in cached_data:
            return cached_data['data']
        return cached_data

    def set(self, key: str, data: Dict[str, Any], ttl: Optional[int] = None) -> None:
        """
        Store data in cache with current timestamp.

        Args:
            key: Cache key
            data: Data to cache
            ttl: Optional time-to-live in seconds (stored for compatibility but
                 expiration is still controlled via max_age when reading)
        """
        cache_data = {
            'data': data,
            'timestamp': time.time()
        }
        if ttl is not None:
            cache_data['ttl'] = ttl
        self.save_cache(key, cache_data)

    def setup_persistent_cache(self) -> bool:
        """
        Set up a persistent cache directory with proper permissions.

        This should be run once with sudo to create ``/var/cache/ledmatrix``.
        When ``SUDO_USER`` is set, ownership of the directory is handed to that
        user on a best-effort basis (a failure is logged, not fatal).

        Returns:
            True if the directory was prepared, False if creating it failed.
        """
        cache_dir = '/var/cache/ledmatrix'
        try:
            # Try to create /var/cache/ledmatrix with proper permissions
            from src.common.permission_utils import (
                ensure_directory_permissions,
                get_cache_dir_mode
            )
            ensure_directory_permissions(Path(cache_dir), get_cache_dir_mode())

            # Set ownership to the real user (not root)
            real_user = os.environ.get('SUDO_USER')
            if real_user:
                import pwd
                try:
                    account = pwd.getpwnam(real_user)
                    os.chown(cache_dir, account.pw_uid, account.pw_gid)
                    self.logger.info(f"Set ownership of {cache_dir} to {real_user}")
                except (OSError, KeyError) as e:
                    self.logger.warning(f"Could not set ownership for {cache_dir}: {e}", exc_info=True)

            self.logger.info(f"Successfully set up persistent cache directory: {cache_dir}")
            return True

        except (OSError, IOError, PermissionError) as e:
            self.logger.error(f"Failed to set up persistent cache directory {cache_dir}: {e}", exc_info=True)
            return False

    # ------------------------------------------------------------------
    # Disk cleanup
    # ------------------------------------------------------------------

    def cleanup_disk_cache(self, force: bool = False) -> Dict[str, Any]:
        """
        Clean up expired disk cache files based on retention policies.

        Args:
            force: If True, run cleanup regardless of last cleanup time

        Returns:
            Dictionary with cleanup statistics ('files_scanned', 'files_deleted',
            'space_freed_mb', 'errors', 'duration_sec'). All zeros when the call
            is throttled; 'errors' is 1 if the cleanup itself raised.
        """
        now = time.time()

        # Check if cleanup is needed (throttle to prevent too-frequent cleanups)
        if not force and (now - self._last_disk_cleanup) < self._disk_cleanup_interval:
            return {
                'files_scanned': 0,
                'files_deleted': 0,
                'space_freed_mb': 0.0,
                'errors': 0,
                'duration_sec': 0.0
            }

        start_time = time.time()

        try:
            # Perform cleanup
            stats = self._disk_cache_component.cleanup_expired_files(
                cache_strategy=self._strategy_component,
                retention_policies=self._retention_policies
            )

            duration = time.time() - start_time
            space_freed_mb = stats['space_freed_bytes'] / (1024 * 1024)

            # Record metrics
            self._metrics_component.record_disk_cleanup(
                files_cleaned=stats['files_deleted'],
                space_freed_mb=space_freed_mb,
                duration_sec=duration
            )

            # Log summary
            if stats['files_deleted'] > 0:
                self.logger.info(
                    "Disk cache cleanup completed: %d/%d files deleted, %.2f MB freed, %d errors, took %.2fs",
                    stats['files_deleted'], stats['files_scanned'], space_freed_mb,
                    stats['errors'], duration
                )
            else:
                self.logger.debug(
                    "Disk cache cleanup completed: no files to delete (%d files scanned)",
                    stats['files_scanned']
                )

            # Update last cleanup time (only on success, so failures are retried)
            self._last_disk_cleanup = time.time()

            return {
                'files_scanned': stats['files_scanned'],
                'files_deleted': stats['files_deleted'],
                'space_freed_mb': space_freed_mb,
                'errors': stats['errors'],
                'duration_sec': duration
            }

        except Exception as e:
            self.logger.error("Error during disk cache cleanup: %s", e, exc_info=True)
            return {
                'files_scanned': 0,
                'files_deleted': 0,
                'space_freed_mb': 0.0,
                'errors': 1,
                'duration_sec': time.time() - start_time
            }

    def start_cleanup_thread(self) -> None:
        """Start background thread for periodic disk cache cleanup.

        No-op if a cleanup thread is already alive. The daemon thread runs one
        cleanup immediately, then one every ``_disk_cleanup_interval_hours`` hours,
        until ``_cleanup_stop_event`` is set (see ``stop_cleanup_thread``).
        """
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            self.logger.debug("Cleanup thread already running")
            return

        def cleanup_loop():
            """Background loop that runs cleanup periodically."""
            self.logger.info("Disk cache cleanup thread started (interval: %d hours)",
                             self._disk_cleanup_interval_hours)

            # Run initial cleanup on startup (deferred from __init__ to avoid blocking)
            try:
                self.logger.debug("Running initial disk cache cleanup")
                self.cleanup_disk_cache()
            except Exception as e:
                self.logger.error("Error in initial cleanup: %s", e, exc_info=True)

            # Main cleanup loop
            while not self._cleanup_stop_event.is_set():
                try:
                    # Sleep for the configured interval (interruptible)
                    sleep_seconds = self._disk_cleanup_interval_hours * 3600
                    if self._cleanup_stop_event.wait(timeout=sleep_seconds):
                        # Event was set, exit loop
                        break

                    # Run cleanup if not stopped
                    if not self._cleanup_stop_event.is_set():
                        self.logger.debug("Running scheduled disk cache cleanup")
                        self.cleanup_disk_cache()

                except Exception as e:
                    self.logger.error("Error in cleanup thread: %s", e, exc_info=True)
                    # Continue running despite errors, but use interruptible sleep
                    if self._cleanup_stop_event.wait(timeout=60):
                        # Event was set during error recovery sleep, exit loop
                        break

            self.logger.info("Disk cache cleanup thread stopped")

        self._cleanup_stop_event.clear()  # Reset event before starting thread
        self._cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="DiskCacheCleanup")
        self._cleanup_thread.start()
        self.logger.info("Started disk cache cleanup background thread")

    def stop_cleanup_thread(self) -> None:
        """
        Stop the background cleanup thread gracefully.

        Signals the thread to stop and waits up to 5 seconds for it to finish.
        This allows for clean shutdown during testing or application termination.
        """
        if not self._cleanup_thread or not self._cleanup_thread.is_alive():
            self.logger.debug("Cleanup thread not running")
            return

        self.logger.info("Stopping disk cache cleanup thread...")
        self._cleanup_stop_event.set()  # Signal thread to stop

        # Wait for thread to finish (with timeout to avoid hanging)
        self._cleanup_thread.join(timeout=5.0)

        if self._cleanup_thread.is_alive():
            self.logger.warning("Cleanup thread did not stop within timeout, thread may still be running")
        else:
            self.logger.info("Disk cache cleanup thread stopped successfully")

    # ------------------------------------------------------------------
    # Strategy delegation
    # ------------------------------------------------------------------

    def get_sport_live_interval(self, sport_key: str) -> int:
        """
        Get the live_update_interval for a specific sport from config.
        Falls back to default values if config is not available.
        """
        return self._strategy_component.get_sport_live_interval(sport_key)

    def get_cache_strategy(self, data_type: str, sport_key: Optional[str] = None) -> Dict[str, Any]:
        """
        Get cache strategy for different data types.
        Now respects sport-specific live_update_interval configurations.
        """
        return self._strategy_component.get_cache_strategy(data_type, sport_key)

    def get_data_type_from_key(self, key: str) -> str:
        """
        Determine the appropriate cache strategy based on the cache key.
        This helps automatically select the right cache duration.
        """
        return self._strategy_component.get_data_type_from_key(key)

    def get_sport_key_from_cache_key(self, key: str) -> Optional[str]:
        """
        Extract sport key from cache key to determine appropriate live_update_interval.
        """
        return self._strategy_component.get_sport_key_from_cache_key(key)

    def get_cached_data_with_strategy(self, key: str, data_type: str = 'default') -> Optional[Dict[str, Any]]:
        """
        Get data from cache using data-type-specific strategy.
        Now respects sport-specific live_update_interval configurations.

        For strategies flagged ``market_hours_only`` the disk max_age is
        quadrupled while the market is closed. Envelope records are unwrapped.
        """
        # Extract sport key for live sports data
        sport_key = None
        if data_type in ['sports_live', 'live_scores']:
            sport_key = self._strategy_component.get_sport_key_from_cache_key(key)

        strategy = self._strategy_component.get_cache_strategy(data_type, sport_key)
        max_age = strategy['max_age']
        # NOTE(review): memory_ttl defaults to the *unextended* max_age, so during
        # off-hours memory entries expire before disk entries do -- confirm intended.
        memory_ttl = strategy.get('memory_ttl', max_age)

        # For market data, check if market is open
        if strategy.get('market_hours_only', False) and not self._strategy_component.is_market_open():
            # During off-hours, extend cache duration
            max_age *= 4  # 4x longer cache during off-hours

        record = self.get_cached_data(key, max_age, memory_ttl)
        # Unwrap if stored in { 'data': ..., 'timestamp': ... }
        if isinstance(record, dict) and 'data' in record:
            return record['data']
        return record

    def get_with_auto_strategy(self, key: str) -> Optional[Dict[str, Any]]:
        """
        Get cached data using automatically determined strategy.
        Now respects sport-specific live_update_interval configurations.
        """
        data_type = self.get_data_type_from_key(key)
        return self.get_cached_data_with_strategy(key, data_type)

    def get_background_cached_data(self, key: str, sport_key: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Get data from background service cache with appropriate strategy.

        This method is specifically designed for Recent/Upcoming managers
        to use data cached by the background service.

        Args:
            key: Cache key to retrieve
            sport_key: Sport key for determining appropriate cache strategy

        Returns:
            Cached data if available and fresh, None otherwise
        """
        # Determine the appropriate cache strategy
        data_type = self.get_data_type_from_key(key)
        strategy = self.get_cache_strategy(data_type, sport_key)

        # For Recent/Upcoming managers, we want to use the background service cache
        # which should have longer TTLs than the individual manager caches
        max_age = strategy['max_age']
        memory_ttl = strategy.get('memory_ttl', max_age)

        # Get the cached data
        cached_data = self.get_cached_data(key, max_age, memory_ttl)

        # NOTE(review): a falsy record (e.g. {}) is reported as a miss here, while
        # is_background_data_available() uses `is not None` -- confirm intended.
        if cached_data:
            # Record cache hit for performance monitoring
            self.record_cache_hit('background')
            # Unwrap if stored in { 'data': ..., 'timestamp': ... } format
            if isinstance(cached_data, dict) and 'data' in cached_data:
                return cached_data['data']
            return cached_data

        # Record cache miss for performance monitoring
        self.record_cache_miss('background')
        return None

    def is_background_data_available(self, key: str, sport_key: Optional[str] = None) -> bool:
        """
        Check if background service has fresh data available.
        This helps Recent/Upcoming managers determine if they should
        wait for background data or fetch immediately.
        """
        data_type = self.get_data_type_from_key(key)
        strategy = self.get_cache_strategy(data_type, sport_key)

        # Check if we have data that's still fresh according to background service TTL
        cached_data = self.get_cached_data(key, strategy['max_age'])
        return cached_data is not None

    def generate_sport_cache_key(self, sport: str, date_str: Optional[str] = None) -> str:
        """
        Centralized cache key generation for sports data.
        This ensures consistent cache keys across background service and managers.

        Args:
            sport: Sport identifier (e.g., 'nba', 'nfl', 'ncaa_fb')
            date_str: Date string in YYYYMMDD format. If None, uses current UTC date.

        Returns:
            Cache key in format: {sport}_{date}
        """
        if date_str is None:
            date_str = datetime.now(pytz.utc).strftime('%Y%m%d')
        return f"{sport}_{date_str}"

    # ------------------------------------------------------------------
    # Metrics delegation
    # ------------------------------------------------------------------

    def record_cache_hit(self, cache_type: str = 'regular') -> None:
        """Record a cache hit for performance monitoring."""
        self._metrics_component.record_hit(cache_type)

    def record_cache_miss(self, cache_type: str = 'regular') -> None:
        """Record a cache miss for performance monitoring."""
        self._metrics_component.record_miss(cache_type)

    def record_fetch_time(self, duration: float) -> None:
        """Record fetch operation duration for performance monitoring."""
        self._metrics_component.record_fetch_time(duration)

    def get_cache_metrics(self) -> Dict[str, Any]:
        """Get current cache performance metrics."""
        return self._metrics_component.get_metrics()

    def log_cache_metrics(self) -> None:
        """Log current cache performance metrics."""
        self._metrics_component.log_metrics()

    def get_memory_cache_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the memory cache.

        Returns:
            Dictionary with 'size', 'max_size', 'usage_percent' (0 when max_size
            is not positive), 'last_cleanup' (epoch seconds) and 'cleanup_interval'.
        """
        with self._cache_lock:
            return {
                'size': len(self._memory_cache),
                'max_size': self._max_memory_cache_size,
                'usage_percent': (len(self._memory_cache) / self._max_memory_cache_size * 100) if self._max_memory_cache_size > 0 else 0,
                'last_cleanup': self._last_memory_cache_cleanup,
                'cleanup_interval': self._memory_cache_cleanup_interval
            }

    def log_memory_cache_stats(self) -> None:
        """Log current memory cache statistics at INFO level."""
        stats = self.get_memory_cache_stats()
        self.logger.info(f"Memory Cache - Size: {stats['size']}/{stats['max_size']} "
                         f"({stats['usage_percent']:.1f}%), "
                         f"Last cleanup: {time.time() - stats['last_cleanup']:.1f}s ago")