mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-05-15 10:03:31 +00:00
Remove unused imports across 86 files in src/, web_interface/, test/, and scripts/ using autoflake. No logic changes — only dead import statements and unused names in from-imports are removed. Also remove bare exception aliases where the variable is never referenced in the handler body: - src/cache/disk_cache.py: except (IOError, OSError, PermissionError) as e - src/cache_manager.py: except (OSError, IOError, PermissionError) as perm_error - src/plugin_system/resource_monitor.py: except Exception as e - web_interface/app.py: except Exception as read_err 86 files changed, 205 lines removed, 18 pre-existing test failures unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
472 lines
16 KiB
Python
472 lines
16 KiB
Python
"""
|
|
Configuration Service
|
|
|
|
Provides centralized configuration management with hot-reload support,
|
|
versioning, and change notifications.
|
|
|
|
This service wraps ConfigManager and adds:
|
|
- File watching for automatic reload
|
|
- Configuration versioning
|
|
- Change notifications to subscribers
|
|
- Thread-safe configuration access
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional, List, Callable
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
import logging
|
|
import hashlib
|
|
|
|
from src.exceptions import ConfigError
|
|
from src.logging_config import get_logger
|
|
from src.config_manager import ConfigManager
|
|
|
|
|
|
class ConfigVersion:
|
|
"""Represents a configuration version snapshot."""
|
|
|
|
def __init__(self, config: Dict[str, Any], version: int, timestamp: datetime, checksum: str):
|
|
"""
|
|
Initialize a configuration version.
|
|
|
|
Args:
|
|
config: Configuration dictionary
|
|
version: Version number
|
|
timestamp: When this version was created
|
|
checksum: MD5 checksum of the config
|
|
"""
|
|
self.config: Dict[str, Any] = config
|
|
self.version: int = version
|
|
self.timestamp: datetime = timestamp
|
|
self.checksum: str = checksum
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert version to dictionary."""
|
|
return {
|
|
'version': self.version,
|
|
'timestamp': self.timestamp.isoformat(),
|
|
'checksum': self.checksum,
|
|
'config_size': len(json.dumps(self.config))
|
|
}
|
|
|
|
|
|
class ConfigService:
|
|
"""
|
|
Centralized configuration service with hot-reload and versioning.
|
|
|
|
Features:
|
|
- Automatic file watching and reload
|
|
- Configuration versioning with history
|
|
- Change notifications to subscribers
|
|
- Thread-safe access
|
|
- Backward compatible with ConfigManager
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
config_manager: Optional[ConfigManager] = None,
|
|
enable_hot_reload: bool = True,
|
|
max_versions: int = 10
|
|
) -> None:
|
|
"""
|
|
Initialize the configuration service.
|
|
|
|
Args:
|
|
config_manager: Optional ConfigManager instance (creates new if None)
|
|
enable_hot_reload: Whether to enable automatic file watching
|
|
max_versions: Maximum number of versions to keep in history
|
|
"""
|
|
self.logger: logging.Logger = get_logger(__name__)
|
|
self.config_manager: ConfigManager = config_manager or ConfigManager()
|
|
self.enable_hot_reload: bool = enable_hot_reload
|
|
self.max_versions: int = max_versions
|
|
|
|
# Thread safety
|
|
self._lock: threading.RLock = threading.RLock()
|
|
|
|
# Current configuration
|
|
self._current_config: Dict[str, Any] = {}
|
|
self._current_version: int = 0
|
|
self._last_modified: Dict[str, float] = {}
|
|
|
|
# Version history
|
|
self._versions: List[ConfigVersion] = []
|
|
|
|
# Subscribers for change notifications
|
|
# Format: {plugin_id or component_name: [callbacks]}
|
|
self._subscribers: Dict[str, List[Callable[[Dict[str, Any], Dict[str, Any]], None]]] = defaultdict(list)
|
|
|
|
# File watching
|
|
self._watch_thread: Optional[threading.Thread] = None
|
|
self._watch_interval: float = 2.0 # Check every 2 seconds
|
|
self._stop_watching: bool = False
|
|
|
|
# Load initial configuration
|
|
self._load_config()
|
|
|
|
# Start file watching if enabled
|
|
if self.enable_hot_reload:
|
|
self._start_file_watching()
|
|
|
|
def _calculate_checksum(self, config: Dict[str, Any]) -> str:
|
|
"""Calculate checksum of configuration for change detection."""
|
|
config_str = json.dumps(config, sort_keys=True)
|
|
return hashlib.sha256(config_str.encode()).hexdigest()
|
|
|
|
def _load_config(self) -> bool:
|
|
"""
|
|
Load configuration from ConfigManager.
|
|
|
|
Returns:
|
|
True if config changed, False otherwise
|
|
"""
|
|
try:
|
|
new_config = self.config_manager.load_config()
|
|
new_checksum = self._calculate_checksum(new_config)
|
|
|
|
with self._lock:
|
|
# Check if config actually changed
|
|
if self._current_version > 0:
|
|
old_checksum = self._versions[-1].checksum if self._versions else ""
|
|
if new_checksum == old_checksum:
|
|
self.logger.debug("Configuration unchanged, skipping reload")
|
|
return False
|
|
|
|
# Store old config for change detection
|
|
old_config = self._current_config.copy()
|
|
|
|
# Create new version
|
|
self._current_version += 1
|
|
version = ConfigVersion(
|
|
config=new_config.copy(),
|
|
version=self._current_version,
|
|
timestamp=datetime.now(),
|
|
checksum=new_checksum
|
|
)
|
|
|
|
# Add to history
|
|
self._versions.append(version)
|
|
|
|
# Trim history if needed
|
|
if len(self._versions) > self.max_versions:
|
|
self._versions.pop(0)
|
|
|
|
# Update current config
|
|
self._current_config = new_config
|
|
|
|
# Notify subscribers
|
|
self._notify_subscribers(old_config, new_config)
|
|
|
|
self.logger.info(
|
|
"Configuration reloaded (version %d, checksum: %s)",
|
|
self._current_version,
|
|
new_checksum[:8]
|
|
)
|
|
|
|
return True
|
|
|
|
except ConfigError as e:
|
|
self.logger.error("Error loading configuration: %s", e, exc_info=True)
|
|
return False
|
|
except Exception as e:
|
|
self.logger.error("Unexpected error loading configuration: %s", e, exc_info=True)
|
|
return False
|
|
|
|
def _notify_subscribers(self, old_config: Dict[str, Any], new_config: Dict[str, Any]) -> None:
|
|
"""
|
|
Notify all subscribers of configuration changes.
|
|
|
|
Args:
|
|
old_config: Previous configuration
|
|
new_config: New configuration
|
|
"""
|
|
# Notify global subscribers (key: '*')
|
|
for callback in self._subscribers.get('*', []):
|
|
try:
|
|
callback(old_config, new_config)
|
|
except Exception as e:
|
|
self.logger.error("Error in global config change callback: %s", e, exc_info=True)
|
|
|
|
# Notify plugin-specific subscribers
|
|
for plugin_id in self._subscribers.keys():
|
|
if plugin_id == '*':
|
|
continue
|
|
|
|
old_plugin_config = old_config.get(plugin_id, {})
|
|
new_plugin_config = new_config.get(plugin_id, {})
|
|
|
|
# Only notify if plugin config actually changed
|
|
if old_plugin_config != new_plugin_config:
|
|
for callback in self._subscribers[plugin_id]:
|
|
try:
|
|
callback(old_plugin_config, new_plugin_config)
|
|
except Exception as e:
|
|
self.logger.error(
|
|
"Error in config change callback for %s: %s",
|
|
plugin_id,
|
|
e,
|
|
exc_info=True
|
|
)
|
|
|
|
def _check_file_changes(self) -> bool:
|
|
"""
|
|
Check if configuration files have been modified.
|
|
|
|
Returns:
|
|
True if files changed, False otherwise
|
|
"""
|
|
config_path = Path(self.config_manager.get_config_path())
|
|
secrets_path = Path(self.config_manager.get_secrets_path())
|
|
|
|
changed = False
|
|
|
|
# Check main config file
|
|
if config_path.exists():
|
|
mtime = config_path.stat().st_mtime
|
|
if mtime != self._last_modified.get(str(config_path), 0):
|
|
self._last_modified[str(config_path)] = mtime
|
|
changed = True
|
|
|
|
# Check secrets file
|
|
if secrets_path.exists():
|
|
mtime = secrets_path.stat().st_mtime
|
|
if mtime != self._last_modified.get(str(secrets_path), 0):
|
|
self._last_modified[str(secrets_path)] = mtime
|
|
changed = True
|
|
|
|
return changed
|
|
|
|
def _file_watcher_loop(self) -> None:
|
|
"""Main loop for file watching."""
|
|
self.logger.info("Configuration file watcher started")
|
|
|
|
# Initialize last modified times
|
|
config_path = Path(self.config_manager.get_config_path())
|
|
secrets_path = Path(self.config_manager.get_secrets_path())
|
|
|
|
if config_path.exists():
|
|
self._last_modified[str(config_path)] = config_path.stat().st_mtime
|
|
if secrets_path.exists():
|
|
self._last_modified[str(secrets_path)] = secrets_path.stat().st_mtime
|
|
|
|
while not self._stop_watching:
|
|
try:
|
|
if self._check_file_changes():
|
|
self.logger.info("Configuration files changed, reloading...")
|
|
self._load_config()
|
|
|
|
# Sleep with periodic checks for stop signal
|
|
for _ in range(int(self._watch_interval)):
|
|
if self._stop_watching:
|
|
break
|
|
time.sleep(1)
|
|
|
|
except Exception as e:
|
|
self.logger.error("Error in file watcher loop: %s", e, exc_info=True)
|
|
time.sleep(self._watch_interval)
|
|
|
|
self.logger.info("Configuration file watcher stopped")
|
|
|
|
def _start_file_watching(self) -> None:
|
|
"""Start the file watching thread."""
|
|
if self._watch_thread and self._watch_thread.is_alive():
|
|
return
|
|
|
|
self._stop_watching = False
|
|
self._watch_thread = threading.Thread(
|
|
target=self._file_watcher_loop,
|
|
name="ConfigService-Watcher",
|
|
daemon=True
|
|
)
|
|
self._watch_thread.start()
|
|
self.logger.debug("File watching thread started")
|
|
|
|
def _stop_file_watching(self) -> None:
|
|
"""Stop the file watching thread."""
|
|
if self._watch_thread and self._watch_thread.is_alive():
|
|
self._stop_watching = True
|
|
self._watch_thread.join(timeout=5.0)
|
|
if self._watch_thread.is_alive():
|
|
self.logger.warning("File watching thread did not stop gracefully")
|
|
|
|
def get_config(self) -> Dict[str, Any]:
|
|
"""
|
|
Get current configuration (thread-safe).
|
|
|
|
Returns:
|
|
Current configuration dictionary
|
|
"""
|
|
with self._lock:
|
|
return self._current_config.copy()
|
|
|
|
def get_plugin_config(self, plugin_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get configuration for a specific plugin.
|
|
|
|
Args:
|
|
plugin_id: Plugin identifier
|
|
|
|
Returns:
|
|
Plugin configuration dictionary
|
|
"""
|
|
config = self.get_config()
|
|
return config.get(plugin_id, {})
|
|
|
|
def subscribe(
|
|
self,
|
|
callback: Callable[[Dict[str, Any], Dict[str, Any]], None],
|
|
plugin_id: Optional[str] = None
|
|
) -> None:
|
|
"""
|
|
Subscribe to configuration changes.
|
|
|
|
Args:
|
|
callback: Function to call when config changes
|
|
Signature: callback(old_config, new_config)
|
|
plugin_id: Optional plugin ID to subscribe to specific plugin changes
|
|
If None, subscribes to all changes
|
|
"""
|
|
key = plugin_id or '*'
|
|
with self._lock:
|
|
if callback not in self._subscribers[key]:
|
|
self._subscribers[key].append(callback)
|
|
self.logger.debug("Subscribed to config changes for %s", key)
|
|
|
|
def unsubscribe(
|
|
self,
|
|
callback: Callable[[Dict[str, Any], Dict[str, Any]], None],
|
|
plugin_id: Optional[str] = None
|
|
) -> None:
|
|
"""
|
|
Unsubscribe from configuration changes.
|
|
|
|
Args:
|
|
callback: Callback function to remove
|
|
plugin_id: Optional plugin ID (must match subscription)
|
|
"""
|
|
key = plugin_id or '*'
|
|
with self._lock:
|
|
if callback in self._subscribers[key]:
|
|
self._subscribers[key].remove(callback)
|
|
self.logger.debug("Unsubscribed from config changes for %s", key)
|
|
|
|
def reload(self) -> bool:
|
|
"""
|
|
Manually reload configuration.
|
|
|
|
Returns:
|
|
True if reloaded successfully, False otherwise
|
|
"""
|
|
self.logger.info("Manual configuration reload requested")
|
|
return self._load_config()
|
|
|
|
def get_version(self) -> int:
|
|
"""
|
|
Get current configuration version.
|
|
|
|
Returns:
|
|
Current version number
|
|
"""
|
|
with self._lock:
|
|
return self._current_version
|
|
|
|
def get_version_history(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get configuration version history.
|
|
|
|
Returns:
|
|
List of version dictionaries
|
|
"""
|
|
with self._lock:
|
|
return [v.to_dict() for v in self._versions]
|
|
|
|
def get_version_config(self, version: int) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get configuration for a specific version.
|
|
|
|
Args:
|
|
version: Version number
|
|
|
|
Returns:
|
|
Configuration dictionary or None if version not found
|
|
"""
|
|
with self._lock:
|
|
for v in self._versions:
|
|
if v.version == version:
|
|
return v.config.copy()
|
|
return None
|
|
|
|
def rollback(self, version: int) -> bool:
|
|
"""
|
|
Rollback to a previous configuration version.
|
|
|
|
Args:
|
|
version: Version number to rollback to
|
|
|
|
Returns:
|
|
True if rollback successful, False otherwise
|
|
"""
|
|
config = self.get_version_config(version)
|
|
if config is None:
|
|
self.logger.error("Version %d not found in history", version)
|
|
return False
|
|
|
|
try:
|
|
# Save the rolled-back config
|
|
self.config_manager.save_config(config)
|
|
|
|
# Reload
|
|
return self._load_config()
|
|
|
|
except Exception as e:
|
|
self.logger.error("Error rolling back to version %d: %s", version, e, exc_info=True)
|
|
return False
|
|
|
|
def save_config(self, new_config: Dict[str, Any]) -> bool:
|
|
"""
|
|
Save new configuration.
|
|
|
|
Args:
|
|
new_config: New configuration dictionary
|
|
|
|
Returns:
|
|
True if saved successfully, False otherwise
|
|
"""
|
|
try:
|
|
self.config_manager.save_config(new_config)
|
|
return self._load_config()
|
|
except Exception as e:
|
|
self.logger.error("Error saving configuration: %s", e, exc_info=True)
|
|
return False
|
|
|
|
def shutdown(self) -> None:
|
|
"""Shutdown the configuration service."""
|
|
self.logger.info("Shutting down configuration service")
|
|
self._stop_file_watching()
|
|
|
|
with self._lock:
|
|
self._subscribers.clear()
|
|
|
|
# Backward compatibility methods
|
|
def load_config(self) -> Dict[str, Any]:
|
|
"""
|
|
Load configuration (backward compatibility with ConfigManager).
|
|
|
|
Returns:
|
|
Current configuration dictionary
|
|
"""
|
|
return self.get_config()
|
|
|
|
def get_config_path(self) -> str:
|
|
"""Get config file path (backward compatibility)."""
|
|
return self.config_manager.get_config_path()
|
|
|
|
def get_secrets_path(self) -> str:
|
|
"""Get secrets file path (backward compatibility)."""
|
|
return self.config_manager.get_secrets_path()
|
|
|