Files
LEDMatrix/src/plugin_system/plugin_state.py
Chuck 65e3e8319b fix(plugin_manager): prevent permanent ERROR state after update timeout (#316)
* fix(plugin_manager): prevent permanent ERROR state after update timeout

When execute_update() fails (timeout or unhandled exception), the plugin
state was set to ERROR with no recovery path. can_execute() returns False
for ERROR state, so the plugin's update() was never called again, leaving
it showing stale data indefinitely.

Instead, update plugin_last_update so the plugin waits one configured
interval before retrying, and keep the state ENABLED so recovery is
automatic on the next cycle.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(plugin_manager): address PR review — failure timestamp and error context

- Use time.time() at the point of failure instead of reusing current_time
  (captured before execution), so the full retry interval always elapses
  after a timeout rather than one execution-duration shorter

- Add PluginStateManager.set_error_info() to persist structured error context
  without changing plugin state; call it in both failure branches so
  get_error_info() / get_state_info() surface recoverable errors alongside
  ERROR-state errors

- Add warning log on the success=False branch (was previously silent)

- Pass a descriptive Exception (not a generic "Plugin execution failed") to
  health_tracker.record_failure() in the timeout/executor-error path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(plugin_manager): atomic state+error write via set_state_with_error

The two-step set_state() / set_error_info() sequence left a window where
readers could observe ENABLED state without the accompanying error context.

Add threading.RLock to PluginStateManager and a new set_state_with_error()
method that holds the lock for both the state-transition write and the
_error_info write together. The method inlines the state-transition logic
rather than calling set_state() internally to intentionally skip the
"clear _error_info for non-ERROR states" side effect — the recoverable
error dict is exactly what we want stored.

Replace both paired set_state / set_error_info call sites in
run_scheduled_updates() with the single atomic method.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(plugin_state): lock _error_info accesses and store defensive copies

Three verified issues:

- set_error_info wrote _error_info without holding _lock and stored the
  caller's dict by reference, allowing races and post-write mutation
- set_state_with_error stored error_info by reference (lock was already held)
- get_error_info read _error_info without _lock and returned the live
  reference, letting callers mutate the stored snapshot

Implicit fourth fix: set_state also wrote _error_info without _lock; locking
get_error_info while leaving that writer unguarded would have created a new
race, so set_state is now wrapped in _lock too for consistency.

Changes:
- set_state: wrap entire body in self._lock (covers _states, _state_history,
  and _error_info writes atomically; ERROR-path _error_info value was already
  a fresh dict literal so no copy needed)
- set_error_info: acquire self._lock + store dict(error_info) shallow copy
- set_state_with_error: store dict(error_info) shallow copy (lock already held)
- get_error_info: acquire self._lock + return dict(info) copy or None

All stored values are flat dicts of strings/floats/bools, so shallow copy
is sufficient — deepcopy is not needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(plugin_manager): apply recovery logic to update_all_plugins; extract helper

update_all_plugins still set PluginState.ERROR on both failure paths, leaving
it inconsistent with the run_scheduled_updates fix from the same PR.

Extract _record_update_failure(plugin_id, exc=None) to hold all shared failure
logic: capture actual failure time, build structured error_info, log the retry
warning, stamp plugin_last_update, call set_state_with_error(ENABLED), and
forward to health_tracker. Replace all four failure sites (two in
run_scheduled_updates, two in update_all_plugins) with calls to this helper.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 15:22:12 -04:00

267 lines
9.1 KiB
Python

"""
Plugin State Management
Manages plugin state machine (loaded → enabled → running → error)
with state transitions and queries.
"""
import threading
from enum import Enum
from typing import Optional, Dict, Any
from datetime import datetime
import logging
from src.logging_config import get_logger
class PluginState(Enum):
"""Plugin state enumeration."""
UNLOADED = "unloaded" # Plugin not loaded
LOADED = "loaded" # Plugin module loaded but not instantiated
ENABLED = "enabled" # Plugin instantiated and enabled
RUNNING = "running" # Plugin is currently executing
ERROR = "error" # Plugin encountered an error
DISABLED = "disabled" # Plugin is disabled in config
class PluginStateManager:
"""Manages plugin state transitions and queries."""
def __init__(self, logger: Optional[logging.Logger] = None) -> None:
"""
Initialize the plugin state manager.
Args:
logger: Optional logger instance
"""
self.logger = logger or get_logger(__name__)
self._lock = threading.RLock()
self._states: Dict[str, PluginState] = {}
self._state_history: Dict[str, list] = {}
self._error_info: Dict[str, Dict[str, Any]] = {}
self._last_update: Dict[str, datetime] = {}
self._last_display: Dict[str, datetime] = {}
def set_state(
self,
plugin_id: str,
state: PluginState,
error: Optional[Exception] = None
) -> None:
"""
Set plugin state and record transition.
Args:
plugin_id: Plugin identifier
state: New state
error: Optional error if transitioning to ERROR state
"""
with self._lock:
old_state = self._states.get(plugin_id, PluginState.UNLOADED)
self._states[plugin_id] = state
if plugin_id not in self._state_history:
self._state_history[plugin_id] = []
transition = {
'timestamp': datetime.now(),
'from': old_state.value,
'to': state.value,
'error': str(error) if error else None
}
self._state_history[plugin_id].append(transition)
# Store error info if transitioning to ERROR state
if state == PluginState.ERROR and error:
self._error_info[plugin_id] = {
'error': str(error),
'error_type': type(error).__name__,
'timestamp': datetime.now()
}
elif state != PluginState.ERROR:
# Clear error info when leaving ERROR state
self._error_info.pop(plugin_id, None)
self.logger.debug(
"Plugin %s state transition: %s%s",
plugin_id,
old_state.value,
state.value
)
def get_state(self, plugin_id: str) -> PluginState:
"""
Get current state of a plugin.
Args:
plugin_id: Plugin identifier
Returns:
Current plugin state
"""
return self._states.get(plugin_id, PluginState.UNLOADED)
def is_loaded(self, plugin_id: str) -> bool:
"""Check if plugin is loaded."""
state = self.get_state(plugin_id)
return state in [PluginState.LOADED, PluginState.ENABLED, PluginState.RUNNING]
def is_enabled(self, plugin_id: str) -> bool:
"""Check if plugin is enabled."""
state = self.get_state(plugin_id)
return state == PluginState.ENABLED
def is_running(self, plugin_id: str) -> bool:
"""Check if plugin is currently running."""
state = self.get_state(plugin_id)
return state == PluginState.RUNNING
def is_error(self, plugin_id: str) -> bool:
"""Check if plugin is in error state."""
state = self.get_state(plugin_id)
return state == PluginState.ERROR
def can_execute(self, plugin_id: str) -> bool:
"""Check if plugin can execute (update/display)."""
state = self.get_state(plugin_id)
return state == PluginState.ENABLED
def get_state_history(self, plugin_id: str) -> list:
"""
Get state transition history for a plugin.
Args:
plugin_id: Plugin identifier
Returns:
List of state transitions
"""
return self._state_history.get(plugin_id, [])
def set_error_info(self, plugin_id: str, error_info: Dict[str, Any]) -> None:
"""
Persist structured error context without changing plugin state.
Used for recoverable failures (e.g. update timeout) where the plugin
stays ENABLED but the error details should remain queryable.
Args:
plugin_id: Plugin identifier
error_info: Arbitrary dict describing the error
"""
with self._lock:
self._error_info[plugin_id] = dict(error_info)
def set_state_with_error(
self,
plugin_id: str,
state: PluginState,
error_info: Dict[str, Any],
error: Optional[Exception] = None,
) -> None:
"""Set plugin state and persist error context atomically.
Unlike calling set_state() then set_error_info() separately, this
method holds ``_lock`` for both writes so no reader can observe the
new state without the accompanying error context.
Intentionally does not clear ``_error_info`` the way set_state() does
for non-ERROR transitions — this is the recoverable-failure path where
the error dict is the entire point.
Args:
plugin_id: Plugin identifier
state: New state
error_info: Structured error dict to persist alongside the state
error: Optional exception recorded in the transition history
"""
with self._lock:
old_state = self._states.get(plugin_id, PluginState.UNLOADED)
self._states[plugin_id] = state
if plugin_id not in self._state_history:
self._state_history[plugin_id] = []
self._state_history[plugin_id].append({
'timestamp': datetime.now(),
'from': old_state.value,
'to': state.value,
'error': str(error) if error else None,
})
self._error_info[plugin_id] = dict(error_info)
self.logger.debug(
"Plugin %s state transition: %s%s (recoverable error stored)",
plugin_id,
old_state.value,
state.value,
)
def get_error_info(self, plugin_id: str) -> Optional[Dict[str, Any]]:
"""
Get error information for a plugin.
Returns the stored error dict whether the plugin is in ERROR state or
still ENABLED after a recoverable failure. Returns a shallow copy so
callers cannot mutate the stored snapshot.
Args:
plugin_id: Plugin identifier
Returns:
Copy of the error information dict, or None
"""
with self._lock:
info = self._error_info.get(plugin_id)
return dict(info) if info is not None else None
def record_update(self, plugin_id: str) -> None:
"""Record that plugin update() was called."""
self._last_update[plugin_id] = datetime.now()
def record_display(self, plugin_id: str) -> None:
"""Record that plugin display() was called."""
self._last_display[plugin_id] = datetime.now()
def get_last_update(self, plugin_id: str) -> Optional[datetime]:
"""Get timestamp of last update() call."""
return self._last_update.get(plugin_id)
def get_last_display(self, plugin_id: str) -> Optional[datetime]:
"""Get timestamp of last display() call."""
return self._last_display.get(plugin_id)
def get_state_info(self, plugin_id: str) -> Dict[str, Any]:
"""
Get comprehensive state information for a plugin.
Args:
plugin_id: Plugin identifier
Returns:
Dictionary with state information
"""
state = self.get_state(plugin_id)
info = {
'state': state.value,
'is_loaded': self.is_loaded(plugin_id),
'is_enabled': self.is_enabled(plugin_id),
'is_running': self.is_running(plugin_id),
'is_error': self.is_error(plugin_id),
'can_execute': self.can_execute(plugin_id),
'last_update': self.get_last_update(plugin_id),
'last_display': self.get_last_display(plugin_id),
'error_info': self.get_error_info(plugin_id),
'state_history_count': len(self.get_state_history(plugin_id))
}
return info
def clear_state(self, plugin_id: str) -> None:
"""Clear all state information for a plugin."""
self._states.pop(plugin_id, None)
self._state_history.pop(plugin_id, None)
self._error_info.pop(plugin_id, None)
self._last_update.pop(plugin_id, None)
self._last_display.pop(plugin_id, None)