fix: add thread-safe locking to PluginManager and fix reconciliation retry

PluginManager thread safety:
- Add RLock protecting plugin_manifests and plugin_directories
- Build scan results locally in _scan_directory_for_plugins, then update
  shared state under lock
- Protect reads in get_plugin_info, get_all_plugin_info,
  get_plugin_directory, get_plugin_display_modes, find_plugin_for_mode
- Protect manifest mutation in reload_plugin
- Prevents races between background reconciliation thread and request
  handlers reading plugin state

Reconciliation retry:
- Clear _reconciliation_started on exception so next request retries
- Check result.reconciliation_successful before marking done
- Reset _reconciliation_started on non-success results to allow retry

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
ChuckBuilds
2026-03-25 15:47:05 -04:00
parent 73109f73f7
commit 6ff7fcba8d
2 changed files with 52 additions and 26 deletions

View File

@@ -14,6 +14,7 @@ import importlib.util
import sys import sys
import subprocess import subprocess
import time import time
import threading
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
import logging import logging
@@ -74,6 +75,10 @@ class PluginManager:
self.state_manager = PluginStateManager(logger=self.logger) self.state_manager = PluginStateManager(logger=self.logger)
self.schema_manager = SchemaManager(plugins_dir=self.plugins_dir, logger=self.logger) self.schema_manager = SchemaManager(plugins_dir=self.plugins_dir, logger=self.logger)
# Lock protecting plugin_manifests and plugin_directories from
# concurrent mutation (background reconciliation) and reads (requests).
self._discovery_lock = threading.RLock()
# Active plugins # Active plugins
self.plugins: Dict[str, Any] = {} self.plugins: Dict[str, Any] = {}
self.plugin_manifests: Dict[str, Dict[str, Any]] = {} self.plugin_manifests: Dict[str, Dict[str, Any]] = {}
@@ -94,23 +99,27 @@ class PluginManager:
def _scan_directory_for_plugins(self, directory: Path) -> List[str]: def _scan_directory_for_plugins(self, directory: Path) -> List[str]:
""" """
Scan a directory for plugins. Scan a directory for plugins.
Args: Args:
directory: Directory to scan directory: Directory to scan
Returns: Returns:
List of plugin IDs found List of plugin IDs found
""" """
plugin_ids = [] plugin_ids = []
if not directory.exists(): if not directory.exists():
return plugin_ids return plugin_ids
# Build new state locally before acquiring lock
new_manifests: Dict[str, Dict[str, Any]] = {}
new_directories: Dict[str, Path] = {}
try: try:
for item in directory.iterdir(): for item in directory.iterdir():
if not item.is_dir(): if not item.is_dir():
continue continue
manifest_path = item / "manifest.json" manifest_path = item / "manifest.json"
if manifest_path.exists(): if manifest_path.exists():
try: try:
@@ -119,18 +128,21 @@ class PluginManager:
plugin_id = manifest.get('id') plugin_id = manifest.get('id')
if plugin_id: if plugin_id:
plugin_ids.append(plugin_id) plugin_ids.append(plugin_id)
self.plugin_manifests[plugin_id] = manifest new_manifests[plugin_id] = manifest
new_directories[plugin_id] = item
# Store directory mapping
if not hasattr(self, 'plugin_directories'):
self.plugin_directories = {}
self.plugin_directories[plugin_id] = item
except (json.JSONDecodeError, PermissionError, OSError) as e: except (json.JSONDecodeError, PermissionError, OSError) as e:
self.logger.warning("Error reading manifest from %s: %s", manifest_path, e, exc_info=True) self.logger.warning("Error reading manifest from %s: %s", manifest_path, e, exc_info=True)
continue continue
except (OSError, PermissionError) as e: except (OSError, PermissionError) as e:
self.logger.error("Error scanning directory %s: %s", directory, e, exc_info=True) self.logger.error("Error scanning directory %s: %s", directory, e, exc_info=True)
# Update shared state under lock
with self._discovery_lock:
self.plugin_manifests.update(new_manifests)
if not hasattr(self, 'plugin_directories'):
self.plugin_directories = {}
self.plugin_directories.update(new_directories)
return plugin_ids return plugin_ids
def discover_plugins(self) -> List[str]: def discover_plugins(self) -> List[str]:
@@ -459,7 +471,9 @@ class PluginManager:
if manifest_path.exists(): if manifest_path.exists():
try: try:
with open(manifest_path, 'r', encoding='utf-8') as f: with open(manifest_path, 'r', encoding='utf-8') as f:
self.plugin_manifests[plugin_id] = json.load(f) manifest = json.load(f)
with self._discovery_lock:
self.plugin_manifests[plugin_id] = manifest
except Exception as e: except Exception as e:
self.logger.error("Error reading manifest: %s", e, exc_info=True) self.logger.error("Error reading manifest: %s", e, exc_info=True)
return False return False
@@ -506,10 +520,11 @@ class PluginManager:
Returns: Returns:
Dict with plugin information or None if not found Dict with plugin information or None if not found
""" """
manifest = self.plugin_manifests.get(plugin_id) with self._discovery_lock:
manifest = self.plugin_manifests.get(plugin_id)
if not manifest: if not manifest:
return None return None
info = manifest.copy() info = manifest.copy()
# Add runtime information if plugin is loaded # Add runtime information if plugin is loaded
@@ -533,7 +548,9 @@ class PluginManager:
Returns: Returns:
List of plugin info dictionaries List of plugin info dictionaries
""" """
return [info for info in [self.get_plugin_info(pid) for pid in self.plugin_manifests.keys()] if info] with self._discovery_lock:
pids = list(self.plugin_manifests.keys())
return [info for info in [self.get_plugin_info(pid) for pid in pids] if info]
def get_plugin_directory(self, plugin_id: str) -> Optional[str]: def get_plugin_directory(self, plugin_id: str) -> Optional[str]:
""" """
@@ -545,8 +562,9 @@ class PluginManager:
Returns: Returns:
Directory path as string or None if not found Directory path as string or None if not found
""" """
if hasattr(self, 'plugin_directories') and plugin_id in self.plugin_directories: with self._discovery_lock:
return str(self.plugin_directories[plugin_id]) if hasattr(self, 'plugin_directories') and plugin_id in self.plugin_directories:
return str(self.plugin_directories[plugin_id])
plugin_dir = self.plugins_dir / plugin_id plugin_dir = self.plugins_dir / plugin_id
if plugin_dir.exists(): if plugin_dir.exists():
@@ -568,10 +586,11 @@ class PluginManager:
Returns: Returns:
List of display mode names List of display mode names
""" """
manifest = self.plugin_manifests.get(plugin_id) with self._discovery_lock:
manifest = self.plugin_manifests.get(plugin_id)
if not manifest: if not manifest:
return [] return []
display_modes = manifest.get('display_modes', []) display_modes = manifest.get('display_modes', [])
if isinstance(display_modes, list): if isinstance(display_modes, list):
return display_modes return display_modes
@@ -588,12 +607,14 @@ class PluginManager:
Plugin identifier or None if not found. Plugin identifier or None if not found.
""" """
normalized_mode = mode.strip().lower() normalized_mode = mode.strip().lower()
for plugin_id, manifest in self.plugin_manifests.items(): with self._discovery_lock:
manifests_snapshot = dict(self.plugin_manifests)
for plugin_id, manifest in manifests_snapshot.items():
display_modes = manifest.get('display_modes') display_modes = manifest.get('display_modes')
if isinstance(display_modes, list) and display_modes: if isinstance(display_modes, list) and display_modes:
if any(m.lower() == normalized_mode for m in display_modes): if any(m.lower() == normalized_mode for m in display_modes):
return plugin_id return plugin_id
return None return None
def _get_plugin_update_interval(self, plugin_id: str, plugin_instance: Any) -> Optional[float]: def _get_plugin_update_interval(self, plugin_id: str, plugin_instance: Any) -> Optional[float]:

View File

@@ -656,7 +656,7 @@ _reconciliation_started = False
def _run_startup_reconciliation(): def _run_startup_reconciliation():
"""Run state reconciliation in background to auto-repair missing plugins.""" """Run state reconciliation in background to auto-repair missing plugins."""
global _reconciliation_done global _reconciliation_done, _reconciliation_started
from src.logging_config import get_logger from src.logging_config import get_logger
_logger = get_logger('reconciliation') _logger = get_logger('reconciliation')
@@ -672,11 +672,16 @@ def _run_startup_reconciliation():
result = reconciler.reconcile_state() result = reconciler.reconcile_state()
if result.inconsistencies_found: if result.inconsistencies_found:
_logger.info("[Reconciliation] %s", result.message) _logger.info("[Reconciliation] %s", result.message)
if result.inconsistencies_fixed: if result.reconciliation_successful:
plugin_manager.discover_plugins() if result.inconsistencies_fixed:
_reconciliation_done = True plugin_manager.discover_plugins()
_reconciliation_done = True
else:
_logger.warning("[Reconciliation] Finished with unresolved issues, will retry")
_reconciliation_started = False
except Exception as e: except Exception as e:
_logger.error("[Reconciliation] Error: %s", e, exc_info=True) _logger.error("[Reconciliation] Error: %s", e, exc_info=True)
_reconciliation_started = False
# Initialize health monitor and run reconciliation on first request # Initialize health monitor and run reconciliation on first request
@app.before_request @app.before_request