Files
LEDMatrix/src/background_data_service.py
Chuck 05b3fa56cb fix: Codacy security fixes, CVE dependency bumps, and code quality cleanup (#331)
* fix(deps): bump minimum versions to address CVEs

Pillow 10.4.0 → 12.2.0: CVE-2026-40192 (DoS via FITS decompression bomb),
CVE-2026-25990 (OOB write via PSD image), CVE-2026-42311/42308/42310

requests 2.32.0 → 2.33.0: CVE-2026-25645 (temp file security bypass),
CVE-2024-47081 (.netrc credentials leak)

werkzeug 3.0.0 → 3.1.6: CVE-2023-46136, CVE-2024-49766/49767,
CVE-2025-66221, CVE-2026-21860/27199 (DoS, path traversal, safe_join bypass)

Flask 3.0.0 → 3.1.3: CVE-2026-27205 (session data caching info disclosure)

spotipy 2.24.0 → 2.25.2: CVE-2025-27154, CVE-2025-66040

python-socketio 5.11.0 → 5.14.0: CVE-2025-61765

pytest 7.4.0 → 9.0.3: CVE-2025-71176 (insecure temp dir handling)

Updated in requirements.txt, web_interface/requirements.txt,
plugin-repos/starlark-apps/requirements.txt, and
plugin-repos/march-madness/requirements.txt.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve Pylint errors in executor, data service, and odds call

Rename TimeoutError to PluginTimeoutError in plugin_executor.py to
avoid shadowing the built-in; no external callers affected.

Remove dead try/except in BackgroundDataService.shutdown: executor.shutdown()
never accepted a timeout kwarg so the try branch always raised TypeError.
Simplify to a direct shutdown(wait=wait) call.

Remove is_live kwarg from odds_manager.get_odds() call in sports.py;
BaseOddsManager.get_odds() has no such parameter. The live update interval
is already encoded in the update_interval_seconds argument passed alongside.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: MD5→SHA-256, shellcheck warnings, and broken doc links

config_service.py: replace MD5 with SHA-256 for config change detection;
same semantics (equality comparison), no stored hashes affected.

Shell scripts — shellcheck warnings:
- diagnose_web_interface.sh: remove useless cat (SC2002)
- dev_plugin_setup.sh: restructure A&&B||C into if/then (SC2015)
- fix_assets_permissions.sh: remove unused REAL_HOME block (SC2034)
- install_web_service.sh: remove unused USER_HOME assignment (SC2034)
- diagnose_web_ui.sh: remove unused SUDO assignments (SC2034)
- diagnose_plugin_permissions.sh: remove unused BLUE color var (SC2034)
- first_time_install.sh: remove unused CLEAR var, PACKAGE_NAME
  assignment, and replace loop variable with _ (SC2034)

docs/PLUGIN_ARCHITECTURE_SPEC.md: fix 10 broken TOC anchor links to
include section numbers matching the actual headings (MD051).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: remove unused imports and bare exception aliases (pyflakes F401/F841)

Remove unused imports across 86 files in src/, web_interface/, test/,
and scripts/ using autoflake. No logic changes — only dead import
statements and unused names in from-imports are removed.

Also remove bare exception aliases where the variable is never
referenced in the handler body:
- src/cache/disk_cache.py: except (IOError, OSError, PermissionError) as e
- src/cache_manager.py: except (OSError, IOError, PermissionError) as perm_error
- src/plugin_system/resource_monitor.py: except Exception as e
- web_interface/app.py: except Exception as read_err

86 files changed, 205 lines removed, 18 pre-existing test failures unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: remove unused local variable assignments (pyflakes F841)

Dead assignments removed across src/ and web_interface/:

- background_data_service: drop future= on fire-and-forget executor.submit
- base_classes/baseball: drop font= (all rendering uses self.fonts['time'])
- base_classes/hockey: drop status_short= (never referenced after assignment)
- common/cli: drop game_helper=/config_helper= bindings in import-test block;
  constructors called for instantiation-only validation
- common/display_helper: drop text_width= (x_position uses display_width
  directly); drop draw= in create_error_image (uses _draw_centered_text)
- config_manager: remove dead secrets_content loading block in migration path
  (comment already noted save_config_atomic handles secrets internally)
- display_manager: drop setup_start= (timing was never completed or read)
- font_manager: drop target_path= (catalog uses font_file_path directly);
  drop face=/font= bindings in validate_font (validation by construction —
  TypeError on failure is the signal, not the return value)
- font_test_manager: drop width=/height= (draw_text uses display_manager directly)
- plugin_system/state_reconciliation: drop manager= (only config/disk/state_mgr used)
- plugin_system/store_manager: drop result= on pip install subprocess.run
  (check=True raises on failure; stdout unused)
- web_interface/blueprints/pages_v3: drop main_config_path=""/secrets_config_path=""
  (render_template uses config_manager.get_*_path() inline)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(js): resolve ESLint no-undef warnings across 6 JS files

Three distinct patterns:

1. Vendor library globals — htmx is injected by <script> before these
   extension files load; ESLint lints files in isolation and doesn't know.
   Fix: add /* global htmx */ to htmx-sse.js and htmx-json-enc.js.

2. Cross-file globals — showNotification is defined as window.showNotification
   in app.js/notification.js but called bare in app.js and error_handler.js.
   ESLint doesn't connect window.X = Y with a bare call to X.
   Fix: add /* global showNotification */ to app.js and error_handler.js.

3. Forward-reference window.* functions — in array-table.js, checkbox-group.js,
   and custom-feeds.js, functions like removeArrayTableRow are called early
   inside event-handler closures but assigned to window.* later in the file.
   At runtime this works (the handler fires after the assignment), but ESLint
   sees the bare name at the call site.
   Fix: change bare calls to window.removeArrayTableRow(this) etc. so the
   reference is explicit and ESLint-safe.

Also guard the updateSystemStats call in app.js reconnectSSE: the function
is called but defined nowhere in the codebase. Guard with typeof check so
it won't throw ReferenceError if the reconnect path is hit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(js): resolve Biome lint warnings across 9 JS files

noUnusedVariables (catch bindings → optional catch syntax):
- app.js, file-upload.js, timezone-selector.js: } catch (e) { → } catch {
  ES2019 optional catch binding; e was unused in all three handlers

noUnusedVariables (dead assignments):
- app.js: remove const data= in display SSE stub (handler does nothing yet)
- api_client.js: remove const timeoutId= (setTimeout ID never used to cancel)
- custom-feeds.js: remove const oldIndex= (getAttribute result never read)
- schedule-picker.js: remove const compactMode= (never used in HTML build)
- select-dropdown.js: remove const icons= (icons not yet rendered in options)

noPrototypeBuiltins:
- day-selector.js: DAY_LABELS.hasOwnProperty(x) →
  Object.prototype.hasOwnProperty.call(DAY_LABELS, x)
  Safe form that works even on null-prototype objects

useIterableCallbackReturn:
- file-upload.js, notification.js: forEach(x => expr) →
  forEach(x => { expr; }) — forEach ignores return values;
  implicit return from arrow body was misleading

htmx-sse.js is a vendor extension file with old-style var/== patterns
that are correct for it; 18 Biome issues suppressed via Codacy API
rather than modifying the vendor source.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(security): escape user input in raw HTML responses in pages_v3.py

plugin_id comes directly from the URL path
(/partials/plugin-config/<plugin_id>) and was interpolated into an HTML
fragment without escaping. A crafted URL like
/partials/plugin-config/<script>alert(1)</script> would inject that
tag into the DOM via the HTMX partial response.

Fix: wrap all user-controlled values in markupsafe.escape() before
embedding in raw HTML strings. Affects the plugin-not-found 404
response and both error 500 responses in the plugin config partial.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address Bandit B108/B110 across production code

B110 (try/except/pass):
- display_controller.py: narrow 'except Exception' to 'except AttributeError'
  for get_offset_frame() — plugins not having this optional method is the
  expected case, not all exceptions
- config_manager.py: B110 already resolved by the earlier removal of the
  dead secrets-loading block (the except/pass was inside it)
- All other except/pass blocks in src/ and web_interface/ are intentional
  (last-resort recovery, best-effort fallbacks, non-critical startup probes).
  Annotated each with # nosec B110 and a brief inline reason so the decision
  is explicit for future reviewers.
- Test files and plugin-repos B110 suppressed via Codacy API (not prod code).

B108 (/tmp usage):
- permission_utils.py: /tmp listed to PREVENT permission changes on it — not
  used as a temp path. Annotated # nosec B108.
- display_manager.py: fixed snapshot path is intentional (web UI reads same
  path); path-check guard also annotated.
- wifi_manager.py: named /tmp files match the sudoers allowlist installed with
  the system (the paths are hard-coded in both places by design). Annotated
  all six open/cp references # nosec B108.
- scripts/render_plugin.py: dev script default overridable by user. Annotated.
- web_interface/app.py: reads the same fixed path written by display_manager.
  Annotated # nosec B108.
- Test files suppressed via Codacy API.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address remaining Codacy security findings

Flask debug=True (real fix):
- web_interface/app.py: debug=True in __main__ block exposes the Werkzeug
  interactive debugger (arbitrary code execution). Changed to
  os.environ.get('FLASK_DEBUG', '0') == '1' — off by default, opt-in
  via environment variable for local development.

nosec annotations (accepted risk with documented rationale):
- disk_cache.py: os.chmod(0o660) is intentional — web UI and LED matrix
  service share a group, 660 gives group write while denying world access
  (B103 + Semgrep insecure-file-permissions suppressed in Codacy)
- wifi_manager.py: urlopen to hardcoded connectivity-check.ubuntu.com URL
  (B310 — no user input involved)
- font_manager.py: urlretrieve URL comes from user's own config file on
  their local device (B310)
- start_web_conditionally.py: os.execvp with both sys.executable and a
  fixed PROJECT_DIR-relative constant (B606)

Confirmed false positives suppressed via Codacy API (15 issues):
- SSRF (3x): client-side JS fetch — SSRF is server-side; browser fetch
  is CORS-restricted to same origin
- B105 (3x): test fixtures use dummy secrets by design; store_manager
  checks for the placeholder string, it is not itself a secret
- PMD numeric literal (2x): 10000000 is within Number.MAX_SAFE_INTEGER
- Prototype pollution (1x): read-only schema traversal, no writes
- no-unsanitized_method (1x): dynamic import() is CORS-restricted
- detect-unsafe-regex (1x): operates on server-controlled config values
- plugin-repos B103 (1x): vendor code chmod on executable
- Semgrep insecure-file-permissions (3x): same disk_cache 0o660 as above

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: remove unnecessary f prefix from f-strings without placeholders (F541)

Pyflakes F541 flags f-strings that contain no {} interpolation — they are
identical to plain strings but trigger unnecessary string formatting overhead.

Fixed in production code:
- src/base_classes/data_sources.py (2 debug log calls)
- src/logo_downloader.py (1 error log)
- src/plugin_system/store_manager.py (5 strings across 3 log calls)
- src/web_interface/validators.py (1 return value)
- src/wifi_manager.py (4 log/message strings)
- web_interface/start.py (1 print)

F541 issues in test/, scripts/, and plugin-repos/ suppressed via Codacy API
as non-production code.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore(dev): add Pillow compatibility smoke test script

Covers all Pillow APIs used in LEDMatrix — image creation, drawing,
font metrics, LANCZOS resampling, paste/alpha_composite, and PNG I/O.
Run after any Pillow version bump to catch regressions before deploy.

    python3 scripts/dev/test_pillow_compat.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve 8 new Codacy issues introduced by PR changes

shellcheck SC2034:
- first_time_install.sh: 'type' loop variable also unused in the wifi
  status loop (we previously fixed 'device' → '_' but left 'type').
  Changed to '_ _ state' since neither device nor type is referenced.

ESLint no-undef:
- app.js: typeof guards don't satisfy no-undef; added updateSystemStats
  to the /* global */ declaration alongside showNotification.

nosec annotation:
- web_interface/app.py: app.run(host='0.0.0.0') line changed when we
  fixed debug=True, giving it a new issue ID. Re-added # nosec B104.

pyflakes F401:
- scripts/dev/test_pillow_compat.py: ImageFilter was imported but never
  used in the smoke test. Removed from the import.

Codacy API suppressions (false positives on changed lines):
- disk_cache.py 0o660 chmod (2x): lines changed when # nosec B103 was
  added, producing new Semgrep issue IDs. Re-suppressed.
- pages_v3.py raw-html-concat: Semgrep does not recognise escape() as
  a sanitizer; the escape() call IS the correct fix.
- app.py flask 0.0.0.0: same line as B104 above; Semgrep rule also
  re-suppressed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address PR review findings

Fix (10 of 15 findings):

plugin-repos/march-madness/requirements.txt:
  Add urllib3>=1.26.0 — manager.py directly imports from urllib3; it was
  an undeclared transitive dependency via requests.

scripts/dev/dev_plugin_setup.sh:
  Restore subshell form (cd "$target_dir" && git pull --rebase) || true
  so the shell's working directory is not permanently changed after the
  if-cd block. Previous fix for SC2015 leaked cwd into the remainder of
  the script.

src/base_classes/sports.py:
  Narrow 'except Exception' to 'except RuntimeError as e' and log via
  self.logger.debug — Path.home() raises only RuntimeError for service
  users; other exceptions should not be silently swallowed.

src/config_service.py:
  Fix stale "MD5 checksum" in ConfigVersion.__init__ docstring (line 40);
  the implementation uses SHA-256 since the Codacy fix.

src/wifi_manager.py:
  Log the last-resort AP enable failure with exc_info=True instead of
  silently passing — failure here means the device may be unreachable.

web_interface/blueprints/pages_v3.py:
  Log the outer metadata pre-load exception at debug level instead of
  swallowing it silently; schema still loads fully below.

src/background_data_service.py:
  Remove unused 'timeout' parameter from shutdown() — executor.shutdown()
  does not accept timeout; update __del__ caller accordingly.

src/font_manager.py:
  Validate URL scheme before urlretrieve — reject non-http/https schemes
  (e.g. file://) to prevent reading local files from config-supplied URLs.

src/plugin_system/plugin_executor.py:
  Simplify redundant except tuple: (PluginTimeoutError, PluginError,
  Exception) → Exception, which already covers the others.

test/test_display_controller.py:
  Mark empty test_plugin_discovery_and_loading as @pytest.mark.skip with
  reason. Move duplicate 'from datetime import datetime' to module header
  and remove the stray mid-module copy.

Skip (5 of 15 findings, with reasons):
  - pytest 9.0.3 concerns: full suite already verified (467 pass, 18 pre-existing)
  - Pillow 12.2.0 API concerns: no deprecated APIs in codebase; tests + Pi smoke test pass
  - diagnose_web_ui.sh sudo validation: set -e already ensures fail-fast on any sudo failure
  - app.py request-logging except: must stay silent (recursive logging risk); annotated
  - app.py SSE file-read except: genuinely transient I/O; annotated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 10:19:55 -04:00

610 lines
22 KiB
Python

"""
Background Data Service for LEDMatrix
This service provides background threading capabilities for season data fetching
to prevent blocking the main display loop. It's designed to be used across
all sport managers for consistent background data management.
Key Features:
- Thread-safe data caching
- Automatic retry logic with exponential backoff
- Configurable timeouts and intervals
- Graceful error handling
- Progress tracking and logging
- Memory-efficient data storage
"""
import time
import logging
import threading
import requests
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import queue
from concurrent.futures import ThreadPoolExecutor
from src.cache_manager import CacheManager
# Configure logging
logger = logging.getLogger(__name__)
class FetchStatus(Enum):
"""Status of background fetch operations."""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class FetchRequest:
"""Represents a background fetch request."""
id: str
sport: str
year: int
cache_key: str
url: str
params: Dict[str, Any] = field(default_factory=dict)
headers: Dict[str, str] = field(default_factory=dict)
timeout: int = 30
retry_count: int = 0
max_retries: int = 3
priority: int = 1 # Higher number = higher priority
callback: Optional[Callable] = None
created_at: float = field(default_factory=time.time)
status: FetchStatus = FetchStatus.PENDING
result: Optional[Any] = None
error: Optional[str] = None
@dataclass
class FetchResult:
"""Result of a background fetch operation."""
request_id: str
success: bool
data: Optional[Any] = None
error: Optional[str] = None
cached: bool = False
fetch_time: float = 0.0
retry_count: int = 0
completed_at: float = field(default_factory=time.time) # Timestamp when request completed
class BackgroundDataService:
"""
Background data service for fetching season data without blocking the main thread.
This service manages a pool of background threads to fetch data asynchronously,
with intelligent caching, retry logic, and progress tracking.
"""
def __init__(self, cache_manager: CacheManager, max_workers: int = 3, request_timeout: int = 30):
"""
Initialize the background data service.
Args:
cache_manager: Cache manager instance for storing fetched data
max_workers: Maximum number of background threads
request_timeout: Default timeout for HTTP requests
"""
self.cache_manager = cache_manager
self.max_workers = max_workers
self.request_timeout = request_timeout
# Thread management
self.executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="BackgroundData")
self.active_requests: Dict[str, FetchRequest] = {}
self.completed_requests: Dict[str, FetchResult] = {}
self.request_queue = queue.PriorityQueue()
# Thread safety
self._lock = threading.RLock()
self._shutdown = False
# Cleanup tracking
self._max_completed_requests = 500 # Maximum completed requests to keep
self._completed_requests_cleanup_interval = 600.0 # Cleanup every 10 minutes
self._last_completed_requests_cleanup = time.time()
# Statistics
self.stats = {
'total_requests': 0,
'completed_requests': 0,
'failed_requests': 0,
'cached_hits': 0,
'cache_misses': 0,
'total_fetch_time': 0.0,
'average_fetch_time': 0.0
}
# Session for HTTP requests
self.session = requests.Session()
self.session.mount('http://', requests.adapters.HTTPAdapter(max_retries=3))
self.session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))
# Default headers
self.default_headers = {
'User-Agent': 'LEDMatrix/1.0 (https://github.com/yourusername/LEDMatrix)',
'Accept': 'application/json',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive'
}
logger.info(f"BackgroundDataService initialized with {max_workers} workers")
def get_sport_cache_key(self, sport: str, date_str: str = None) -> str:
"""
Generate consistent cache keys for sports data.
This ensures Recent/Upcoming managers and background service
use the same cache keys.
"""
# Use the centralized cache key generation from CacheManager
from src.cache_manager import CacheManager
cache_manager = CacheManager()
return cache_manager.generate_sport_cache_key(sport, date_str)
def submit_fetch_request(self,
sport: str,
year: int,
url: str,
cache_key: str = None,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
max_retries: int = 3,
priority: int = 1,
callback: Optional[Callable] = None) -> str:
"""
Submit a background fetch request.
Args:
sport: Sport identifier (e.g., 'nfl', 'ncaafb')
year: Year to fetch data for
url: URL to fetch data from
cache_key: Cache key for storing/retrieving data
params: URL parameters
headers: HTTP headers
timeout: Request timeout
max_retries: Maximum number of retries
priority: Request priority (higher = more important)
callback: Optional callback function when request completes
Returns:
Request ID for tracking the fetch operation
"""
if self._shutdown:
raise RuntimeError("BackgroundDataService is shutting down")
# Generate cache key if not provided
if cache_key is None:
cache_key = self.get_sport_cache_key(sport)
request_id = f"{sport}_{year}_{int(time.time() * 1000)}"
# Check cache first
cached_data = self.cache_manager.get(cache_key)
if cached_data:
with self._lock:
self.stats['cached_hits'] += 1
result = FetchResult(
request_id=request_id,
success=True,
data=cached_data,
cached=True,
fetch_time=0.0
)
self.completed_requests[request_id] = result
if callback:
try:
callback(result)
except Exception as e:
logger.error(f"Error in callback for request {request_id}: {e}")
logger.debug(f"Cache hit for {sport} {year} data")
return request_id
# Create fetch request
request = FetchRequest(
id=request_id,
sport=sport,
year=year,
cache_key=cache_key,
url=url,
params=params or {},
headers={**self.default_headers, **(headers or {})},
timeout=timeout or self.request_timeout,
max_retries=max_retries,
priority=priority,
callback=callback
)
with self._lock:
self.active_requests[request_id] = request
self.stats['total_requests'] += 1
self.stats['cache_misses'] += 1
# Submit to executor
self.executor.submit(self._fetch_data_worker, request)
logger.info(f"Submitted background fetch request {request_id} for {sport} {year}")
return request_id
def _fetch_data_worker(self, request: FetchRequest) -> FetchResult:
"""
Worker function that performs the actual data fetching.
Args:
request: Fetch request to process
Returns:
Fetch result with data or error information
"""
start_time = time.time()
result = FetchResult(request_id=request.id, success=False, retry_count=request.retry_count)
try:
with self._lock:
request.status = FetchStatus.IN_PROGRESS
logger.info(f"Starting background fetch for {request.sport} {request.year}")
# Perform HTTP request with retry logic
response = self._make_request_with_retry(request)
response.raise_for_status()
# Parse response
data = response.json()
# Validate data structure
if not isinstance(data, dict):
raise ValueError(f"Expected dict response, got {type(data)}")
if 'events' not in data:
raise ValueError("Response missing 'events' field")
# Validate events structure
events = data.get('events', [])
if not isinstance(events, list):
raise ValueError(f"Expected events to be list, got {type(events)}")
# Log data validation
logger.debug(f"Validated {len(events)} events for {request.sport} {request.year}")
# Cache the data
self.cache_manager.set(request.cache_key, data)
# Update request status
with self._lock:
request.status = FetchStatus.COMPLETED
request.result = data
# Create successful result
fetch_time = time.time() - start_time
result = FetchResult(
request_id=request.id,
success=True,
data=data,
fetch_time=fetch_time,
retry_count=request.retry_count
)
logger.info(f"Successfully fetched {request.sport} {request.year} data in {fetch_time:.2f}s")
except Exception as e:
error_msg = str(e)
logger.error(f"Failed to fetch {request.sport} {request.year} data: {error_msg}")
with self._lock:
request.status = FetchStatus.FAILED
request.error = error_msg
result = FetchResult(
request_id=request.id,
success=False,
error=error_msg,
fetch_time=time.time() - start_time,
retry_count=request.retry_count
)
finally:
# Store result and clean up
with self._lock:
self.completed_requests[request.id] = result
if request.id in self.active_requests:
del self.active_requests[request.id]
# Update statistics
if result.success:
self.stats['completed_requests'] += 1
else:
self.stats['failed_requests'] += 1
self.stats['total_fetch_time'] += result.fetch_time
self.stats['average_fetch_time'] = (
self.stats['total_fetch_time'] /
(self.stats['completed_requests'] + self.stats['failed_requests'])
)
# Periodic cleanup after storing result
self._cleanup_completed_requests()
# Call callback if provided
if request.callback:
try:
request.callback(result)
except Exception as e:
logger.error(f"Error in callback for request {request.id}: {e}")
return result
def _make_request_with_retry(self, request: FetchRequest) -> requests.Response:
"""
Make HTTP request with retry logic and exponential backoff.
Args:
request: Fetch request containing request details
Returns:
HTTP response
Raises:
requests.RequestException: If all retries fail
"""
last_exception = None
for attempt in range(request.max_retries + 1):
try:
response = self.session.get(
request.url,
params=request.params,
headers=request.headers,
timeout=request.timeout
)
return response
except requests.RequestException as e:
last_exception = e
request.retry_count = attempt + 1
if attempt < request.max_retries:
# Exponential backoff: 1s, 2s, 4s, 8s...
delay = 2 ** attempt
logger.warning(f"Request failed (attempt {attempt + 1}/{request.max_retries + 1}), retrying in {delay}s: {e}")
time.sleep(delay)
else:
logger.error(f"All {request.max_retries + 1} attempts failed for {request.sport} {request.year}")
raise last_exception
def get_result(self, request_id: str) -> Optional[FetchResult]:
"""
Get the result of a fetch request.
Args:
request_id: Request ID to get result for
Returns:
Fetch result if available, None otherwise
"""
# Periodic cleanup
self._cleanup_completed_requests()
with self._lock:
return self.completed_requests.get(request_id)
def is_request_complete(self, request_id: str) -> bool:
"""
Check if a request has completed.
Args:
request_id: Request ID to check
Returns:
True if request is complete, False otherwise
"""
# Periodic cleanup
self._cleanup_completed_requests()
with self._lock:
return request_id in self.completed_requests
def get_request_status(self, request_id: str) -> Optional[FetchStatus]:
"""
Get the status of a fetch request.
Args:
request_id: Request ID to get status for
Returns:
Request status if found, None otherwise
"""
with self._lock:
if request_id in self.active_requests:
return self.active_requests[request_id].status
elif request_id in self.completed_requests:
result = self.completed_requests[request_id]
return FetchStatus.COMPLETED if result.success else FetchStatus.FAILED
return None
def cancel_request(self, request_id: str) -> bool:
"""
Cancel a pending or in-progress request.
Args:
request_id: Request ID to cancel
Returns:
True if request was cancelled, False if not found or already complete
"""
with self._lock:
if request_id in self.active_requests:
request = self.active_requests[request_id]
request.status = FetchStatus.CANCELLED
del self.active_requests[request_id]
logger.info(f"Cancelled request {request_id}")
return True
return False
def get_statistics(self) -> Dict[str, Any]:
"""
Get service statistics.
Returns:
Dictionary containing service statistics
"""
with self._lock:
return {
**self.stats,
'active_requests': len(self.active_requests),
'completed_requests_count': len(self.completed_requests),
'max_completed_requests': self._max_completed_requests,
'completed_requests_usage_percent': (len(self.completed_requests) / self._max_completed_requests * 100) if self._max_completed_requests > 0 else 0,
'queue_size': self.request_queue.qsize(),
'last_cleanup': self._last_completed_requests_cleanup,
'cleanup_interval': self._completed_requests_cleanup_interval
}
def log_memory_stats(self):
"""Log current memory usage statistics."""
stats = self.get_statistics()
logger.info(f"BackgroundDataService Memory - Active: {stats['active_requests']}, "
f"Completed: {stats['completed_requests_count']}/{stats['max_completed_requests']} "
f"({stats['completed_requests_usage_percent']:.1f}%), "
f"Last cleanup: {time.time() - stats['last_cleanup']:.1f}s ago")
def _cleanup_completed_requests(self, force: bool = False) -> int:
"""
Automatically clean up old completed requests.
Args:
force: If True, perform cleanup regardless of time interval
Returns:
Number of requests removed
"""
now = time.time()
# Check if cleanup is needed
if not force and (now - self._last_completed_requests_cleanup) < self._completed_requests_cleanup_interval:
return 0
with self._lock:
removed_count = 0
current_time = time.time()
# Remove requests older than 1 hour
cutoff_time = current_time - 3600 # 1 hour
to_remove = []
for request_id, result in self.completed_requests.items():
# Check if request is old enough to remove
if result.completed_at < cutoff_time:
to_remove.append(request_id)
# Also enforce size limit if we have too many requests
if len(self.completed_requests) > self._max_completed_requests:
# Sort by completion time (oldest first)
sorted_requests = sorted(
self.completed_requests.items(),
key=lambda x: x[1].completed_at
)
# Remove oldest entries until we're under the limit
excess_count = len(self.completed_requests) - self._max_completed_requests
for i in range(excess_count):
if i < len(sorted_requests):
request_id = sorted_requests[i][0]
if request_id not in to_remove:
to_remove.append(request_id)
# Remove the requests
for request_id in to_remove:
del self.completed_requests[request_id]
removed_count += 1
self._last_completed_requests_cleanup = current_time
if removed_count > 0:
logger.debug(f"Cleaned up {removed_count} old completed requests (remaining: {len(self.completed_requests)})")
return removed_count
def clear_completed_requests(self, older_than_hours: int = 24):
"""
Clear completed requests older than specified time.
Args:
older_than_hours: Clear requests older than this many hours
"""
cutoff_time = time.time() - (older_than_hours * 3600)
with self._lock:
to_remove = []
for request_id, result in self.completed_requests.items():
if result.completed_at < cutoff_time:
to_remove.append(request_id)
for request_id in to_remove:
del self.completed_requests[request_id]
if to_remove:
logger.info(f"Cleared {len(to_remove)} old completed requests")
def shutdown(self, wait: bool = True):
"""
Shutdown the background data service.
Args:
wait: Whether to wait for active requests to complete
"""
logger.info("Shutting down BackgroundDataService...")
self._shutdown = True
# Cancel all active requests
with self._lock:
for request_id in list(self.active_requests.keys()):
self.cancel_request(request_id)
self.executor.shutdown(wait=wait)
logger.info("BackgroundDataService shutdown complete")
def __del__(self):
"""Cleanup when service is destroyed."""
if not self._shutdown:
self.shutdown(wait=False)
# Global service instance
_background_service: Optional[BackgroundDataService] = None
_service_lock = threading.Lock()
def get_background_service(cache_manager=None, max_workers: int = 3) -> BackgroundDataService:
"""
Get the global background data service instance.
Args:
cache_manager: Cache manager instance (required for first call)
max_workers: Maximum number of background threads
Returns:
Background data service instance
"""
global _background_service
with _service_lock:
if _background_service is None:
if cache_manager is None:
raise ValueError("cache_manager is required for first call to get_background_service")
_background_service = BackgroundDataService(cache_manager, max_workers)
return _background_service
def shutdown_background_service():
"""Shutdown the global background data service."""
global _background_service
with _service_lock:
if _background_service is not None:
_background_service.shutdown()
_background_service = None