Files
LEDMatrix/src/plugin_system/schema_manager.py
Chuck 8fb2800495 feat: add error detection, monitoring, and code quality improvements (#223)
* feat: add error detection, monitoring, and code quality improvements

This comprehensive update addresses automatic error detection, code
quality, and plugin development experience:

## Error Detection & Monitoring
- Add ErrorAggregator service for centralized error tracking
- Add pattern detection for recurring errors (5+ in 60 min)
- Add error dashboard API endpoints (/api/v3/errors/*)
- Integrate error recording into plugin executor

## Code Quality
- Remove 10 silent `except: pass` blocks in sports.py and football.py
- Remove hardcoded debug log paths
- Add pre-commit hooks to prevent future bare except clauses

## Validation & Type Safety
- Add warnings when plugins lack config_schema.json
- Add config key collision detection for plugins
- Improve type coercion logging in BasePlugin

## Testing
- Add test_config_validation_edge_cases.py
- Add test_plugin_loading_failures.py
- Add test_error_aggregator.py

## Documentation
- Add PLUGIN_ERROR_HANDLING.md guide
- Add CONFIG_DEBUGGING.md guide

Note: GitHub Actions CI workflow is available in the plan but requires
workflow scope to push. Add .github/workflows/ci.yml manually.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: address code review issues

- Fix GitHub issues URL in CONFIG_DEBUGGING.md
- Use RLock in error_aggregator.py to prevent deadlock in export_to_file
- Distinguish missing vs invalid schema files in plugin_manager.py
- Add assertions to test_null_value_for_required_field test
- Remove unused initial_count variable in test_plugin_load_error_recorded
- Add validation for max_age_hours in clear_old_errors API endpoint

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 10:05:09 -05:00

507 lines
21 KiB
Python

"""
Schema Manager
Manages plugin configuration schemas with caching, validation, and reliable path resolution.
Provides utilities for extracting defaults, validating configurations, and managing schema lifecycle.
"""
import copy
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import jsonschema
from jsonschema import Draft7Validator, ValidationError
class SchemaManager:
"""
Manages plugin configuration schemas with caching and validation.
Features:
- Schema loading and caching
- Default value extraction from schemas
- Configuration validation against schemas
- Reliable path resolution for schema files
- Cache invalidation on plugin changes
"""
def __init__(self, plugins_dir: Optional[Path] = None, project_root: Optional[Path] = None, logger: Optional[logging.Logger] = None):
"""
Initialize the Schema Manager.
Args:
plugins_dir: Base plugins directory path
project_root: Project root directory path
logger: Optional logger instance
"""
self.logger = logger or logging.getLogger(__name__)
self.plugins_dir = plugins_dir
self.project_root = project_root or Path.cwd()
# Schema cache: plugin_id -> schema dict
self._schema_cache: Dict[str, Dict[str, Any]] = {}
# Default config cache: plugin_id -> default config dict
self._defaults_cache: Dict[str, Dict[str, Any]] = {}
def get_schema_path(self, plugin_id: str) -> Optional[Path]:
"""
Get the path to a plugin's config_schema.json file.
Tries multiple locations in order:
1. plugins_dir / plugin_id / config_schema.json
2. PROJECT_ROOT / plugins / plugin_id / config_schema.json
3. PROJECT_ROOT / plugin-repos / plugin_id / config_schema.json
Args:
plugin_id: Plugin identifier
Returns:
Path to schema file or None if not found
"""
possible_paths = []
# Try plugins_dir if set
if self.plugins_dir:
possible_paths.append(self.plugins_dir / plugin_id / 'config_schema.json')
# Try standard locations relative to project root
possible_paths.extend([
self.project_root / 'plugins' / plugin_id / 'config_schema.json',
self.project_root / 'plugin-repos' / plugin_id / 'config_schema.json',
])
# Try case-insensitive directory matching
for base_dir in [self.project_root / 'plugins', self.project_root / 'plugin-repos']:
if base_dir.exists():
for item in base_dir.iterdir():
if item.is_dir() and item.name.lower() == plugin_id.lower():
possible_paths.append(item / 'config_schema.json')
# Try each path
for path in possible_paths:
if path.exists():
self.logger.debug(f"Found schema for {plugin_id} at {path}")
return path
self.logger.warning(f"Schema file not found for plugin {plugin_id}")
return None
def load_schema(self, plugin_id: str, use_cache: bool = True) -> Optional[Dict[str, Any]]:
"""
Load a plugin's configuration schema.
Args:
plugin_id: Plugin identifier
use_cache: If True, return cached schema if available
Returns:
Schema dictionary or None if not found
"""
# Check cache first
if use_cache and plugin_id in self._schema_cache:
return self._schema_cache[plugin_id]
schema_path = self.get_schema_path(plugin_id)
if not schema_path:
return None
try:
with open(schema_path, 'r', encoding='utf-8') as f:
schema = json.load(f)
# Validate schema structure (basic check)
if not isinstance(schema, dict):
self.logger.error(f"Invalid schema format for {plugin_id}: not a dictionary")
return None
# Cache the schema
self._schema_cache[plugin_id] = schema
# Invalidate defaults cache when schema changes
if plugin_id in self._defaults_cache:
del self._defaults_cache[plugin_id]
return schema
except json.JSONDecodeError as e:
self.logger.error(f"Invalid JSON in schema file for {plugin_id}: {e}")
return None
except Exception as e:
self.logger.error(f"Error loading schema for {plugin_id}: {e}")
return None
def invalidate_cache(self, plugin_id: Optional[str] = None) -> None:
"""
Invalidate schema cache for a plugin or all plugins.
Args:
plugin_id: Plugin identifier to invalidate, or None to clear all
"""
if plugin_id:
self._schema_cache.pop(plugin_id, None)
self._defaults_cache.pop(plugin_id, None)
self.logger.debug(f"Invalidated cache for plugin {plugin_id}")
else:
self._schema_cache.clear()
self._defaults_cache.clear()
self.logger.debug("Invalidated all schema caches")
def extract_defaults_from_schema(self, schema: Dict[str, Any], prefix: str = '') -> Dict[str, Any]:
"""
Recursively extract default values from a JSON Schema.
Handles nested objects, arrays, and all schema types.
Args:
schema: JSON Schema dictionary
prefix: Optional prefix for logging/debugging
Returns:
Dictionary of default values
"""
defaults = {}
# Handle schema with properties
properties = schema.get('properties', {})
if not properties:
return defaults
for key, prop_schema in properties.items():
field_path = f"{prefix}.{key}" if prefix else key
# If property has a default, use it
if 'default' in prop_schema:
defaults[key] = prop_schema['default']
self.logger.debug(f"Found default for {field_path}: {prop_schema['default']}")
continue
# Handle nested objects
if prop_schema.get('type') == 'object' and 'properties' in prop_schema:
nested_defaults = self.extract_defaults_from_schema(prop_schema, field_path)
if nested_defaults:
defaults[key] = nested_defaults
# Handle arrays with object items
elif prop_schema.get('type') == 'array' and 'items' in prop_schema:
items_schema = prop_schema['items']
if items_schema.get('type') == 'object' and 'properties' in items_schema:
# For arrays of objects, use empty array as default
# Individual objects will use their defaults when created
defaults[key] = []
elif 'default' in items_schema:
# Array with default item value
defaults[key] = [items_schema['default']]
else:
# Empty array as default
defaults[key] = []
# For other types without defaults, don't add to defaults dict
# This allows plugins to handle missing values as needed
return defaults
def generate_default_config(self, plugin_id: str, use_cache: bool = True) -> Dict[str, Any]:
"""
Generate default configuration for a plugin from its schema.
Args:
plugin_id: Plugin identifier
use_cache: If True, return cached defaults if available
Returns:
Dictionary of default configuration values
"""
# Check cache first
if use_cache and plugin_id in self._defaults_cache:
return self._defaults_cache[plugin_id].copy()
schema = self.load_schema(plugin_id, use_cache=use_cache)
if not schema:
# Return minimal defaults if no schema
return {
'enabled': False,
'display_duration': 15
}
# Extract defaults from schema
defaults = self.extract_defaults_from_schema(schema)
# Ensure core properties have defaults (they may not be in the schema)
# These match BasePlugin behavior
if 'enabled' not in defaults:
defaults['enabled'] = schema.get('properties', {}).get('enabled', {}).get('default', True)
if 'display_duration' not in defaults:
defaults['display_duration'] = schema.get('properties', {}).get('display_duration', {}).get('default', 15)
if 'live_priority' not in defaults:
defaults['live_priority'] = schema.get('properties', {}).get('live_priority', {}).get('default', False)
# Cache the defaults
self._defaults_cache[plugin_id] = defaults.copy()
return defaults
def validate_config_against_schema(self, config: Dict[str, Any], schema: Dict[str, Any],
plugin_id: Optional[str] = None) -> Tuple[bool, List[str]]:
"""
Validate configuration against a JSON Schema.
Uses jsonschema library for comprehensive validation.
Automatically injects core plugin properties (enabled, display_duration, etc.)
into the schema before validation to ensure they're always allowed.
Args:
config: Configuration dictionary to validate
schema: JSON Schema dictionary
plugin_id: Optional plugin ID for error messages
Returns:
Tuple of (is_valid, list_of_error_messages)
"""
errors = []
try:
# Core plugin properties that should always be allowed
# These are handled by the base plugin system and should not cause validation failures
# Defaults match BasePlugin behavior: enabled=True, display_duration=15, live_priority=False
core_properties = {
"enabled": {
"type": "boolean",
"default": True,
"description": "Enable or disable this plugin"
},
"display_duration": {
"type": "number",
"default": 15,
"minimum": 1,
"maximum": 300,
"description": "How long to display this plugin in seconds"
},
"live_priority": {
"type": "boolean",
"default": False,
"description": "Enable live priority takeover when plugin has live content"
}
}
# Create a deep copy of the schema to modify (to avoid mutating the original)
enhanced_schema = copy.deepcopy(schema)
if "properties" not in enhanced_schema:
enhanced_schema["properties"] = {}
# Inject core properties if they're not already defined in the schema
# This ensures core properties are always allowed even if not in the plugin's schema
properties_added = []
for prop_name, prop_def in core_properties.items():
if prop_name not in enhanced_schema["properties"]:
enhanced_schema["properties"][prop_name] = copy.deepcopy(prop_def)
properties_added.append(prop_name)
# Log if we added any core properties (for debugging)
if properties_added and plugin_id:
self.logger.debug(
f"Injected core properties into schema for {plugin_id}: {properties_added}"
)
# Remove core properties from required array (they're system-managed)
# Core properties should be allowed but not required for validation
if "required" in enhanced_schema:
core_prop_names = list(core_properties.keys())
removed_from_required = [
field for field in enhanced_schema["required"]
if field in core_prop_names
]
enhanced_schema["required"] = [
field for field in enhanced_schema["required"]
if field not in core_prop_names
]
# Log if we removed any core properties from required (for debugging)
if removed_from_required and plugin_id:
self.logger.debug(
f"Removed core properties from required array for {plugin_id}: {removed_from_required}"
)
# Create validator with enhanced schema
validator = Draft7Validator(enhanced_schema)
# Collect all validation errors
for error in validator.iter_errors(config):
error_msg = self._format_validation_error(error, plugin_id)
errors.append(error_msg)
# Check required fields
required_fields = enhanced_schema.get('required', [])
for field in required_fields:
if field not in config:
errors.append(f"Missing required field: '{field}'")
if errors:
return False, errors
return True, []
except jsonschema.SchemaError as e:
error_msg = f"Schema error{' for ' + plugin_id if plugin_id else ''}: {str(e)}"
self.logger.error(error_msg)
return False, [error_msg]
except Exception as e:
error_msg = f"Validation error{' for ' + plugin_id if plugin_id else ''}: {str(e)}"
self.logger.error(error_msg)
return False, [error_msg]
def _format_validation_error(self, error: ValidationError, plugin_id: Optional[str] = None) -> str:
"""
Format a validation error into a readable message.
Args:
error: ValidationError from jsonschema
plugin_id: Optional plugin ID for context
Returns:
Formatted error message
"""
path = '.'.join(str(p) for p in error.path)
field_path = f"'{path}'" if path else "root"
if error.validator == 'required':
missing = error.validator_value
return f"Field {field_path}: Missing required property '{missing}'"
elif error.validator == 'type':
expected = error.validator_value
actual = type(error.instance).__name__
return f"Field {field_path}: Expected type {expected}, got {actual}"
elif error.validator == 'enum':
allowed = error.validator_value
return f"Field {field_path}: Value '{error.instance}' not in allowed values {allowed}"
elif error.validator in ['minimum', 'maximum']:
limit = error.validator_value
return f"Field {field_path}: Value {error.instance} violates {error.validator} constraint ({limit})"
elif error.validator in ['minLength', 'maxLength']:
limit = error.validator_value
return f"Field {field_path}: Length {len(error.instance)} violates {error.validator} constraint ({limit})"
elif error.validator in ['minItems', 'maxItems']:
limit = error.validator_value
return f"Field {field_path}: Array length {len(error.instance)} violates {error.validator} constraint ({limit})"
else:
return f"Field {field_path}: {error.message}"
def merge_with_defaults(self, config: Dict[str, Any], defaults: Dict[str, Any]) -> Dict[str, Any]:
"""
Merge configuration with defaults, preserving user values.
Also replaces None values with defaults to ensure config never has None from the start.
Args:
config: User configuration
defaults: Default values from schema
Returns:
Merged configuration with defaults applied where missing or None
"""
merged = copy.deepcopy(defaults)
def deep_merge(target: Dict[str, Any], source: Dict[str, Any], default_dict: Dict[str, Any]) -> None:
"""Recursively merge source into target, replacing None with defaults."""
for key, value in source.items():
default_value = default_dict.get(key)
if key in target and isinstance(target[key], dict) and isinstance(value, dict):
# Both are dicts, recursively merge
if isinstance(default_value, dict):
deep_merge(target[key], value, default_value)
else:
deep_merge(target[key], value, {})
elif value is None and default_value is not None:
# Value is None and we have a default, use the default
target[key] = copy.deepcopy(default_value) if isinstance(default_value, (dict, list)) else default_value
else:
# Normal merge: user value takes precedence (copy if dict/list)
if isinstance(value, (dict, list)):
target[key] = copy.deepcopy(value)
else:
target[key] = value
deep_merge(merged, config, defaults)
# Final pass: replace any remaining None values at any level with defaults
def replace_none_with_defaults(target: Dict[str, Any], default_dict: Dict[str, Any]) -> None:
"""Recursively replace None values with defaults."""
for key in list(target.keys()):
value = target[key]
default_value = default_dict.get(key)
if value is None and default_value is not None:
# Replace None with default
target[key] = copy.deepcopy(default_value) if isinstance(default_value, (dict, list)) else default_value
elif isinstance(value, dict) and isinstance(default_value, dict):
# Recursively process nested dicts
replace_none_with_defaults(value, default_value)
replace_none_with_defaults(merged, defaults)
return merged
def detect_config_key_collisions(
self,
plugin_ids: List[str]
) -> List[Dict[str, Any]]:
"""
Detect config key collisions between plugins.
Checks for:
1. Plugin IDs that collide with reserved system config keys
2. Plugin IDs that might cause confusion or conflicts
Args:
plugin_ids: List of plugin identifiers to check
Returns:
List of collision warnings, each containing:
- type: 'reserved_key_collision' or 'case_collision'
- plugin_id: The plugin ID involved
- message: Human-readable warning message
"""
collisions = []
# Reserved top-level config keys that plugins should not use as IDs
reserved_keys = {
'display', 'schedule', 'timezone', 'plugin_system',
'display_modes', 'system', 'hardware', 'debug',
'log_level', 'emulator', 'web_interface'
}
# Track plugin IDs for case collision detection
lowercase_ids: Dict[str, str] = {}
for plugin_id in plugin_ids:
# Check reserved key collision
if plugin_id.lower() in {k.lower() for k in reserved_keys}:
collisions.append({
"type": "reserved_key_collision",
"plugin_id": plugin_id,
"message": f"Plugin ID '{plugin_id}' conflicts with reserved config key. "
f"This may cause configuration issues."
})
# Check for case-insensitive collisions between plugins
lower_id = plugin_id.lower()
if lower_id in lowercase_ids:
existing_id = lowercase_ids[lower_id]
if existing_id != plugin_id:
collisions.append({
"type": "case_collision",
"plugin_id": plugin_id,
"conflicting_id": existing_id,
"message": f"Plugin ID '{plugin_id}' may conflict with '{existing_id}' "
f"on case-insensitive file systems."
})
else:
lowercase_ids[lower_id] = plugin_id
return collisions