feat(cache): Add intelligent disk cache cleanup with retention policies (#199)

* feat(cache): Add intelligent disk cache cleanup with retention policies

- Add cleanup_expired_files() method to DiskCache class
- Implement retention policies based on cache data types:
  * Odds data: 2 days (lines move frequently)
  * Live/recent/leaderboard: 7 days (weekly updates)
  * News/stocks: 14 days
  * Upcoming/schedules/team_info/logos: 60 days (stable data)
- Add cleanup_disk_cache() orchestration in CacheManager
- Start background cleanup thread running every 24 hours
- Run cleanup on application startup
- Add disk cleanup metrics tracking
- Comprehensive logging with cleanup statistics

This prevents disk cache from accumulating indefinitely while preserving
important season data longer than volatile live game data.

* refactor(cache): improve disk cache cleanup implementation

- Implement force parameter throttle mechanism in cleanup_disk_cache
- Fix TOCTOU race condition in disk cache cleanup (getsize/remove)
- Reduce lock contention by processing files outside lock where possible
- Add CacheStrategyProtocol for better type safety (replaces Any)
- Move time import to module level in cache_metrics
- Defer initial cleanup to background thread for non-blocking startup
- Add graceful shutdown mechanism with threading.Event for cleanup thread
- Add stop_cleanup_thread() method for controlled thread termination

* fix(cache): improve disk cache cleanup initialization and error handling

- Only start cleanup thread when disk caching is enabled (cache_dir is set)
- Remove unused retention policy keys (leaderboard, live_scores, logos)
- Handle FileNotFoundError as benign race condition in cleanup
- Preserve existing OSError handling for actual file system errors

---------

Co-authored-by: Chuck <chuck@example.com>
This commit is contained in:
Chuck
2026-01-19 15:57:19 -05:00
committed by GitHub
parent bc23b7c75c
commit 2381ead03f
3 changed files with 331 additions and 42 deletions

View File

@@ -5,6 +5,7 @@ Tracks cache performance metrics including hit rates, miss rates, and fetch time
"""
import threading
import time
import logging
from typing import Dict, Any, Optional
@@ -28,7 +29,12 @@ class CacheMetrics:
'background_hits': 0,
'background_misses': 0,
'total_fetch_time': 0.0,
'fetch_count': 0
'fetch_count': 0,
# Disk cleanup metrics
'last_disk_cleanup': 0.0,
'total_files_cleaned': 0,
'total_space_freed_mb': 0.0,
'last_cleanup_duration_sec': 0.0
}
def record_hit(self, cache_type: str = 'regular') -> None:
@@ -69,6 +75,21 @@ class CacheMetrics:
self._metrics['total_fetch_time'] += duration
self._metrics['fetch_count'] += 1
def record_disk_cleanup(self, files_cleaned: int, space_freed_mb: float, duration_sec: float) -> None:
    """
    Record the outcome of one disk cleanup pass.

    Args:
        files_cleaned: Number of files deleted during the pass.
        space_freed_mb: Megabytes reclaimed by the pass.
        duration_sec: Wall-clock duration of the pass in seconds.
    """
    with self._lock:
        # Totals accumulate across passes; timestamp/duration reflect the latest pass only.
        metrics = self._metrics
        metrics['last_disk_cleanup'] = time.time()
        metrics['total_files_cleaned'] += files_cleaned
        metrics['total_space_freed_mb'] += space_freed_mb
        metrics['last_cleanup_duration_sec'] = duration_sec
def get_metrics(self) -> Dict[str, Any]:
"""
Get current cache performance metrics.
@@ -93,7 +114,12 @@ class CacheMetrics:
'api_calls_saved': self._metrics['api_calls_saved'],
'average_fetch_time': avg_fetch_time,
'total_fetch_time': self._metrics['total_fetch_time'],
'fetch_count': self._metrics['fetch_count']
'fetch_count': self._metrics['fetch_count'],
# Disk cleanup metrics
'last_disk_cleanup': self._metrics['last_disk_cleanup'],
'total_files_cleaned': self._metrics['total_files_cleaned'],
'total_space_freed_mb': self._metrics['total_space_freed_mb'],
'last_cleanup_duration_sec': self._metrics['last_cleanup_duration_sec']
}
def log_metrics(self) -> None:

View File

@@ -10,12 +10,28 @@ import time
import tempfile
import logging
import threading
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, Protocol
from datetime import datetime
from src.exceptions import CacheError
class CacheStrategyProtocol(Protocol):
    """Structural type for strategy objects that classify cache keys."""

    def get_data_type_from_key(self, key: str) -> str:
        """
        Map a cache key to its data-type category.

        Args:
            key: The cache key to classify.

        Returns:
            Data-type string used for retention/strategy lookups.
        """
        ...
class DateTimeEncoder(json.JSONEncoder):
"""JSON encoder that handles datetime objects."""
def default(self, obj: Any) -> Any:
@@ -269,4 +285,116 @@ class DiskCache:
def get_cache_dir(self) -> Optional[str]:
    """Get the cache directory path, or None when no directory is configured."""
    return self.cache_dir
def cleanup_expired_files(self, cache_strategy: "CacheStrategyProtocol", retention_policies: Dict[str, int]) -> Dict[str, Any]:
    """
    Clean up expired cache files based on per-data-type retention policies.

    Each ``*.json`` file's key is mapped to a data type via *cache_strategy*;
    the file is deleted when its modification time is older than that type's
    retention period. The directory is listed under the cache lock, but
    per-file work runs outside it so concurrent get/set calls are not blocked;
    only the size+remove step re-acquires the lock.

    Args:
        cache_strategy: Object that maps a cache key to a data-type string.
        retention_policies: Mapping of data type -> retention period in days.
            The 'default' entry (falling back to 30 days) covers unknown types.

    Returns:
        Dictionary with cleanup statistics:
            - files_scanned: Total files checked
            - files_deleted: Files removed
            - space_freed_bytes: Bytes freed
            - errors: Number of errors encountered
    """
    stats = {
        'files_scanned': 0,
        'files_deleted': 0,
        'space_freed_bytes': 0,
        'errors': 0
    }

    if not self.cache_dir or not os.path.exists(self.cache_dir):
        self.logger.warning("Cache directory not available for cleanup")
        return stats

    current_time = time.time()

    # Snapshot candidate files while briefly holding the lock so writers see a
    # consistent directory state. A listing failure aborts the whole pass.
    try:
        with self._lock:
            filenames = [f for f in os.listdir(self.cache_dir) if f.endswith('.json')]
    except OSError as list_error:
        self.logger.error("Error listing cache directory %s: %s", self.cache_dir, list_error, exc_info=True)
        stats['errors'] += 1
        return stats

    for filename in filenames:
        stats['files_scanned'] += 1
        file_path = os.path.join(self.cache_dir, filename)
        try:
            file_age_days = (current_time - os.path.getmtime(file_path)) / 86400  # seconds -> days
            cache_key = filename[:-5]  # strip '.json' extension
            data_type = cache_strategy.get_data_type_from_key(cache_key)
            retention_days = retention_policies.get(data_type, retention_policies.get('default', 30))

            if file_age_days <= retention_days:
                continue

            # Delete under the lock so size+remove are atomic w.r.t. our own
            # threads. No exists() pre-check: FileNotFoundError below already
            # covers the cross-process race (the old LBYL check was a
            # redundant TOCTOU pattern).
            try:
                with self._lock:
                    file_size = os.path.getsize(file_path)
                    os.remove(file_path)
            except FileNotFoundError:
                # File was deleted by another process — benign race, skip silently.
                continue
            except OSError as e:
                stats['errors'] += 1
                self.logger.warning("Error deleting cache file %s: %s", filename, e)
                continue

            # Only count the file once removal has actually succeeded.
            stats['files_deleted'] += 1
            stats['space_freed_bytes'] += file_size
            self.logger.debug(
                "Deleted expired cache file: %s (age: %.1f days, type: %s, retention: %d days)",
                filename, file_age_days, data_type, retention_days
            )
        except FileNotFoundError:
            # File vanished between listing and stat — benign race, skip silently.
            continue
        except OSError as e:
            stats['errors'] += 1
            self.logger.warning("Error processing cache file %s: %s", filename, e)
            continue
        except Exception as e:
            stats['errors'] += 1
            self.logger.error("Unexpected error processing cache file %s: %s", filename, e, exc_info=True)
            continue

    return stats

View File

@@ -59,6 +59,31 @@ class CacheManager:
self._max_memory_cache_size = self._memory_cache_component._max_size
self._memory_cache_cleanup_interval = self._memory_cache_component._cleanup_interval
self._last_memory_cache_cleanup = self._memory_cache_component._last_cleanup
# Disk cleanup configuration
self._disk_cleanup_interval_hours = 24 # Run cleanup every 24 hours
self._disk_cleanup_interval = 3600.0 # Minimum interval between cleanups (1 hour) for throttle
self._last_disk_cleanup = 0.0 # Timestamp of last disk cleanup
self._cleanup_thread: Optional[threading.Thread] = None
self._cleanup_stop_event = threading.Event() # Event to signal thread shutdown
self._retention_policies = {
'odds': 2, # Odds data: 2 days (lines move frequently)
'odds_live': 2, # Live odds: 2 days
'sports_live': 7, # Live sports: 7 days
'weather_current': 7, # Current weather: 7 days
'sports_recent': 7, # Recent games: 7 days
'news': 14, # News: 14 days
'sports_upcoming': 60, # Upcoming games: 60 days (schedules stable)
'sports_schedules': 60, # Schedules: 60 days
'team_info': 60, # Team info: 60 days
'stocks': 14, # Stock data: 14 days
'crypto': 14, # Crypto data: 14 days
'default': 30 # Default: 30 days
}
# Start background cleanup thread only if disk caching is enabled
if self.cache_dir:
self.start_cleanup_thread()
def _get_writable_cache_dir(self) -> Optional[str]:
"""Tries to find or create a writable cache directory, preferring a system path when available."""
@@ -555,7 +580,150 @@ class CacheManager:
except (OSError, IOError, PermissionError) as e:
self.logger.error(f"Failed to set up persistent cache directory {cache_dir}: {e}", exc_info=True)
return False
return False
def cleanup_disk_cache(self, force: bool = False) -> Dict[str, Any]:
    """
    Remove expired disk cache files according to the retention policies.

    Args:
        force: When True, bypass the minimum-interval throttle and run
            the cleanup immediately.

    Returns:
        Dict with cleanup statistics: files_scanned, files_deleted,
        space_freed_mb, errors, duration_sec.
    """
    # Throttle: skip entirely if a cleanup ran recently, unless forced.
    if not force and (time.time() - self._last_disk_cleanup) < self._disk_cleanup_interval:
        return {
            'files_scanned': 0,
            'files_deleted': 0,
            'space_freed_mb': 0.0,
            'errors': 0,
            'duration_sec': 0.0
        }

    started = time.time()
    try:
        raw = self._disk_cache_component.cleanup_expired_files(
            cache_strategy=self._strategy_component,
            retention_policies=self._retention_policies
        )
        elapsed = time.time() - started
        freed_mb = raw['space_freed_bytes'] / (1024 * 1024)

        # Fold this pass into the cumulative metrics.
        self._metrics_component.record_disk_cleanup(
            files_cleaned=raw['files_deleted'],
            space_freed_mb=freed_mb,
            duration_sec=elapsed
        )

        if raw['files_deleted'] > 0:
            self.logger.info(
                "Disk cache cleanup completed: %d/%d files deleted, %.2f MB freed, %d errors, took %.2fs",
                raw['files_deleted'], raw['files_scanned'], freed_mb,
                raw['errors'], elapsed
            )
        else:
            self.logger.debug(
                "Disk cache cleanup completed: no files to delete (%d files scanned)",
                raw['files_scanned']
            )

        # Only mark success: a failed pass stays eligible for a prompt retry.
        self._last_disk_cleanup = time.time()
        return {
            'files_scanned': raw['files_scanned'],
            'files_deleted': raw['files_deleted'],
            'space_freed_mb': freed_mb,
            'errors': raw['errors'],
            'duration_sec': elapsed
        }
    except Exception as e:
        self.logger.error("Error during disk cache cleanup: %s", e, exc_info=True)
        return {
            'files_scanned': 0,
            'files_deleted': 0,
            'space_freed_mb': 0.0,
            'errors': 1,
            'duration_sec': time.time() - started
        }
def start_cleanup_thread(self) -> None:
    """Start background thread for periodic disk cache cleanup.

    Idempotent: a no-op if a cleanup thread is already alive. The thread is
    a daemon, so it never blocks interpreter exit; call stop_cleanup_thread()
    for a graceful, explicit shutdown. The first cleanup runs inside the
    thread rather than here, keeping startup non-blocking.
    """
    if self._cleanup_thread and self._cleanup_thread.is_alive():
        self.logger.debug("Cleanup thread already running")
        return

    def cleanup_loop():
        """Background loop that runs cleanup periodically."""
        self.logger.info("Disk cache cleanup thread started (interval: %d hours)",
                         self._disk_cleanup_interval_hours)

        # Run initial cleanup on startup (deferred from __init__ to avoid blocking)
        try:
            self.logger.debug("Running initial disk cache cleanup")
            self.cleanup_disk_cache()
        except Exception as e:
            self.logger.error("Error in initial cleanup: %s", e, exc_info=True)

        # Main cleanup loop: Event.wait doubles as an interruptible sleep, so
        # stop_cleanup_thread() can wake the thread immediately.
        while not self._cleanup_stop_event.is_set():
            try:
                # Sleep for the configured interval (interruptible)
                sleep_seconds = self._disk_cleanup_interval_hours * 3600
                if self._cleanup_stop_event.wait(timeout=sleep_seconds):
                    # Event was set, exit loop
                    break

                # Run cleanup if not stopped
                if not self._cleanup_stop_event.is_set():
                    self.logger.debug("Running scheduled disk cache cleanup")
                    self.cleanup_disk_cache()
            except Exception as e:
                self.logger.error("Error in cleanup thread: %s", e, exc_info=True)
                # Continue running despite errors, but use interruptible sleep
                if self._cleanup_stop_event.wait(timeout=60):
                    # Event was set during error recovery sleep, exit loop
                    break

        self.logger.info("Disk cache cleanup thread stopped")

    # Clearing the event here lets a manager restart cleanup after a prior
    # stop_cleanup_thread() left the event set.
    self._cleanup_stop_event.clear()  # Reset event before starting thread
    self._cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True, name="DiskCacheCleanup")
    self._cleanup_thread.start()
    self.logger.info("Started disk cache cleanup background thread")
def stop_cleanup_thread(self) -> None:
    """
    Gracefully shut down the background cleanup thread.

    Sets the stop event and joins the thread with a bounded timeout so that
    application shutdown (or test teardown) can never hang indefinitely.
    """
    thread = self._cleanup_thread
    if thread is None or not thread.is_alive():
        self.logger.debug("Cleanup thread not running")
        return

    self.logger.info("Stopping disk cache cleanup thread...")
    self._cleanup_stop_event.set()  # Signal thread to stop
    thread.join(timeout=5.0)  # Bounded wait to avoid hanging

    if thread.is_alive():
        self.logger.warning("Cleanup thread did not stop within timeout, thread may still be running")
    else:
        self.logger.info("Disk cache cleanup thread stopped successfully")
def get_sport_live_interval(self, sport_key: str) -> int:
"""
@@ -685,56 +853,23 @@ class CacheManager:
def record_cache_hit(self, cache_type: str = 'regular') -> None:
"""Record a cache hit for performance monitoring."""
with self._cache_lock:
if cache_type == 'background':
self._cache_metrics['background_hits'] += 1
else:
self._cache_metrics['hits'] += 1
self._metrics_component.record_hit(cache_type)
def record_cache_miss(self, cache_type: str = 'regular') -> None:
"""Record a cache miss for performance monitoring."""
with self._cache_lock:
if cache_type == 'background':
self._cache_metrics['background_misses'] += 1
else:
self._cache_metrics['misses'] += 1
self._cache_metrics['api_calls_saved'] += 1
self._metrics_component.record_miss(cache_type)
def record_fetch_time(self, duration: float) -> None:
"""Record fetch operation duration for performance monitoring."""
with self._cache_lock:
self._cache_metrics['total_fetch_time'] += duration
self._cache_metrics['fetch_count'] += 1
self._metrics_component.record_fetch_time(duration)
def get_cache_metrics(self) -> Dict[str, Any]:
"""Get current cache performance metrics."""
with self._cache_lock:
total_hits = self._cache_metrics['hits'] + self._cache_metrics['background_hits']
total_misses = self._cache_metrics['misses'] + self._cache_metrics['background_misses']
total_requests = total_hits + total_misses
avg_fetch_time = (self._cache_metrics['total_fetch_time'] /
self._cache_metrics['fetch_count']) if self._cache_metrics['fetch_count'] > 0 else 0.0
return {
'total_requests': total_requests,
'cache_hit_rate': total_hits / total_requests if total_requests > 0 else 0.0,
'background_hit_rate': (self._cache_metrics['background_hits'] /
(self._cache_metrics['background_hits'] + self._cache_metrics['background_misses'])
if (self._cache_metrics['background_hits'] + self._cache_metrics['background_misses']) > 0 else 0.0),
'api_calls_saved': self._cache_metrics['api_calls_saved'],
'average_fetch_time': avg_fetch_time,
'total_fetch_time': self._cache_metrics['total_fetch_time'],
'fetch_count': self._cache_metrics['fetch_count']
}
return self._metrics_component.get_metrics()
def log_cache_metrics(self) -> None:
"""Log current cache performance metrics."""
metrics = self.get_cache_metrics()
self.logger.info(f"Cache Performance - Hit Rate: {metrics['cache_hit_rate']:.2%}, "
f"Background Hit Rate: {metrics['background_hit_rate']:.2%}, "
f"API Calls Saved: {metrics['api_calls_saved']}, "
f"Avg Fetch Time: {metrics['average_fetch_time']:.2f}s")
self._metrics_component.log_metrics()
def get_memory_cache_stats(self) -> Dict[str, Any]:
"""