mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-05-26 22:13:32 +00:00
* fix(web-ui): fix quick actions not firing, add toast feedback, suppress install handler warning - base.html: add htmx:afterSettle listener to set data-loaded on tab containers after HTMX swaps their content, preventing the overview partial from being re-fetched (and handlers lost) on every tab switch - base.html: call htmx.process() in loadOverviewDirect/loadPluginsDirect fallbacks so buttons get HTMX handlers even if HTMX finished its initial body scan before the fallback fetch completed - overview.html + index.html (11 buttons): replace event.detail.xhr.responseJSON (undefined in HTMX 1.9.x) with JSON.parse(event.detail.xhr.responseText) so quick action toast notifications actually fire - plugins_manager.js: add guarded htmx:afterSettle listener that only calls attachInstallButtonHandler when #install-plugin-from-url is in the DOM, eliminating the spurious console warning on non-plugin tab loads Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(web-ui): ensure quick-action toasts always fire even on xhr/parse failure Replace silent catch(e){} in all 11 hx-on:htmx:after-request handlers with a pattern that sets default message/status before the try block and calls showNotification(m,s) unconditionally after it, so a fallback toast is shown whenever xhr is absent or responseText is not valid JSON. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(web-ui): show error toast on non-JSON 4xx/5xx quick-action responses In the catch block of all 11 hx-on:htmx:after-request handlers, check xhr.status >= 400 and downgrade s to 'error' so a failed action that returns an HTML error page (or other non-JSON body) surfaces as an error toast instead of the optimistic 'success'/'info' default. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(web-ui): guard setTimeout fallback for attachInstallButtonHandler The 500ms fallback setTimeout was calling attachInstallButtonHandler() unconditionally even when the plugins partial wasn't in the DOM, causing a spurious console.warn on every page load. Add the same element-existence check already present on the htmx:afterSettle listener. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix backup API 404s, hardware status 500, and HTMX loading race - Add all backup API routes to api_v3.py: preview, list, export, validate, restore (with plugin reinstall), download, delete - Fix PermissionError on /hardware/status: return graceful 200 instead of 500 when the status file is owned by a different user; also fix root cause by writing the file world-readable (0o644) in display_manager - Fix HTMX race: dispatch htmx:ready window event from HTMX onload callback; loadTabContent now waits for that event instead of immediately falling back to direct fetch (eliminating the "HTMX not available" console warning on initial load) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Cancel HTMX fallback timers when htmx:ready fires The 5-second setTimeout fallbacks for plugins and overview were firing before the htmx:ready event arrived, logging spurious warnings. Each timer now self-cancels via htmx:ready so the fallback only triggers when HTMX genuinely fails to load. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Address review feedback: error leaks, ok:false, htmx:ready coverage - Backup endpoints: replace raw str(e) in user-facing responses with a generic message; full exception still logged via exc_info=True - hardware/status: change ok:null to ok:false for PermissionError and json.JSONDecodeError so the UI's hw.ok===false check triggers correctly - base.html: dispatch htmx:ready from the fallback load path so any deferred listeners fire on CDN-fallback loads too - loadTabContent: also listen for htmx-load-failed so overview/wifi/plugins fall back to direct fetch when HTMX is completely unavailable Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Treat system-managed pip packages as satisfied for dependency marker When a plugin's requirements.txt includes a package installed via the system package manager (dnf/apt), pip fails with 'uninstall-no-record-file' because it can't replace the system-tracked copy. The package is present and functional, but the missing marker caused the install to be retried on every service restart. Detect this specific error pattern: if the only pip failure is uninstall-no-record-file, write the .dependencies_installed marker and log a warning instead of returning False, suppressing the repeated warning. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix uninstall-no-record-file detection condition The previous check used a string replacement that left 'error:' in the remaining text, causing the condition to always evaluate false. Simplify to a direct substring check: if 'uninstall-no-record-file' appears in pip stderr the affected package is installed at the system level and we write the marker, suppressing the repeated warning on every restart. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Resolve CodeQL security findings in backup API Path traversal (CWE-22): - backup_download: switch from send_file(user-tainted-path) to send_from_directory(_BACKUP_EXPORT_DIR, filename); Flask uses werkzeug safe_join internally which CodeQL recognises as a sanitizer - backup_delete: enumerate the export directory and match by name so entry.unlink() operates on a filesystem-derived Path rather than one constructed from user input; _safe_backup_path still guards first Information exposure through exceptions (CWE-209): - backup_validate: err_msg from validate_backup() can embed exception strings containing temp-file paths; log the detail, return a generic 'Invalid or corrupted backup file' to the client - Other backup endpoints: already fixed (str(e) -> generic message); CodeQL alerts will clear on next scan plugin_loader.py:185 (path traversal): false positive — requirements_file is constructed from plugin_dir returned by find_plugin_directory() (a filesystem scan), not from raw HTTP request input; no change needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix pre-existing information exposure in version and action endpoints - get_system_version (alert #218): replaced str(e) with generic message; exception still logged via logger.error(exc_info=True) - execute_system_action (alert #216): removed str(e) and full traceback.format_exc() from the HTTP response — the full stack trace was being sent directly to clients; replaced with generic message and proper logger.error call Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix remaining GitHub CodeQL security alerts - py/stack-trace-exposure: Remove str(e) and traceback.format_exc() from all HTTP responses across api_v3.py, pages_v3.py, and app.py; replace with generic messages and logger.error(exc_info=True) - py/reflective-xss: Escape partial_name via markupsafe.escape in the load_partial 404 response - py/path-injection: Add regex validation of plugin_id before filesystem use in _load_plugin_config_partial - py/incomplete-url-substring-sanitization: Replace 'github.com' in substring checks with urlparse hostname comparison in store_manager.py - py/clear-text-logging-sensitive-data: Remove football-scoreboard debug prints and sensitive request-body prints from update endpoint - js/bad-tag-filter: Replace script-only regex in BaseWidget.sanitizeValue with DOM-based textContent stripping that removes all HTML - js/incomplete-sanitization: Fix escapeAttr to properly encode &, ", ', <, > using HTML entities instead of backslash escaping - js/prototype-pollution-utility: Add __proto__/constructor/prototype key guards to deepMerge function in plugins_manager.js - app.py error handlers: Always return generic messages; remove debug-mode branches that could expose tracebacks in production Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix three remaining CodeQL path-injection and info-exposure alerts - plugin_loader.py: resolve plugin_dir with strict=True and validate marker_path with relative_to() before any filesystem writes, giving CodeQL the positive sanitization pattern it requires (py/path-injection) - api_v3.py _safe_backup_path: replace substring negative checks with a strict positive regex (^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$) that CodeQL recognises as sanitising the user-supplied filename (py/path-injection) - api_v3.py backup_validate: whitelist known-safe manifest fields before returning JSON, preventing any exception strings captured inside validate_backup() from reaching the HTTP response (py/stack-trace-exposure) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Resolve 29 open CodeQL security alerts across 5 files py/flask-debug (#214): - debug_web_manual.py: read debug mode from LEDMATRIX_FLASK_DEBUG env var instead of hardcoded True py/stack-trace-exposure (#216, #218): - api_v3.py execute_system_action: remove subprocess stdout/stderr from HTTP responses; log via logger instead - api_v3.py get_git_version: validate output matches safe ref format (^[a-zA-Z0-9._-]+$) before including in response - api_v3.py: remove all remaining traceback.format_exc() dead variables and print() debug calls (replaced with logger.debug/warning) py/reflective-xss (#207, #208, #209, #210, #211, #212): - api_v3.py: remove plugin_id from all error/success response messages (uninstall, install, update, health, not-found responses) - pages_v3.py load_partial: return static "Partial not found" message instead of echoing partial_name - pages_v3.py _load_starlark_config_partial: add app_id regex validation, use static error messages instead of f-strings with app_id py/path-injection (#187–#206): - pages_v3.py _load_plugin_config_partial: resolve plugins_base and validate _plugin_dir with relative_to() before all file operations; same for assets metadata directory - pages_v3.py _load_starlark_config_partial: resolve starlark_base and validate schema_file/config_file paths with relative_to() - plugin_loader.py _find_plugin_directory: resolve plugins_dir and validate strategy-2 candidates with relative_to() - plugin_loader.py install_dependencies: resolve plugin_dir first, then construct requirements_file and marker_path from resolved base - plugin_loader.py load_module: resolve plugin_dir with strict=True and validate entry_file with relative_to() before exec_module Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix 15 remaining CodeQL path-injection and stack-trace-exposure alerts Switch from resolve()+relative_to() to os.path.basename() reassignment, which CodeQL recognizes as a path sanitizer that breaks the taint chain. Also remove exception objects from backup_manager validate_backup return strings to eliminate the stack-trace-exposure taint source. Fixes alerts #227, #233, #234, #235, #237, #238, #239, #240, #241, #242, #243, #244, #245, #246, #247. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix broken logger format string and leaked exception in config save error - pages_v3.py: plain string was used instead of %-style substitution, so every manifest-read failure logged the literal "{plugin_id}" - api_v3.py save_main_config: exception message was still leaking through the error response; replace with generic message (consistent with the rest of the CodeQL sweep in this PR) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Chuck <chuck@example.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2500 lines
119 KiB
Python
2500 lines
119 KiB
Python
"""
|
|
Plugin Store Manager for LEDMatrix
|
|
|
|
Handles plugin discovery, installation, updates, and uninstallation
|
|
from both the official registry and custom GitHub repositories.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import stat
|
|
import subprocess
|
|
import shutil
|
|
import threading
|
|
import zipfile
|
|
import tempfile
|
|
import requests
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional, Any, Tuple
|
|
import logging
|
|
|
|
from urllib.parse import urlparse
|
|
|
|
from src.common.permission_utils import sudo_remove_directory
|
|
|
|
try:
|
|
from jsonschema import Draft7Validator, ValidationError
|
|
JSONSCHEMA_AVAILABLE = True
|
|
except ImportError:
|
|
JSONSCHEMA_AVAILABLE = False
|
|
|
|
|
|
class PluginStoreManager:
|
|
"""
|
|
Manages plugin discovery, installation, and updates from GitHub.
|
|
|
|
Supports two installation methods:
|
|
1. From official registry (curated plugins)
|
|
2. From custom GitHub URL (any repo)
|
|
"""
|
|
|
|
REGISTRY_URL = "https://raw.githubusercontent.com/ChuckBuilds/ledmatrix-plugins/main/plugins.json"
|
|
|
|
def __init__(self, plugins_dir: str = "plugins"):
|
|
"""
|
|
Initialize the plugin store manager.
|
|
|
|
Args:
|
|
plugins_dir: Directory where plugins are installed
|
|
"""
|
|
self.plugins_dir = Path(plugins_dir)
|
|
self.logger = logging.getLogger(__name__)
|
|
self.registry_cache = None
|
|
self.registry_cache_time = None # Timestamp of when registry was cached
|
|
self.github_cache = {} # Cache for GitHub API responses
|
|
self.cache_timeout = 3600 # 1 hour cache timeout (repo info: stars, default_branch)
|
|
# 15 minutes for registry cache. Long enough that the plugin list
|
|
# endpoint on a warm cache never hits the network, short enough that
|
|
# new plugins show up within a reasonable window. See also the
|
|
# stale-cache fallback in fetch_registry for transient network
|
|
# failures.
|
|
self.registry_cache_timeout = 900
|
|
self.commit_info_cache = {} # Cache for latest commit info: {key: (timestamp, data)}
|
|
# 30 minutes for commit/manifest caches. Plugin Store users browse
|
|
# the catalog via /plugins/store/list which fetches commit info and
|
|
# manifest data per plugin. 5-min TTLs meant every fresh browse on
|
|
# a Pi4 paid for ~3 HTTP requests x N plugins (30-60s serial). 30
|
|
# minutes keeps the cache warm across a realistic session while
|
|
# still picking up upstream updates within a reasonable window.
|
|
self.commit_cache_timeout = 1800
|
|
self.manifest_cache = {} # Cache for GitHub manifest fetches: {key: (timestamp, data)}
|
|
self.manifest_cache_timeout = 1800
|
|
self.github_token = self._load_github_token()
|
|
self._token_validation_cache = {} # Cache for token validation results: {token: (is_valid, timestamp, error_message)}
|
|
self._token_validation_cache_timeout = 300 # 5 minutes cache for token validation
|
|
|
|
# Per-plugin tombstone timestamps for plugins that were uninstalled
|
|
# recently via the UI. Used by the state reconciler to avoid
|
|
# resurrecting a plugin the user just deleted when reconciliation
|
|
# races against the uninstall operation. Cleared after ``_uninstall_tombstone_ttl``.
|
|
self._uninstall_tombstones: Dict[str, float] = {}
|
|
self._uninstall_tombstone_ttl = 300 # 5 minutes
|
|
|
|
# Cache for _get_local_git_info: {plugin_path_str: (signature, data)}
|
|
# where ``signature`` is a tuple of (head_mtime, resolved_ref_mtime,
|
|
# head_contents) so a fast-forward update to the current branch
|
|
# (which touches .git/refs/heads/<branch> but NOT .git/HEAD) still
|
|
# invalidates the cache. Before this cache, every
|
|
# /plugins/installed request fired 4 git subprocesses per plugin,
|
|
# which pegged the CPU on a Pi4 with a dozen plugins. The cached
|
|
# ``data`` dict is the same shape returned by ``_get_local_git_info``
|
|
# itself (sha / short_sha / branch / optional remote_url, date_iso,
|
|
# date) — all string-keyed strings.
|
|
self._git_info_cache: Dict[str, Tuple[Tuple, Dict[str, str]]] = {}
|
|
|
|
# How long to wait before re-attempting a failed GitHub metadata
|
|
# fetch after we've already served a stale cache hit. Without this,
|
|
# a single expired-TTL + network-error would cause every subsequent
|
|
# request to re-hit the network (and fail again) until the network
|
|
# actually came back — amplifying the failure and blocking request
|
|
# handlers. Bumping the cached-entry timestamp on failure serves
|
|
# the stale payload cheaply until the backoff expires.
|
|
self._failure_backoff_seconds = 60
|
|
# Prevents concurrent callers from each firing a network request when
|
|
# the registry cache expires. Only one thread fetches; others wait and
|
|
# then get the result from the warm cache (double-checked locking).
|
|
self._registry_fetch_lock = threading.Lock()
|
|
|
|
# Ensure plugins directory exists
|
|
self.plugins_dir.mkdir(exist_ok=True)
|
|
|
|
def _record_cache_backoff(self, cache_dict: Dict, cache_key: str,
|
|
cache_timeout: int, payload: Any) -> None:
|
|
"""Bump a cache entry's timestamp so subsequent lookups hit the
|
|
cache rather than re-failing over the network.
|
|
|
|
Used by the stale-on-error fallbacks in the GitHub metadata fetch
|
|
paths. Without this, a cache entry whose TTL just expired would
|
|
cause every subsequent request to re-hit the network and fail
|
|
again until the network actually came back. We write a synthetic
|
|
timestamp ``(now + backoff - cache_timeout)`` so the cache-valid
|
|
check ``(now - ts) < cache_timeout`` succeeds for another
|
|
``backoff`` seconds.
|
|
"""
|
|
synthetic_ts = time.time() + self._failure_backoff_seconds - cache_timeout
|
|
cache_dict[cache_key] = (synthetic_ts, payload)
|
|
|
|
def mark_recently_uninstalled(self, plugin_id: str) -> None:
|
|
"""Record that ``plugin_id`` was just uninstalled by the user."""
|
|
self._uninstall_tombstones[plugin_id] = time.time()
|
|
|
|
def was_recently_uninstalled(self, plugin_id: str) -> bool:
|
|
"""Return True if ``plugin_id`` has an active uninstall tombstone."""
|
|
ts = self._uninstall_tombstones.get(plugin_id)
|
|
if ts is None:
|
|
return False
|
|
if time.time() - ts > self._uninstall_tombstone_ttl:
|
|
# Expired — clean up so the dict doesn't grow unbounded.
|
|
self._uninstall_tombstones.pop(plugin_id, None)
|
|
return False
|
|
return True
|
|
|
|
def _load_github_token(self) -> Optional[str]:
|
|
"""
|
|
Load GitHub API token from config_secrets.json if available.
|
|
|
|
Returns:
|
|
GitHub token or None if not configured
|
|
"""
|
|
try:
|
|
config_path = Path(__file__).parent.parent.parent / "config" / "config_secrets.json"
|
|
if config_path.exists():
|
|
with open(config_path, 'r') as f:
|
|
config = json.load(f)
|
|
token = config.get('github', {}).get('api_token', '').strip()
|
|
if token and token != "YOUR_GITHUB_PERSONAL_ACCESS_TOKEN":
|
|
return token
|
|
except Exception as e:
|
|
self.logger.debug(f"Could not load GitHub token: {e}")
|
|
return None
|
|
|
|
def _validate_github_token(self, token: str) -> tuple[bool, Optional[str]]:
|
|
"""
|
|
Validate a GitHub token by making a lightweight API call.
|
|
|
|
Args:
|
|
token: GitHub personal access token to validate
|
|
|
|
Returns:
|
|
Tuple of (is_valid, error_message)
|
|
- is_valid: True if token is valid, False otherwise
|
|
- error_message: None if valid, error description if invalid
|
|
"""
|
|
if not token:
|
|
return (False, "No token provided")
|
|
|
|
# Check cache first
|
|
cache_key = token[:10] # Use first 10 chars as cache key for privacy
|
|
if cache_key in self._token_validation_cache:
|
|
cached_valid, cached_time, cached_error = self._token_validation_cache[cache_key]
|
|
if time.time() - cached_time < self._token_validation_cache_timeout:
|
|
return (cached_valid, cached_error)
|
|
|
|
# Validate token by making a lightweight API call to /user endpoint
|
|
try:
|
|
api_url = "https://api.github.com/user"
|
|
headers = {
|
|
'Accept': 'application/vnd.github.v3+json',
|
|
'User-Agent': 'LEDMatrix-Plugin-Manager/1.0',
|
|
'Authorization': f'token {token}'
|
|
}
|
|
|
|
response = requests.get(api_url, headers=headers, timeout=5)
|
|
|
|
if response.status_code == 200:
|
|
# Token is valid
|
|
result = (True, None)
|
|
self._token_validation_cache[cache_key] = (True, time.time(), None)
|
|
return result
|
|
elif response.status_code == 401:
|
|
# Token is invalid or expired
|
|
error_msg = "Token is invalid or expired"
|
|
result = (False, error_msg)
|
|
self._token_validation_cache[cache_key] = (False, time.time(), error_msg)
|
|
return result
|
|
elif response.status_code == 403:
|
|
# Rate limit or forbidden (but token might be valid)
|
|
# Check if it's a rate limit issue
|
|
if 'rate limit' in response.text.lower():
|
|
# Rate limit: return error but don't cache (rate limits are temporary)
|
|
error_msg = "Rate limit exceeded"
|
|
result = (False, error_msg)
|
|
return result
|
|
else:
|
|
# Token lacks permissions: cache the result (permissions don't change)
|
|
error_msg = "Token lacks required permissions"
|
|
result = (False, error_msg)
|
|
self._token_validation_cache[cache_key] = (False, time.time(), error_msg)
|
|
return result
|
|
else:
|
|
# Other error
|
|
error_msg = f"GitHub API error: {response.status_code}"
|
|
result = (False, error_msg)
|
|
self._token_validation_cache[cache_key] = (False, time.time(), error_msg)
|
|
return result
|
|
|
|
except requests.exceptions.Timeout:
|
|
error_msg = "GitHub API request timed out"
|
|
result = (False, error_msg)
|
|
# Don't cache timeout errors
|
|
return result
|
|
except requests.exceptions.RequestException as e:
|
|
error_msg = f"Network error: {str(e)}"
|
|
result = (False, error_msg)
|
|
# Don't cache network errors
|
|
return result
|
|
except Exception as e:
|
|
error_msg = f"Unexpected error: {str(e)}"
|
|
result = (False, error_msg)
|
|
# Don't cache unexpected errors
|
|
return result
|
|
|
|
@staticmethod
|
|
def _iso_to_date(iso_timestamp: str) -> str:
|
|
"""Convert an ISO timestamp to YYYY-MM-DD string."""
|
|
if not iso_timestamp:
|
|
return ""
|
|
|
|
try:
|
|
dt = datetime.fromisoformat(iso_timestamp.replace('Z', '+00:00'))
|
|
return dt.strftime('%Y-%m-%d')
|
|
except Exception:
|
|
return ""
|
|
|
|
@staticmethod
|
|
def _distinct_sequence(values: List[str]) -> List[str]:
|
|
"""Return list preserving order while removing duplicates and falsey entries."""
|
|
seen = set()
|
|
ordered = []
|
|
for value in values:
|
|
if not value:
|
|
continue
|
|
if value in seen:
|
|
continue
|
|
seen.add(value)
|
|
ordered.append(value)
|
|
return ordered
|
|
|
|
def _validate_manifest_version_fields(self, manifest: Dict[str, Any]) -> List[str]:
|
|
"""
|
|
Validate version-related fields in manifest for consistency.
|
|
|
|
Checks:
|
|
- compatible_versions is present and is an array
|
|
- Standardized field names are used (min_ledmatrix_version, max_ledmatrix_version)
|
|
- Deprecated fields are not used (ledmatrix_version)
|
|
- versions array entries use ledmatrix_min_version instead of ledmatrix_min
|
|
|
|
Args:
|
|
manifest: Manifest dictionary to validate
|
|
|
|
Returns:
|
|
List of validation error/warning messages (empty if valid)
|
|
"""
|
|
errors = []
|
|
|
|
# Check compatible_versions is an array
|
|
if 'compatible_versions' in manifest:
|
|
if not isinstance(manifest['compatible_versions'], list):
|
|
errors.append("compatible_versions must be an array")
|
|
elif len(manifest['compatible_versions']) == 0:
|
|
errors.append("compatible_versions array cannot be empty")
|
|
|
|
# Warn about deprecated ledmatrix_version field
|
|
if 'ledmatrix_version' in manifest:
|
|
errors.append("ledmatrix_version is deprecated, use compatible_versions instead")
|
|
|
|
# Check versions array entries use standardized field names
|
|
if 'versions' in manifest and isinstance(manifest['versions'], list):
|
|
for i, version_entry in enumerate(manifest['versions']):
|
|
if not isinstance(version_entry, dict):
|
|
continue
|
|
|
|
# Check for old ledmatrix_min field
|
|
if 'ledmatrix_min' in version_entry and 'ledmatrix_min_version' not in version_entry:
|
|
errors.append(f"versions[{i}] uses deprecated 'ledmatrix_min', should use 'ledmatrix_min_version'")
|
|
|
|
return errors
|
|
|
|
def _validate_manifest_schema(self, manifest: Dict[str, Any], plugin_id: str) -> List[str]:
|
|
"""
|
|
Validate manifest against JSON schema if available.
|
|
|
|
Args:
|
|
manifest: Manifest dictionary to validate
|
|
plugin_id: Plugin ID for error messages
|
|
|
|
Returns:
|
|
List of validation error messages (empty if valid or schema unavailable)
|
|
"""
|
|
if not JSONSCHEMA_AVAILABLE:
|
|
return []
|
|
|
|
try:
|
|
# Load manifest schema
|
|
schema_path = Path(__file__).parent.parent.parent / "schema" / "manifest_schema.json"
|
|
if not schema_path.exists():
|
|
return [] # Schema not available, skip validation
|
|
|
|
with open(schema_path, 'r', encoding='utf-8') as f:
|
|
schema = json.load(f)
|
|
|
|
# Validate schema itself
|
|
Draft7Validator.check_schema(schema)
|
|
|
|
# Validate manifest against schema
|
|
validator = Draft7Validator(schema)
|
|
errors = []
|
|
for error in validator.iter_errors(manifest):
|
|
error_path = '.'.join(str(p) for p in error.path)
|
|
errors.append(f"{error_path}: {error.message}")
|
|
|
|
return errors
|
|
except json.JSONDecodeError as e:
|
|
self.logger.warning(f"Could not parse manifest schema: {e}")
|
|
return []
|
|
except ValidationError as e:
|
|
self.logger.warning(f"Manifest schema is invalid: {e}")
|
|
return []
|
|
except Exception as e:
|
|
self.logger.debug(f"Error validating manifest schema for {plugin_id}: {e}")
|
|
return []
|
|
|
|
def _get_github_repo_info(self, repo_url: str) -> Dict[str, Any]:
|
|
"""Fetch GitHub repository information (stars, etc.)"""
|
|
# Extract owner/repo from URL
|
|
try:
|
|
# Handle different URL formats
|
|
_parsed_url = urlparse(repo_url)
|
|
if _parsed_url.hostname in ('github.com', 'www.github.com'):
|
|
parts = repo_url.strip('/').split('/')
|
|
if len(parts) >= 2:
|
|
owner = parts[-2]
|
|
repo = parts[-1]
|
|
if repo.endswith('.git'):
|
|
repo = repo[:-4]
|
|
|
|
cache_key = f"{owner}/{repo}"
|
|
|
|
# Check cache first
|
|
if cache_key in self.github_cache:
|
|
cached_time, cached_data = self.github_cache[cache_key]
|
|
if time.time() - cached_time < self.cache_timeout:
|
|
return cached_data
|
|
|
|
# Fetch from GitHub API
|
|
api_url = f"https://api.github.com/repos/{owner}/{repo}"
|
|
headers = {
|
|
'Accept': 'application/vnd.github.v3+json',
|
|
'User-Agent': 'LEDMatrix-Plugin-Manager/1.0'
|
|
}
|
|
|
|
# Add authentication if token is available
|
|
if self.github_token:
|
|
headers['Authorization'] = f'token {self.github_token}'
|
|
|
|
try:
|
|
response = requests.get(api_url, headers=headers, timeout=10)
|
|
except requests.RequestException as req_err:
|
|
# Network error: prefer a stale cache hit over an
|
|
# empty default so the UI keeps working on a flaky
|
|
# Pi WiFi link. Bump the cached entry's timestamp
|
|
# into a short backoff window so subsequent
|
|
# requests serve the stale payload cheaply instead
|
|
# of re-hitting the network on every request.
|
|
if cache_key in self.github_cache:
|
|
_, stale = self.github_cache[cache_key]
|
|
self._record_cache_backoff(self.github_cache, cache_key, self.cache_timeout, stale)
|
|
self.logger.warning(
|
|
"GitHub repo info fetch failed for %s (%s); serving stale cache.",
|
|
cache_key, req_err,
|
|
)
|
|
return stale
|
|
raise
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
pushed_at = data.get('pushed_at', '') or data.get('updated_at', '')
|
|
repo_info = {
|
|
'stars': data.get('stargazers_count', 0),
|
|
'forks': data.get('forks_count', 0),
|
|
'open_issues': data.get('open_issues_count', 0),
|
|
'updated_at_iso': data.get('updated_at', ''),
|
|
'last_commit_iso': pushed_at,
|
|
'last_commit_date': self._iso_to_date(pushed_at),
|
|
'language': data.get('language', ''),
|
|
'license': data.get('license', {}).get('name', '') if data.get('license') else '',
|
|
'default_branch': data.get('default_branch', 'main')
|
|
}
|
|
|
|
# Cache the result
|
|
self.github_cache[cache_key] = (time.time(), repo_info)
|
|
return repo_info
|
|
elif response.status_code == 403:
|
|
# Rate limit or authentication issue. If we have a
|
|
# previously-cached value, serve it rather than
|
|
# returning empty defaults — a stale star count is
|
|
# better than a reset to zero. Apply the same
|
|
# failure-backoff bump as the network-error path
|
|
# so we don't hammer the API with repeat requests
|
|
# while rate-limited.
|
|
if cache_key in self.github_cache:
|
|
_, stale = self.github_cache[cache_key]
|
|
self._record_cache_backoff(self.github_cache, cache_key, self.cache_timeout, stale)
|
|
self.logger.warning(
|
|
"GitHub API 403 for %s; serving stale cache.", cache_key,
|
|
)
|
|
return stale
|
|
if not self.github_token:
|
|
self.logger.warning(
|
|
"GitHub API rate limit likely exceeded (403). "
|
|
"Add a GitHub personal access token to config/config_secrets.json "
|
|
"under 'github.api_token' to increase rate limits from 60 to 5000/hour."
|
|
)
|
|
else:
|
|
self.logger.warning(
|
|
f"GitHub API request failed: 403 for {api_url}. "
|
|
f"Your token may have insufficient permissions or rate limit exceeded."
|
|
)
|
|
else:
|
|
self.logger.warning(f"GitHub API request failed: {response.status_code} for {api_url}")
|
|
if cache_key in self.github_cache:
|
|
_, stale = self.github_cache[cache_key]
|
|
self._record_cache_backoff(self.github_cache, cache_key, self.cache_timeout, stale)
|
|
return stale
|
|
|
|
return {
|
|
'stars': 0,
|
|
'forks': 0,
|
|
'open_issues': 0,
|
|
'updated_at_iso': '',
|
|
'last_commit_iso': '',
|
|
'last_commit_date': '',
|
|
'language': '',
|
|
'license': '',
|
|
'default_branch': 'main'
|
|
}
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error fetching GitHub repo info for {repo_url}: {e}")
|
|
return {
|
|
'stars': 0,
|
|
'forks': 0,
|
|
'open_issues': 0,
|
|
'updated_at_iso': '',
|
|
'last_commit_iso': '',
|
|
'last_commit_date': '',
|
|
'language': '',
|
|
'license': '',
|
|
'default_branch': 'main'
|
|
}
|
|
|
|
def _http_get_with_retries(self, url: str, *, timeout: int = 10, stream: bool = False, headers: Dict[str, str] = None, max_retries: int = 3, backoff_sec: float = 0.75):
|
|
"""
|
|
HTTP GET with simple retry strategy and exponential backoff.
|
|
|
|
Returns a requests.Response or raises the last exception.
|
|
"""
|
|
last_exc = None
|
|
for attempt in range(1, max_retries + 1):
|
|
try:
|
|
resp = requests.get(url, timeout=timeout, stream=stream, headers=headers)
|
|
return resp
|
|
except requests.RequestException as e:
|
|
last_exc = e
|
|
self.logger.warning(f"HTTP GET failed (attempt {attempt}/{max_retries}) for {url}: {e}")
|
|
if attempt < max_retries:
|
|
time.sleep(backoff_sec * attempt)
|
|
# Exhausted retries
|
|
raise last_exc
|
|
|
|
def fetch_registry_from_url(self, repo_url: str) -> Optional[Dict]:
|
|
"""
|
|
Fetch a registry-style plugins.json from a custom GitHub repository URL.
|
|
|
|
This allows users to point to a registry-style monorepo (like the official
|
|
ledmatrix-plugins repo) and browse/install plugins from it.
|
|
|
|
Args:
|
|
repo_url: GitHub repository URL (e.g., https://github.com/user/ledmatrix-plugins)
|
|
|
|
Returns:
|
|
Registry dict with plugins list, or None if not found/invalid
|
|
"""
|
|
try:
|
|
# Clean up URL
|
|
repo_url = repo_url.rstrip('/').replace('.git', '')
|
|
|
|
# Try to find plugins.json in common locations
|
|
# First try root directory
|
|
registry_urls = []
|
|
|
|
# Extract owner/repo from URL
|
|
_parsed_repo_url = urlparse(repo_url)
|
|
if _parsed_repo_url.hostname in ('github.com', 'www.github.com'):
|
|
parts = repo_url.split('/')
|
|
if len(parts) >= 2:
|
|
owner = parts[-2]
|
|
repo = parts[-1]
|
|
|
|
# Try common branch names
|
|
for branch in ['main', 'master']:
|
|
registry_urls.append(f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/plugins.json")
|
|
registry_urls.append(f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/registry.json")
|
|
|
|
# Try each URL
|
|
for url in registry_urls:
|
|
try:
|
|
response = self._http_get_with_retries(url, timeout=10)
|
|
if response.status_code == 200:
|
|
registry = response.json()
|
|
# Validate it looks like a registry
|
|
if isinstance(registry, dict) and 'plugins' in registry:
|
|
self.logger.info(f"Successfully fetched registry from {url}")
|
|
return registry
|
|
except Exception as e:
|
|
self.logger.debug(f"Failed to fetch from {url}: {e}")
|
|
continue
|
|
|
|
self.logger.warning(f"No valid registry found at {repo_url}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error fetching registry from URL: {e}", exc_info=True)
|
|
return None
|
|
|
|
def fetch_registry(self, force_refresh: bool = False, raise_on_failure: bool = False) -> Dict:
|
|
"""
|
|
Fetch the plugin registry from GitHub.
|
|
|
|
Args:
|
|
force_refresh: Force refresh even if cached
|
|
raise_on_failure: If True, re-raise network / JSON errors instead
|
|
of silently falling back to stale cache / empty dict. UI
|
|
callers prefer the stale-fallback default so the plugin
|
|
list keeps working on flaky WiFi; the state reconciler
|
|
needs the explicit failure signal so it can distinguish
|
|
"plugin genuinely not in registry" from "I couldn't reach
|
|
the registry at all" and not mark everything unrecoverable.
|
|
|
|
Returns:
|
|
Registry data with list of available plugins
|
|
|
|
Raises:
|
|
requests.RequestException / json.JSONDecodeError when
|
|
``raise_on_failure`` is True and the fetch fails.
|
|
"""
|
|
# Check if cache is still valid (within timeout)
|
|
current_time = time.time()
|
|
if (self.registry_cache and self.registry_cache_time and
|
|
not force_refresh and
|
|
(current_time - self.registry_cache_time) < self.registry_cache_timeout):
|
|
return self.registry_cache
|
|
|
|
with self._registry_fetch_lock:
|
|
# Re-check inside the lock — a concurrent caller that was waiting
|
|
# may have already populated the cache while we blocked.
|
|
current_time = time.time()
|
|
if (self.registry_cache and self.registry_cache_time and
|
|
not force_refresh and
|
|
(current_time - self.registry_cache_time) < self.registry_cache_timeout):
|
|
return self.registry_cache
|
|
|
|
try:
|
|
self.logger.info(f"Fetching plugin registry from {self.REGISTRY_URL}")
|
|
response = self._http_get_with_retries(self.REGISTRY_URL, timeout=10)
|
|
response.raise_for_status()
|
|
self.registry_cache = response.json()
|
|
self.registry_cache_time = current_time
|
|
self.logger.info(f"Fetched registry with {len(self.registry_cache.get('plugins', []))} plugins")
|
|
return self.registry_cache
|
|
except requests.RequestException as e:
|
|
self.logger.error(f"Error fetching registry: {e}")
|
|
if raise_on_failure:
|
|
raise
|
|
# Prefer stale cache over an empty list so the plugin list UI
|
|
# keeps working on a flaky connection (e.g. Pi on WiFi). Bump
|
|
# registry_cache_time into a short backoff window so the next
|
|
# request serves the stale payload cheaply instead of
|
|
# re-hitting the network on every request (matches the
|
|
# pattern used by github_cache / commit_info_cache).
|
|
if self.registry_cache:
|
|
self.logger.warning("Falling back to stale registry cache")
|
|
self.registry_cache_time = (
|
|
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
|
|
)
|
|
return self.registry_cache
|
|
return {"plugins": []}
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Error parsing registry JSON: {e}")
|
|
if raise_on_failure:
|
|
raise
|
|
if self.registry_cache:
|
|
self.registry_cache_time = (
|
|
time.time() + self._failure_backoff_seconds - self.registry_cache_timeout
|
|
)
|
|
return self.registry_cache
|
|
return {"plugins": []}
|
|
|
|
def search_plugins(self, query: str = "", category: str = "", tags: List[str] = None, fetch_commit_info: bool = True, include_saved_repos: bool = True, saved_repositories_manager = None) -> List[Dict]:
|
|
"""
|
|
Search for plugins in the registry with enhanced metadata.
|
|
|
|
GitHub is now treated as the source of truth for live metadata like
|
|
stars and last commit timestamps. The registry provides descriptive
|
|
information (name, description, repo URL, etc.).
|
|
|
|
Args:
|
|
query: Search query string (searches name, description, id)
|
|
category: Filter by category (e.g., 'sports', 'weather', 'time')
|
|
tags: Filter by tags (matches any tag in list)
|
|
fetch_commit_info: If True (default), fetch commit metadata from GitHub.
|
|
|
|
Returns:
|
|
List of matching plugin metadata enriched with GitHub information
|
|
"""
|
|
if tags is None:
|
|
tags = []
|
|
|
|
# Fetch from official registry
|
|
registry = self.fetch_registry()
|
|
plugins = registry.get('plugins', []) or []
|
|
|
|
# Also fetch from saved repositories if enabled
|
|
if include_saved_repos and saved_repositories_manager:
|
|
saved_repos = saved_repositories_manager.get_registry_repositories()
|
|
for repo_info in saved_repos:
|
|
repo_url = repo_info.get('url')
|
|
if repo_url:
|
|
try:
|
|
custom_registry = self.fetch_registry_from_url(repo_url)
|
|
if custom_registry:
|
|
custom_plugins = custom_registry.get('plugins', []) or []
|
|
# Mark these as from custom repository
|
|
for plugin in custom_plugins:
|
|
plugin['_source'] = 'custom_repository'
|
|
plugin['_repository_url'] = repo_url
|
|
plugin['_repository_name'] = repo_info.get('name', repo_url)
|
|
plugins.extend(custom_plugins)
|
|
except Exception as e:
|
|
self.logger.warning(f"Failed to fetch plugins from saved repository {repo_url}: {e}")
|
|
|
|
# First pass: apply cheap filters (category/tags/query) so we only
|
|
# fetch GitHub metadata for plugins that will actually be returned.
|
|
filtered: List[Dict] = []
|
|
for plugin in plugins:
|
|
if category and plugin.get('category') != category:
|
|
continue
|
|
if tags and not any(tag in plugin.get('tags', []) for tag in tags):
|
|
continue
|
|
if query:
|
|
query_lower = query.lower()
|
|
searchable_text = ' '.join([
|
|
plugin.get('name', ''),
|
|
plugin.get('description', ''),
|
|
plugin.get('id', ''),
|
|
plugin.get('author', ''),
|
|
]).lower()
|
|
if query_lower not in searchable_text:
|
|
continue
|
|
filtered.append(plugin)
|
|
|
|
def _enrich(plugin: Dict) -> Dict:
|
|
"""Enrich a single plugin with GitHub metadata.
|
|
|
|
Called concurrently from a ThreadPoolExecutor. Each underlying
|
|
HTTP helper (``_get_github_repo_info`` / ``_get_latest_commit_info``
|
|
/ ``_fetch_manifest_from_github``) is thread-safe — they use
|
|
``requests`` and write their own cache keys on Python dicts,
|
|
which is atomic under the GIL for single-key assignments.
|
|
"""
|
|
enhanced_plugin = plugin.copy()
|
|
repo_url = plugin.get('repo', '')
|
|
if not repo_url:
|
|
return enhanced_plugin
|
|
|
|
github_info = self._get_github_repo_info(repo_url)
|
|
enhanced_plugin['stars'] = github_info.get('stars', plugin.get('stars', 0))
|
|
enhanced_plugin['default_branch'] = github_info.get('default_branch', plugin.get('branch', 'main'))
|
|
enhanced_plugin['last_updated_iso'] = github_info.get('last_commit_iso')
|
|
enhanced_plugin['last_updated'] = github_info.get('last_commit_date')
|
|
|
|
if fetch_commit_info:
|
|
branch = plugin.get('branch') or github_info.get('default_branch', 'main')
|
|
|
|
commit_info = self._get_latest_commit_info(repo_url, branch)
|
|
if commit_info:
|
|
enhanced_plugin['last_commit'] = commit_info.get('short_sha')
|
|
enhanced_plugin['last_commit_sha'] = commit_info.get('sha')
|
|
enhanced_plugin['last_updated'] = commit_info.get('date') or enhanced_plugin.get('last_updated')
|
|
enhanced_plugin['last_updated_iso'] = commit_info.get('date_iso') or enhanced_plugin.get('last_updated_iso')
|
|
enhanced_plugin['last_commit_message'] = commit_info.get('message')
|
|
enhanced_plugin['last_commit_author'] = commit_info.get('author')
|
|
enhanced_plugin['branch'] = commit_info.get('branch', branch)
|
|
enhanced_plugin['last_commit_branch'] = commit_info.get('branch')
|
|
|
|
# Intentionally NO per-plugin manifest.json fetch here.
|
|
# The registry's plugins.json already carries ``description``
|
|
# (it is generated from each plugin's manifest by
|
|
# ``update_registry.py``), and ``last_updated`` is filled in
|
|
# from the commit info above. An earlier implementation
|
|
# fetched manifest.json per plugin anyway, which meant one
|
|
# extra HTTPS round trip per result; on a Pi4 with a flaky
|
|
# WiFi link the tail retries of that one extra call
|
|
# (_http_get_with_retries does 3 attempts with exponential
|
|
# backoff) dominated wall time even after parallelization.
|
|
|
|
return enhanced_plugin
|
|
|
|
# Fan out the per-plugin GitHub enrichment. The previous
|
|
# implementation did this serially, which on a Pi4 with ~15 plugins
|
|
# and a fresh cache meant 30+ HTTP requests in strict sequence (the
|
|
# "connecting to display" hang reported by users). With a thread
|
|
# pool, latency is dominated by the slowest request rather than
|
|
# their sum. Workers capped at 10 to stay well under the
|
|
# unauthenticated GitHub rate limit burst and avoid overwhelming a
|
|
# Pi's WiFi link. For a small number of plugins the pool is
|
|
# essentially free.
|
|
if not filtered:
|
|
return []
|
|
|
|
# Not worth the pool overhead for tiny workloads. Parenthesized to
|
|
# make Python's default ``and`` > ``or`` precedence explicit: a
|
|
# single plugin, OR a small batch where we don't need commit info.
|
|
if (len(filtered) == 1) or ((not fetch_commit_info) and (len(filtered) < 4)):
|
|
return [_enrich(p) for p in filtered]
|
|
|
|
max_workers = min(10, len(filtered))
|
|
with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='plugin-search') as executor:
|
|
# executor.map preserves input order, which the UI relies on.
|
|
return list(executor.map(_enrich, filtered))
|
|
|
|
def _fetch_manifest_from_github(self, repo_url: str, branch: str = "master", manifest_path: str = "manifest.json", force_refresh: bool = False) -> Optional[Dict]:
|
|
"""
|
|
Fetch manifest.json directly from a GitHub repository.
|
|
|
|
Args:
|
|
repo_url: GitHub repository URL
|
|
branch: Branch name (default: master)
|
|
manifest_path: Path to manifest within the repo (default: manifest.json).
|
|
For monorepo plugins this will be e.g. "plugins/football-scoreboard/manifest.json".
|
|
force_refresh: If True, bypass the cache.
|
|
|
|
Returns:
|
|
Manifest data or None if not found
|
|
"""
|
|
try:
|
|
# Convert repo URL to raw content URL
|
|
# https://github.com/user/repo -> https://raw.githubusercontent.com/user/repo/branch/manifest.json
|
|
_parsed_manifest_url = urlparse(repo_url)
|
|
if _parsed_manifest_url.hostname in ('github.com', 'www.github.com'):
|
|
# Handle different URL formats
|
|
repo_url = repo_url.rstrip('/')
|
|
if repo_url.endswith('.git'):
|
|
repo_url = repo_url[:-4]
|
|
|
|
parts = repo_url.split('/')
|
|
if len(parts) >= 2:
|
|
owner = parts[-2]
|
|
repo = parts[-1]
|
|
|
|
# Check cache first
|
|
cache_key = f"{owner}/{repo}:{branch}:{manifest_path}"
|
|
if not force_refresh and cache_key in self.manifest_cache:
|
|
cached_time, cached_data = self.manifest_cache[cache_key]
|
|
if time.time() - cached_time < self.manifest_cache_timeout:
|
|
return cached_data
|
|
|
|
raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{manifest_path}"
|
|
|
|
response = self._http_get_with_retries(raw_url, timeout=10)
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
self.manifest_cache[cache_key] = (time.time(), result)
|
|
return result
|
|
elif response.status_code == 404:
|
|
# Try main branch instead
|
|
if branch != "main":
|
|
raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/main/{manifest_path}"
|
|
response = self._http_get_with_retries(raw_url, timeout=10)
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
self.manifest_cache[cache_key] = (time.time(), result)
|
|
return result
|
|
|
|
# Cache negative result
|
|
self.manifest_cache[cache_key] = (time.time(), None)
|
|
except Exception as e:
|
|
self.logger.debug(f"Could not fetch manifest from GitHub for {repo_url}: {e}")
|
|
|
|
return None
|
|
|
|
def _get_latest_commit_info(self, repo_url: str, branch: str = "main", force_refresh: bool = False) -> Optional[Dict[str, Any]]:
|
|
"""Return metadata about the latest commit on the given branch."""
|
|
try:
|
|
if 'github.com' not in repo_url:
|
|
return None
|
|
|
|
repo_url = repo_url.rstrip('/')
|
|
if repo_url.endswith('.git'):
|
|
repo_url = repo_url[:-4]
|
|
|
|
parts = repo_url.split('/')
|
|
if len(parts) < 2:
|
|
return None
|
|
|
|
owner = parts[-2]
|
|
repo = parts[-1]
|
|
|
|
# Check cache first
|
|
cache_key = f"{owner}/{repo}:{branch}"
|
|
if not force_refresh and cache_key in self.commit_info_cache:
|
|
cached_time, cached_data = self.commit_info_cache[cache_key]
|
|
if time.time() - cached_time < self.commit_cache_timeout:
|
|
return cached_data
|
|
|
|
branches_to_try = self._distinct_sequence([branch, 'main', 'master'])
|
|
|
|
headers = {
|
|
'Accept': 'application/vnd.github.v3+json',
|
|
'User-Agent': 'LEDMatrix-Plugin-Manager/1.0'
|
|
}
|
|
|
|
if self.github_token:
|
|
headers['Authorization'] = f'token {self.github_token}'
|
|
|
|
last_error = None
|
|
for branch_name in branches_to_try:
|
|
api_url = f"https://api.github.com/repos/{owner}/{repo}/commits/{branch_name}"
|
|
try:
|
|
response = requests.get(api_url, headers=headers, timeout=10)
|
|
except requests.RequestException as req_err:
|
|
# Network failure: fall back to a stale cache hit if
|
|
# available so the plugin store UI keeps populating
|
|
# commit info on a flaky WiFi link. Bump the cached
|
|
# timestamp into the backoff window so we don't
|
|
# re-retry on every request.
|
|
if cache_key in self.commit_info_cache:
|
|
_, stale = self.commit_info_cache[cache_key]
|
|
if stale is not None:
|
|
self._record_cache_backoff(
|
|
self.commit_info_cache, cache_key,
|
|
self.commit_cache_timeout, stale,
|
|
)
|
|
self.logger.warning(
|
|
"GitHub commit fetch failed for %s (%s); serving stale cache.",
|
|
cache_key, req_err,
|
|
)
|
|
return stale
|
|
last_error = str(req_err)
|
|
continue
|
|
if response.status_code == 200:
|
|
commit_data = response.json()
|
|
commit_sha_full = commit_data.get('sha', '')
|
|
commit_sha_short = commit_sha_full[:7] if commit_sha_full else ''
|
|
commit_meta = commit_data.get('commit', {})
|
|
commit_author = commit_meta.get('author', {})
|
|
commit_date_iso = commit_author.get('date', '')
|
|
|
|
result = {
|
|
'branch': branch_name,
|
|
'sha': commit_sha_full,
|
|
'short_sha': commit_sha_short,
|
|
'date_iso': commit_date_iso,
|
|
'date': self._iso_to_date(commit_date_iso),
|
|
'author': commit_author.get('name', ''),
|
|
'message': commit_meta.get('message', ''),
|
|
}
|
|
self.commit_info_cache[cache_key] = (time.time(), result)
|
|
return result
|
|
|
|
if response.status_code == 403 and not self.github_token:
|
|
self.logger.debug("GitHub commit API rate limited (403). Consider adding a token.")
|
|
last_error = response.text
|
|
else:
|
|
last_error = response.text
|
|
|
|
if last_error:
|
|
self.logger.debug(f"Unable to fetch commit info for {repo_url}: {last_error}")
|
|
|
|
# All branches returned a non-200 response (e.g. 404 on every
|
|
# candidate, or a transient 5xx). If we already had a good
|
|
# cached value, prefer serving that — overwriting it with
|
|
# None here would wipe out commit info the UI just showed
|
|
# on the previous request. Bump the timestamp into the
|
|
# backoff window so subsequent lookups hit the cache.
|
|
if cache_key in self.commit_info_cache:
|
|
_, prior = self.commit_info_cache[cache_key]
|
|
if prior is not None:
|
|
self._record_cache_backoff(
|
|
self.commit_info_cache, cache_key,
|
|
self.commit_cache_timeout, prior,
|
|
)
|
|
return prior
|
|
|
|
# No prior good value — cache the negative result so we don't
|
|
# hammer a plugin that genuinely has no reachable commits.
|
|
self.commit_info_cache[cache_key] = (time.time(), None)
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"Error fetching latest commit metadata for {repo_url}: {e}")
|
|
|
|
return None
|
|
|
|
|
|
def get_plugin_info(self, plugin_id: str, fetch_latest_from_github: bool = True, force_refresh: bool = False) -> Optional[Dict]:
|
|
"""
|
|
Get detailed information about a plugin from the registry.
|
|
|
|
GitHub provides authoritative metadata such as stars and the latest
|
|
commit. The registry supplies descriptive information (name, id, repo URL).
|
|
|
|
Args:
|
|
plugin_id: Plugin identifier
|
|
fetch_latest_from_github: If True (default), augment with GitHub commit metadata.
|
|
force_refresh: If True, bypass caches for commit/manifest data.
|
|
|
|
Returns:
|
|
Plugin metadata or None if not found
|
|
"""
|
|
registry = self.fetch_registry()
|
|
plugins = registry.get('plugins', []) or []
|
|
plugin_info = next((p for p in plugins if p['id'] == plugin_id), None)
|
|
|
|
if not plugin_info:
|
|
return None
|
|
|
|
if fetch_latest_from_github:
|
|
repo_url = plugin_info.get('repo')
|
|
if repo_url:
|
|
plugin_info = plugin_info.copy()
|
|
|
|
github_info = self._get_github_repo_info(repo_url)
|
|
branch = plugin_info.get('branch') or github_info.get('default_branch', 'main')
|
|
|
|
plugin_info['default_branch'] = github_info.get('default_branch', branch)
|
|
plugin_info['stars'] = github_info.get('stars', plugin_info.get('stars', 0))
|
|
plugin_info['last_updated'] = github_info.get('last_commit_date', plugin_info.get('last_updated'))
|
|
plugin_info['last_updated_iso'] = github_info.get('last_commit_iso', plugin_info.get('last_updated_iso'))
|
|
|
|
commit_info = self._get_latest_commit_info(repo_url, branch, force_refresh=force_refresh)
|
|
if commit_info:
|
|
plugin_info['last_commit'] = commit_info.get('short_sha')
|
|
plugin_info['last_commit_sha'] = commit_info.get('sha')
|
|
plugin_info['last_commit_message'] = commit_info.get('message')
|
|
plugin_info['last_commit_author'] = commit_info.get('author')
|
|
plugin_info['last_updated'] = commit_info.get('date') or plugin_info.get('last_updated')
|
|
plugin_info['last_updated_iso'] = commit_info.get('date_iso') or plugin_info.get('last_updated_iso')
|
|
plugin_info['branch'] = commit_info.get('branch', branch)
|
|
plugin_info['last_commit_branch'] = commit_info.get('branch')
|
|
|
|
plugin_subpath = plugin_info.get('plugin_path', '')
|
|
manifest_rel = f"{plugin_subpath}/manifest.json" if plugin_subpath else "manifest.json"
|
|
github_manifest = self._fetch_manifest_from_github(repo_url, branch, manifest_rel, force_refresh=force_refresh)
|
|
if github_manifest:
|
|
if 'last_updated' in github_manifest and not plugin_info.get('last_updated'):
|
|
plugin_info['last_updated'] = github_manifest['last_updated']
|
|
if 'description' in github_manifest:
|
|
plugin_info['description'] = github_manifest['description']
|
|
|
|
return plugin_info
|
|
|
|
def get_registry_info(self, plugin_id: str) -> Optional[Dict]:
|
|
"""
|
|
Get plugin information from the registry cache only (no GitHub API calls).
|
|
|
|
Use this for lightweight lookups where only registry fields are needed
|
|
(e.g., verified status, latest_version).
|
|
|
|
Args:
|
|
plugin_id: Plugin identifier
|
|
|
|
Returns:
|
|
Plugin metadata from registry or None if not found
|
|
"""
|
|
registry = self.fetch_registry()
|
|
plugins = registry.get('plugins', []) or []
|
|
return next((p for p in plugins if p.get('id') == plugin_id), None)
|
|
|
|
def install_plugin(self, plugin_id: str, branch: Optional[str] = None) -> bool:
|
|
"""
|
|
Install a plugin from the official registry. Always installs the latest commit
|
|
from the repository's default branch (or specified branch).
|
|
|
|
Args:
|
|
plugin_id: Plugin identifier
|
|
branch: Optional branch name to install from. If provided, this branch will be
|
|
prioritized. If not provided or branch doesn't exist, falls back to
|
|
default branch logic.
|
|
"""
|
|
branch_info = f" (branch: {branch})" if branch else " (latest branch head)"
|
|
self.logger.info(f"Installing plugin: {plugin_id}{branch_info}")
|
|
|
|
plugin_info = self.get_plugin_info(plugin_id, fetch_latest_from_github=True, force_refresh=True)
|
|
if not plugin_info:
|
|
self.logger.error(f"Plugin not found in registry: {plugin_id}")
|
|
return False
|
|
|
|
repo_url = plugin_info.get('repo')
|
|
if not repo_url:
|
|
self.logger.error(f"Plugin {plugin_id} missing repository URL")
|
|
return False
|
|
|
|
plugin_subpath = plugin_info.get('plugin_path')
|
|
# If branch is provided, prioritize it; otherwise use default logic
|
|
branch_candidates = self._distinct_sequence([
|
|
branch, # User-specified branch takes highest priority
|
|
plugin_info.get('branch'),
|
|
plugin_info.get('default_branch'),
|
|
plugin_info.get('last_commit_branch'),
|
|
'main',
|
|
'master'
|
|
])
|
|
|
|
# Use manifest ID for directory name (not registry plugin_id) to ensure consistency
|
|
# We'll read the manifest after installation to get the actual ID
|
|
# For now, use plugin_id but we'll correct it after reading manifest
|
|
plugin_path = self.plugins_dir / plugin_id
|
|
if plugin_path.exists():
|
|
self.logger.warning(f"Plugin directory already exists: {plugin_id}. Removing it before reinstall.")
|
|
if not self._safe_remove_directory(plugin_path):
|
|
self.logger.error(f"Failed to remove existing plugin directory: {plugin_path}")
|
|
return False
|
|
|
|
try:
|
|
branch_used = None
|
|
|
|
if plugin_subpath:
|
|
self.logger.info(f"Installing from monorepo subdirectory: {plugin_subpath}")
|
|
for candidate in branch_candidates:
|
|
download_url = f"{repo_url}/archive/refs/heads/{candidate}.zip"
|
|
if self._install_from_monorepo(download_url, plugin_subpath, plugin_path):
|
|
branch_used = candidate
|
|
break
|
|
|
|
if branch_used is None:
|
|
self.logger.error(f"Failed to install plugin from monorepo path {plugin_subpath} for {plugin_id}")
|
|
return False
|
|
else:
|
|
branch_used = self._install_via_git(repo_url, plugin_path, branch_candidates)
|
|
if branch_used is None and not plugin_path.exists():
|
|
# Git failed entirely; fall back to zip download
|
|
self.logger.info("Git not available or clone failed, attempting archive download...")
|
|
for candidate in branch_candidates:
|
|
download_url = f"{repo_url}/archive/refs/heads/{candidate}.zip"
|
|
if self._install_via_download(download_url, plugin_path):
|
|
branch_used = candidate
|
|
break
|
|
|
|
if branch_used is None and not plugin_path.exists():
|
|
self.logger.error(f"Failed to install plugin {plugin_id} via git or archive download")
|
|
return False
|
|
|
|
manifest_path = plugin_path / "manifest.json"
|
|
if not manifest_path.exists():
|
|
self.logger.error(f"No manifest.json found in plugin: {plugin_id}")
|
|
self._safe_remove_directory(plugin_path)
|
|
return False
|
|
|
|
try:
|
|
with open(manifest_path, 'r', encoding='utf-8') as mf:
|
|
manifest = json.load(mf)
|
|
|
|
# Get the actual plugin ID from manifest (source of truth)
|
|
manifest_plugin_id = manifest.get('id')
|
|
if not manifest_plugin_id:
|
|
self.logger.error("Plugin manifest missing 'id' field")
|
|
self._safe_remove_directory(plugin_path)
|
|
return False
|
|
|
|
# If manifest ID doesn't match directory name, rename directory to match manifest
|
|
if manifest_plugin_id != plugin_id:
|
|
self.logger.warning(
|
|
f"Manifest ID '{manifest_plugin_id}' doesn't match registry ID '{plugin_id}'. "
|
|
f"Renaming directory to match manifest ID."
|
|
)
|
|
correct_path = self.plugins_dir / manifest_plugin_id
|
|
if correct_path.exists():
|
|
self.logger.warning(f"Target directory {manifest_plugin_id} already exists, removing it")
|
|
if not self._safe_remove_directory(correct_path):
|
|
self.logger.error(f"Failed to remove existing directory {correct_path}, cannot rename plugin")
|
|
return False
|
|
shutil.move(str(plugin_path), str(correct_path))
|
|
plugin_path = correct_path
|
|
manifest_path = plugin_path / "manifest.json"
|
|
# Update plugin_id to match manifest for rest of function
|
|
plugin_id = manifest_plugin_id
|
|
|
|
required_fields = ['id', 'name', 'class_name', 'display_modes']
|
|
missing = [field for field in required_fields if field not in manifest]
|
|
|
|
manifest_modified = False
|
|
|
|
if 'class_name' in missing:
|
|
entry_point = manifest.get('entry_point', 'manager.py')
|
|
manager_file = plugin_path / entry_point
|
|
if manager_file.exists():
|
|
try:
|
|
detected_class = self._detect_class_name(manager_file)
|
|
if detected_class:
|
|
manifest['class_name'] = detected_class
|
|
missing.remove('class_name')
|
|
manifest_modified = True
|
|
self.logger.info(f"Auto-detected class_name '{detected_class}' from {entry_point}")
|
|
except Exception as err:
|
|
self.logger.warning(f"Could not auto-detect class_name for {plugin_id}: {err}")
|
|
|
|
if missing:
|
|
self.logger.error(f"Plugin manifest missing required fields for {plugin_id}: {', '.join(missing)}")
|
|
self._safe_remove_directory(plugin_path)
|
|
return False
|
|
|
|
if 'entry_point' not in manifest:
|
|
manifest['entry_point'] = 'manager.py'
|
|
manifest_modified = True
|
|
self.logger.info(f"Added missing entry_point field to {plugin_id} manifest (defaulted to manager.py)")
|
|
|
|
if manifest_modified:
|
|
with open(manifest_path, 'w', encoding='utf-8') as mf:
|
|
json.dump(manifest, mf, indent=2)
|
|
|
|
except Exception as manifest_error:
|
|
self.logger.error(f"Failed to read/validate manifest for {plugin_id}: {manifest_error}")
|
|
self._safe_remove_directory(plugin_path)
|
|
return False
|
|
|
|
if not self._install_dependencies(plugin_path):
|
|
self.logger.warning(f"Some dependencies may not have installed correctly for {plugin_id}")
|
|
|
|
branch_display = branch_used or plugin_info.get('branch') or plugin_info.get('default_branch', 'unknown')
|
|
self.logger.info(f"Successfully installed plugin: {plugin_id} (branch {branch_display})")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error installing plugin {plugin_id}: {e}", exc_info=True)
|
|
if plugin_path.exists():
|
|
self._safe_remove_directory(plugin_path)
|
|
return False
|
|
|
|
def install_from_url(self, repo_url: str, plugin_id: str = None, plugin_path: str = None, branch: Optional[str] = None) -> Dict[str, Any]:
|
|
"""
|
|
Install a plugin directly from a GitHub URL.
|
|
This allows users to install custom/unverified plugins.
|
|
|
|
Supports two installation modes:
|
|
1. Direct plugin repo: Repository contains a single plugin with manifest.json at root
|
|
2. Monorepo with plugin_path: Repository contains multiple plugins, install from subdirectory
|
|
|
|
Args:
|
|
repo_url: GitHub repository URL (e.g., https://github.com/user/repo)
|
|
plugin_id: Optional plugin ID (extracted from manifest if not provided)
|
|
plugin_path: Optional subdirectory path for monorepo installations (e.g., "plugins/hello-world")
|
|
branch: Optional branch name to install from. If provided, this branch will be
|
|
prioritized. If not provided or branch doesn't exist, falls back to
|
|
default branch logic (main, then master).
|
|
|
|
Returns:
|
|
Dict with status and plugin_id or error message
|
|
"""
|
|
branch_info = f" (branch: {branch})" if branch else ""
|
|
self.logger.info(f"Installing plugin from custom URL: {repo_url}{branch_info}" + (f" (subpath: {plugin_path})" if plugin_path else ""))
|
|
|
|
# Clean up URL (remove .git suffix if present)
|
|
repo_url = repo_url.rstrip('/').replace('.git', '')
|
|
|
|
temp_dir = None
|
|
try:
|
|
# Create temporary directory
|
|
temp_dir = Path(tempfile.mkdtemp(prefix='ledmatrix_plugin_'))
|
|
|
|
# Build branch candidates list - prioritize user-specified branch
|
|
branch_candidates = self._distinct_sequence([branch, 'main', 'master']) if branch else ['main', 'master']
|
|
|
|
# For monorepo installations, download and extract subdirectory
|
|
if plugin_path:
|
|
branch_used = None
|
|
for candidate in branch_candidates:
|
|
download_url = f"{repo_url}/archive/refs/heads/{candidate}.zip"
|
|
if self._install_from_monorepo(download_url, plugin_path, temp_dir):
|
|
branch_used = candidate
|
|
break
|
|
|
|
if branch_used is None:
|
|
return {
|
|
'success': False,
|
|
'error': f'Failed to download or extract plugin from monorepo subdirectory: {plugin_path}'
|
|
}
|
|
else:
|
|
# Try git clone for direct plugin repos
|
|
branch_used = self._install_via_git(repo_url, temp_dir, branch_candidates)
|
|
if branch_used:
|
|
self.logger.info(f"Cloned via git (branch: {branch_used})")
|
|
else:
|
|
# Git failed; try downloading as zip
|
|
branch_used = None
|
|
for candidate in branch_candidates:
|
|
download_url = f"{repo_url}/archive/refs/heads/{candidate}.zip"
|
|
if self._install_via_download(download_url, temp_dir):
|
|
branch_used = candidate
|
|
break
|
|
|
|
if branch_used is None:
|
|
return {
|
|
'success': False,
|
|
'error': 'Failed to clone or download repository'
|
|
}
|
|
|
|
# Read manifest to get plugin ID
|
|
manifest_path = temp_dir / "manifest.json"
|
|
if not manifest_path.exists():
|
|
return {
|
|
'success': False,
|
|
'error': 'No manifest.json found in repository' + (f' at path: {plugin_path}' if plugin_path else '')
|
|
}
|
|
|
|
with open(manifest_path, 'r') as f:
|
|
manifest = json.load(f)
|
|
|
|
plugin_id = plugin_id or manifest.get('id')
|
|
if not plugin_id:
|
|
return {
|
|
'success': False,
|
|
'error': 'No plugin ID found in manifest'
|
|
}
|
|
|
|
# Validate manifest has required fields
|
|
required_fields = ['id', 'name', 'class_name', 'display_modes']
|
|
missing_fields = [field for field in required_fields if field not in manifest]
|
|
if missing_fields:
|
|
return {
|
|
'success': False,
|
|
'error': f'Manifest missing required fields: {", ".join(missing_fields)}'
|
|
}
|
|
|
|
# Validate version fields consistency (warnings only, not required)
|
|
validation_errors = self._validate_manifest_version_fields(manifest)
|
|
if validation_errors:
|
|
self.logger.warning(f"Manifest version field validation warnings for {plugin_id}: {', '.join(validation_errors)}")
|
|
|
|
# Optional: Full schema validation if available
|
|
schema_errors = self._validate_manifest_schema(manifest, plugin_id)
|
|
if schema_errors:
|
|
self.logger.warning(f"Manifest schema validation warnings for {plugin_id}: {', '.join(schema_errors)}")
|
|
|
|
# entry_point is optional, default to "manager.py" if not specified
|
|
if 'entry_point' not in manifest:
|
|
manifest['entry_point'] = 'manager.py'
|
|
# Write updated manifest back to file
|
|
with open(manifest_path, 'w') as f:
|
|
json.dump(manifest, f, indent=2)
|
|
self.logger.info(f"Added missing entry_point field to {plugin_id} manifest (defaulted to manager.py)")
|
|
|
|
# Move to plugins directory - use manifest ID as source of truth
|
|
# This ensures directory name always matches manifest ID
|
|
final_path = self.plugins_dir / plugin_id
|
|
if final_path.exists():
|
|
self.logger.warning(f"Plugin {plugin_id} already exists, removing existing copy")
|
|
if not self._safe_remove_directory(final_path):
|
|
return {
|
|
'success': False,
|
|
'error': f'Failed to remove existing plugin directory: {final_path}'
|
|
}
|
|
|
|
shutil.move(str(temp_dir), str(final_path))
|
|
temp_dir = None # Prevent cleanup since we moved it
|
|
|
|
# Note: plugin_id here is already from manifest (line 749), so directory name matches manifest ID
|
|
|
|
# Install dependencies
|
|
self._install_dependencies(final_path)
|
|
|
|
branch_info = f" (branch: {branch_used})" if branch_used else ""
|
|
self.logger.info(f"Successfully installed plugin from URL: {plugin_id}{branch_info}")
|
|
result = {
|
|
'success': True,
|
|
'plugin_id': plugin_id,
|
|
'name': manifest.get('name')
|
|
}
|
|
if branch_used:
|
|
result['branch'] = branch_used
|
|
return result
|
|
|
|
except json.JSONDecodeError as e:
|
|
self.logger.error(f"Error parsing manifest JSON: {e}")
|
|
return {
|
|
'success': False,
|
|
'error': f'Invalid manifest.json: {str(e)}'
|
|
}
|
|
except Exception as e:
|
|
self.logger.error(f"Error installing from URL: {e}", exc_info=True)
|
|
return {
|
|
'success': False,
|
|
'error': str(e)
|
|
}
|
|
finally:
|
|
# Cleanup temp directory if it still exists
|
|
if temp_dir and temp_dir.exists():
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
def _detect_class_name(self, manager_file: Path) -> Optional[str]:
|
|
"""
|
|
Attempt to auto-detect the plugin class name from the manager file.
|
|
|
|
Args:
|
|
manager_file: Path to the manager.py file
|
|
|
|
Returns:
|
|
Class name if found, None otherwise
|
|
"""
|
|
try:
|
|
import re
|
|
with open(manager_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
# Look for class definition that inherits from BasePlugin
|
|
pattern = r'class\s+(\w+)\s*\([^)]*BasePlugin[^)]*\)'
|
|
match = re.search(pattern, content)
|
|
if match:
|
|
return match.group(1)
|
|
|
|
# Fallback: find first class definition
|
|
pattern = r'^class\s+(\w+)'
|
|
match = re.search(pattern, content, re.MULTILINE)
|
|
if match:
|
|
return match.group(1)
|
|
|
|
return None
|
|
except Exception as e:
|
|
self.logger.warning(f"Error detecting class name from {manager_file}: {e}")
|
|
return None
|
|
|
|
def _install_via_git(self, repo_url: str, target_path: Path, branches: Optional[List[str]] = None) -> Optional[str]:
|
|
"""Clone a repository into ``target_path``. Returns the branch name on success."""
|
|
branches_to_try = self._distinct_sequence(branches or [])
|
|
if not branches_to_try:
|
|
branches_to_try = ['main', 'master']
|
|
|
|
last_error = None
|
|
for try_branch in branches_to_try:
|
|
try:
|
|
cmd = ['git', 'clone', '--depth', '1', '--branch', try_branch, repo_url, str(target_path)]
|
|
subprocess.run(
|
|
cmd,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=60
|
|
)
|
|
self.logger.debug(f"Successfully cloned {repo_url} (branch: {try_branch}) to {target_path}")
|
|
return try_branch
|
|
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError) as e:
|
|
last_error = e
|
|
self.logger.debug(f"Git clone failed for branch {try_branch}: {e}")
|
|
if target_path.exists():
|
|
self._safe_remove_directory(target_path)
|
|
|
|
# Try default branch (Git's configured default) as last resort
|
|
try:
|
|
cmd = ['git', 'clone', '--depth', '1', repo_url, str(target_path)]
|
|
subprocess.run(
|
|
cmd,
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=60
|
|
)
|
|
self.logger.debug(f"Successfully cloned {repo_url} (git default branch) to {target_path}")
|
|
return None # Unknown branch name, git default used
|
|
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError) as e:
|
|
last_error = e
|
|
if target_path.exists():
|
|
self._safe_remove_directory(target_path)
|
|
|
|
self.logger.error(f"Git clone failed for all attempted branches: {last_error}")
|
|
return None
|
|
|
|
def _install_from_monorepo(self, download_url: str, plugin_subpath: str, target_path: Path) -> bool:
|
|
"""
|
|
Install a plugin from a monorepo by downloading only the target subdirectory.
|
|
|
|
Uses the GitHub Git Trees API to list files, then downloads each file
|
|
individually from raw.githubusercontent.com. Falls back to downloading
|
|
the full ZIP archive if the API approach fails.
|
|
|
|
Args:
|
|
download_url: URL to download zip from (used as fallback and to extract repo info)
|
|
plugin_subpath: Path within repo (e.g., "plugins/hello-world")
|
|
target_path: Target directory for plugin
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
# Try the API-based approach first (downloads only the target directory)
|
|
repo_url, branch = self._parse_monorepo_download_url(download_url)
|
|
if repo_url and branch:
|
|
result = self._install_from_monorepo_api(repo_url, branch, plugin_subpath, target_path)
|
|
if result:
|
|
return True
|
|
self.logger.info(f"API-based install failed for {plugin_subpath}, falling back to ZIP download")
|
|
# Ensure no partial files remain before ZIP fallback
|
|
if target_path.exists():
|
|
self._safe_remove_directory(target_path)
|
|
|
|
# Fallback: download full ZIP and extract subdirectory
|
|
return self._install_from_monorepo_zip(download_url, plugin_subpath, target_path)
|
|
|
|
@staticmethod
|
|
def _parse_monorepo_download_url(download_url: str):
|
|
"""Extract repo URL and branch from a GitHub archive download URL.
|
|
|
|
Example: "https://github.com/ChuckBuilds/ledmatrix-plugins/archive/refs/heads/main.zip"
|
|
Returns: ("https://github.com/ChuckBuilds/ledmatrix-plugins", "main")
|
|
"""
|
|
try:
|
|
# Pattern: {repo_url}/archive/refs/heads/{branch}.zip
|
|
if '/archive/refs/heads/' in download_url:
|
|
parts = download_url.split('/archive/refs/heads/')
|
|
repo_url = parts[0]
|
|
branch = parts[1].removesuffix('.zip')
|
|
return repo_url, branch
|
|
except (IndexError, AttributeError):
|
|
pass
|
|
return None, None
|
|
|
|
@staticmethod
|
|
def _normalize_repo_url(url: str) -> str:
|
|
"""Normalize a GitHub repo URL for comparison (strip trailing / and .git)."""
|
|
url = url.rstrip('/')
|
|
if url.endswith('.git'):
|
|
url = url[:-4]
|
|
return url.lower()
|
|
|
|
def _install_from_monorepo_api(self, repo_url: str, branch: str, plugin_subpath: str, target_path: Path) -> bool:
|
|
"""
|
|
Install a plugin subdirectory using the GitHub Git Trees API.
|
|
|
|
Downloads only the files in the target subdirectory (~200KB) instead
|
|
of the entire repository ZIP (~5MB+). Uses one API call for the tree
|
|
listing, then downloads individual files from raw.githubusercontent.com.
|
|
|
|
Args:
|
|
repo_url: GitHub repository URL (e.g., "https://github.com/owner/repo")
|
|
branch: Branch name (e.g., "main")
|
|
plugin_subpath: Path within repo (e.g., "plugins/hello-world")
|
|
target_path: Target directory for plugin
|
|
|
|
Returns:
|
|
True if successful, False to trigger ZIP fallback
|
|
"""
|
|
try:
|
|
# Parse owner/repo from URL
|
|
clean_url = repo_url.rstrip('/')
|
|
if clean_url.endswith('.git'):
|
|
clean_url = clean_url[:-4]
|
|
parts = clean_url.split('/')
|
|
if len(parts) < 2:
|
|
return False
|
|
owner, repo = parts[-2], parts[-1]
|
|
|
|
# Step 1: Get the recursive tree listing (1 API call)
|
|
api_url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=true"
|
|
headers = {
|
|
'Accept': 'application/vnd.github.v3+json',
|
|
'User-Agent': 'LEDMatrix-Plugin-Manager/1.0'
|
|
}
|
|
if self.github_token:
|
|
headers['Authorization'] = f'token {self.github_token}'
|
|
|
|
tree_response = self._http_get_with_retries(api_url, timeout=15, headers=headers)
|
|
if tree_response.status_code != 200:
|
|
self.logger.debug(f"Trees API returned {tree_response.status_code} for {owner}/{repo}")
|
|
return False
|
|
|
|
tree_data = tree_response.json()
|
|
if tree_data.get('truncated'):
|
|
self.logger.debug(f"Tree response truncated for {owner}/{repo}, falling back to ZIP")
|
|
return False
|
|
|
|
# Step 2: Filter for files in the target subdirectory
|
|
prefix = f"{plugin_subpath.strip('/')}/"
|
|
file_entries = [
|
|
entry for entry in tree_data.get('tree', [])
|
|
if entry['path'].startswith(prefix) and entry['type'] == 'blob'
|
|
]
|
|
|
|
if not file_entries:
|
|
self.logger.error(f"No files found under '{plugin_subpath}' in tree for {owner}/{repo}")
|
|
return False
|
|
|
|
# Sanity check: refuse unreasonably large plugin directories
|
|
max_files = 500
|
|
if len(file_entries) > max_files:
|
|
self.logger.error(
|
|
f"Plugin {plugin_subpath} has {len(file_entries)} files (limit {max_files}), "
|
|
f"falling back to ZIP"
|
|
)
|
|
return False
|
|
|
|
self.logger.info(f"Downloading {len(file_entries)} files for {plugin_subpath} via API")
|
|
|
|
# Step 3: Create target directory and download each file
|
|
from src.common.permission_utils import (
|
|
ensure_directory_permissions,
|
|
get_plugin_dir_mode
|
|
)
|
|
ensure_directory_permissions(target_path.parent, get_plugin_dir_mode())
|
|
target_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
prefix_len = len(prefix)
|
|
target_root = target_path.resolve()
|
|
for entry in file_entries:
|
|
# Relative path within the plugin directory
|
|
rel_path = entry['path'][prefix_len:]
|
|
dest_file = target_path / rel_path
|
|
|
|
# Guard against path traversal
|
|
if not dest_file.resolve().is_relative_to(target_root):
|
|
self.logger.error(
|
|
f"Path traversal detected: {entry['path']!r} resolves outside target directory"
|
|
)
|
|
if target_path.exists():
|
|
self._safe_remove_directory(target_path)
|
|
return False
|
|
|
|
# Create parent directories
|
|
dest_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Download from raw.githubusercontent.com (no API rate limit cost)
|
|
raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{entry['path']}"
|
|
file_response = self._http_get_with_retries(raw_url, timeout=30)
|
|
if file_response.status_code != 200:
|
|
self.logger.error(f"Failed to download {entry['path']}: HTTP {file_response.status_code}")
|
|
# Clean up partial download
|
|
if target_path.exists():
|
|
self._safe_remove_directory(target_path)
|
|
return False
|
|
|
|
dest_file.write_bytes(file_response.content)
|
|
|
|
self.logger.info(f"Successfully installed {plugin_subpath} via API ({len(file_entries)} files)")
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.debug(f"API-based monorepo install failed: {e}")
|
|
# Clean up partial download
|
|
if target_path.exists():
|
|
self._safe_remove_directory(target_path)
|
|
return False
|
|
|
|
def _install_from_monorepo_zip(self, download_url: str, plugin_subpath: str, target_path: Path) -> bool:
|
|
"""
|
|
Fallback: install a plugin from a monorepo by downloading the full ZIP.
|
|
|
|
Used when the API-based approach fails (rate limited, auth issues, etc.).
|
|
"""
|
|
tmp_zip_path = None
|
|
temp_extract = None
|
|
try:
|
|
self.logger.info(f"Downloading monorepo ZIP from: {download_url}")
|
|
response = self._http_get_with_retries(download_url, timeout=60, stream=True)
|
|
response.raise_for_status()
|
|
|
|
# Download to temporary file
|
|
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp_file:
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
tmp_file.write(chunk)
|
|
tmp_zip_path = tmp_file.name
|
|
|
|
with zipfile.ZipFile(tmp_zip_path, 'r') as zip_ref:
|
|
zip_contents = zip_ref.namelist()
|
|
if not zip_contents:
|
|
return False
|
|
|
|
root_dir = zip_contents[0].split('/')[0]
|
|
plugin_prefix = f"{root_dir}/{plugin_subpath}/"
|
|
|
|
# Extract ONLY files under the plugin subdirectory
|
|
plugin_members = [m for m in zip_contents if m.startswith(plugin_prefix)]
|
|
|
|
if not plugin_members:
|
|
self.logger.error(f"Plugin path not found in archive: {plugin_subpath}")
|
|
return False
|
|
|
|
temp_extract = Path(tempfile.mkdtemp())
|
|
temp_extract_resolved = temp_extract.resolve()
|
|
|
|
for member in plugin_members:
|
|
# Guard against zip-slip (directory traversal)
|
|
member_dest = (temp_extract / member).resolve()
|
|
if not member_dest.is_relative_to(temp_extract_resolved):
|
|
self.logger.error(
|
|
f"Zip-slip detected: member {member!r} resolves outside "
|
|
f"temp directory, aborting"
|
|
)
|
|
shutil.rmtree(temp_extract, ignore_errors=True)
|
|
return False
|
|
zip_ref.extract(member, temp_extract)
|
|
|
|
source_plugin_dir = temp_extract / root_dir / plugin_subpath
|
|
|
|
from src.common.permission_utils import (
|
|
ensure_directory_permissions,
|
|
get_plugin_dir_mode
|
|
)
|
|
ensure_directory_permissions(target_path.parent, get_plugin_dir_mode())
|
|
# Ensure target doesn't exist to prevent shutil.move nesting
|
|
if target_path.exists():
|
|
if not self._safe_remove_directory(target_path):
|
|
self.logger.error(f"Cannot remove existing target {target_path} for monorepo install")
|
|
return False
|
|
shutil.move(str(source_plugin_dir), str(target_path))
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Monorepo ZIP download failed: {e}", exc_info=True)
|
|
return False
|
|
finally:
|
|
if tmp_zip_path and os.path.exists(tmp_zip_path):
|
|
os.remove(tmp_zip_path)
|
|
if temp_extract and temp_extract.exists():
|
|
shutil.rmtree(temp_extract, ignore_errors=True)
|
|
|
|
def _install_via_download(self, download_url: str, target_path: Path) -> bool:
|
|
"""
|
|
Install plugin by downloading and extracting zip archive.
|
|
|
|
Args:
|
|
download_url: URL to download zip from
|
|
target_path: Target directory
|
|
|
|
Returns:
|
|
True if successful
|
|
"""
|
|
try:
|
|
self.logger.info(f"Downloading from: {download_url}")
|
|
# Allow redirects (GitHub archive URLs redirect to codeload.github.com)
|
|
response = self._http_get_with_retries(download_url, timeout=60, stream=True, headers={'User-Agent': 'LEDMatrix-Plugin-Manager/1.0'})
|
|
response.raise_for_status()
|
|
|
|
# Download to temporary file
|
|
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp_file:
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
tmp_file.write(chunk)
|
|
tmp_zip_path = tmp_file.name
|
|
|
|
try:
|
|
# Extract zip
|
|
with zipfile.ZipFile(tmp_zip_path, 'r') as zip_ref:
|
|
# GitHub zips have a root directory, we need to extract contents
|
|
zip_contents = zip_ref.namelist()
|
|
if not zip_contents:
|
|
return False
|
|
|
|
# Find the root directory in the zip
|
|
root_dir = zip_contents[0].split('/')[0]
|
|
|
|
# Extract to temp location with zip-slip protection
|
|
temp_extract = Path(tempfile.mkdtemp())
|
|
temp_extract_resolved = temp_extract.resolve()
|
|
for member in zip_ref.namelist():
|
|
member_dest = (temp_extract / member).resolve()
|
|
if not member_dest.is_relative_to(temp_extract_resolved):
|
|
self.logger.error(
|
|
f"Zip-slip detected: member {member!r} resolves outside "
|
|
f"temp directory, aborting"
|
|
)
|
|
shutil.rmtree(temp_extract, ignore_errors=True)
|
|
return False
|
|
zip_ref.extractall(temp_extract)
|
|
|
|
# Move contents from root_dir to target
|
|
source_dir = temp_extract / root_dir
|
|
if source_dir.exists():
|
|
from src.common.permission_utils import (
|
|
ensure_directory_permissions,
|
|
get_plugin_dir_mode
|
|
)
|
|
ensure_directory_permissions(target_path.parent, get_plugin_dir_mode())
|
|
shutil.move(str(source_dir), str(target_path))
|
|
else:
|
|
# No root dir, move everything
|
|
shutil.move(str(temp_extract), str(target_path))
|
|
|
|
# Cleanup temp extract dir
|
|
if temp_extract.exists():
|
|
shutil.rmtree(temp_extract, ignore_errors=True)
|
|
|
|
return True
|
|
|
|
finally:
|
|
# Remove temporary zip file
|
|
if os.path.exists(tmp_zip_path):
|
|
os.remove(tmp_zip_path)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Download failed: {e}")
|
|
return False
|
|
|
|
def _install_dependencies(self, plugin_path: Path) -> bool:
|
|
"""
|
|
Install Python dependencies from requirements.txt.
|
|
|
|
Args:
|
|
plugin_path: Path to plugin directory
|
|
|
|
Returns:
|
|
True if successful or no requirements file
|
|
"""
|
|
requirements_file = plugin_path / "requirements.txt"
|
|
|
|
if not requirements_file.exists():
|
|
self.logger.debug(f"No requirements.txt found in {plugin_path.name}")
|
|
return True
|
|
|
|
try:
|
|
self.logger.info(f"Installing dependencies for {plugin_path.name}")
|
|
subprocess.run(
|
|
['pip3', 'install', '--break-system-packages', '-r', str(requirements_file)],
|
|
check=True,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=300
|
|
)
|
|
self.logger.info(f"Dependencies installed successfully for {plugin_path.name}")
|
|
return True
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
self.logger.error(f"Error installing dependencies: {e.stderr}")
|
|
return False
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.error("Dependency installation timed out")
|
|
return False
|
|
except (BrokenPipeError, OSError) as e:
|
|
# Handle broken pipe errors (errno 32) which can occur during pip downloads
|
|
# Often caused by network interruptions or output buffer issues
|
|
if isinstance(e, OSError) and e.errno == 32:
|
|
self.logger.error(
|
|
f"Broken pipe error during dependency installation for {plugin_path.name}. "
|
|
f"This usually indicates a network interruption or pip output buffer issue. "
|
|
f"Try installing again or check your network connection."
|
|
)
|
|
else:
|
|
self.logger.error(f"OS error during dependency installation: {e}")
|
|
return False
|
|
except Exception as e:
|
|
# Catch any other unexpected errors
|
|
self.logger.error(f"Unexpected error installing dependencies for {plugin_path.name}: {e}", exc_info=True)
|
|
return False
|
|
|
|
def _git_cache_signature(self, git_dir: Path) -> Optional[Tuple]:
|
|
"""Build a cache signature that invalidates on the kind of updates
|
|
a plugin user actually cares about.
|
|
|
|
Caching on ``.git/HEAD`` mtime alone is not enough: a ``git pull``
|
|
that fast-forwards the current branch updates
|
|
``.git/refs/heads/<branch>`` (or ``.git/packed-refs``) but leaves
|
|
HEAD's contents and mtime untouched. And the cached ``result``
|
|
dict includes ``remote_url`` — a value read from ``.git/config`` —
|
|
so a config-only change (e.g. a monorepo-migration re-pointing
|
|
``remote.origin.url``) must also invalidate the cache.
|
|
|
|
Signature components:
|
|
- HEAD contents (catches detach / branch switch)
|
|
- HEAD mtime
|
|
- if HEAD points at a ref, that ref file's mtime (catches
|
|
fast-forward / reset on the current branch)
|
|
- packed-refs mtime as a coarse fallback for repos using packed refs
|
|
- .git/config contents + mtime (catches remote URL changes and
|
|
any other config-only edit that affects what the cached
|
|
``remote_url`` field should contain)
|
|
|
|
Returns ``None`` if HEAD cannot be read at all (caller will skip
|
|
the cache and take the slow path).
|
|
"""
|
|
head_file = git_dir / 'HEAD'
|
|
try:
|
|
head_mtime = head_file.stat().st_mtime
|
|
head_contents = head_file.read_text(encoding='utf-8', errors='replace').strip()
|
|
except OSError:
|
|
return None
|
|
|
|
ref_mtime = None
|
|
if head_contents.startswith('ref: '):
|
|
ref_path = head_contents[len('ref: '):].strip()
|
|
# ``ref_path`` looks like ``refs/heads/main``. It lives either
|
|
# as a loose file under .git/ or inside .git/packed-refs.
|
|
loose_ref = git_dir / ref_path
|
|
try:
|
|
ref_mtime = loose_ref.stat().st_mtime
|
|
except OSError:
|
|
ref_mtime = None
|
|
|
|
packed_refs_mtime = None
|
|
if ref_mtime is None:
|
|
try:
|
|
packed_refs_mtime = (git_dir / 'packed-refs').stat().st_mtime
|
|
except OSError:
|
|
packed_refs_mtime = None
|
|
|
|
config_mtime = None
|
|
config_contents = None
|
|
config_file = git_dir / 'config'
|
|
try:
|
|
config_mtime = config_file.stat().st_mtime
|
|
config_contents = config_file.read_text(encoding='utf-8', errors='replace').strip()
|
|
except OSError:
|
|
config_mtime = None
|
|
config_contents = None
|
|
|
|
return (
|
|
head_contents, head_mtime,
|
|
ref_mtime, packed_refs_mtime,
|
|
config_contents, config_mtime,
|
|
)
|
|
|
|
def _get_local_git_info(self, plugin_path: Path) -> Optional[Dict[str, str]]:
|
|
"""Return local git branch, commit hash, and commit date if the plugin is a git checkout.
|
|
|
|
Results are cached keyed on a signature that includes HEAD
|
|
contents plus the mtime of HEAD AND the resolved ref (or
|
|
packed-refs). Repeated calls skip the four ``git`` subprocesses
|
|
when nothing has changed, and a ``git pull`` that fast-forwards
|
|
the branch correctly invalidates the cache.
|
|
"""
|
|
git_dir = plugin_path / '.git'
|
|
if not git_dir.exists():
|
|
return None
|
|
|
|
cache_key = str(plugin_path)
|
|
signature = self._git_cache_signature(git_dir)
|
|
|
|
if signature is not None:
|
|
cached = self._git_info_cache.get(cache_key)
|
|
if cached is not None and cached[0] == signature:
|
|
return cached[1]
|
|
|
|
try:
|
|
# .git may be a file (worktree / submodule) containing "gitdir: <path>".
|
|
# Resolve it to the actual git directory before reading any files.
|
|
try:
|
|
if git_dir.is_file():
|
|
pointer = git_dir.read_text(encoding='utf-8', errors='replace').strip()
|
|
if pointer.startswith('gitdir:'):
|
|
resolved = (plugin_path / pointer[len('gitdir:'):].strip()).resolve()
|
|
if resolved.is_dir():
|
|
git_dir = resolved
|
|
else:
|
|
return None
|
|
else:
|
|
return None
|
|
except (OSError, NotADirectoryError):
|
|
return None
|
|
|
|
# Read branch directly from .git/HEAD (no subprocess).
|
|
branch = ''
|
|
try:
|
|
head_text = (git_dir / 'HEAD').read_text(encoding='utf-8', errors='replace').strip()
|
|
if head_text.startswith('ref: refs/heads/'):
|
|
branch = head_text[len('ref: refs/heads/'):]
|
|
elif head_text.startswith('ref: '):
|
|
branch = head_text[len('ref: '):]
|
|
# else: detached HEAD — branch stays ''
|
|
except (OSError, NotADirectoryError):
|
|
pass
|
|
|
|
# Remote URL from .git/config — parse [remote "origin"] url line.
|
|
remote_url = None
|
|
try:
|
|
config_text = (git_dir / 'config').read_text(encoding='utf-8', errors='replace')
|
|
in_origin = False
|
|
for line in config_text.splitlines():
|
|
stripped = line.strip()
|
|
if stripped == '[remote "origin"]':
|
|
in_origin = True
|
|
elif stripped.startswith('['):
|
|
in_origin = False
|
|
elif in_origin and stripped.startswith('url') and '=' in stripped:
|
|
remote_url = stripped.split('=', 1)[1].strip()
|
|
break
|
|
except (OSError, NotADirectoryError):
|
|
pass
|
|
|
|
# Single subprocess: SHA + commit date in one call.
|
|
log_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'log', '-1', '--format=%H%n%cI', 'HEAD'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
check=True
|
|
)
|
|
lines = log_result.stdout.strip().splitlines()
|
|
sha = lines[0] if lines else ''
|
|
commit_date_iso = lines[1] if len(lines) > 1 else ''
|
|
|
|
result = {
|
|
'sha': sha,
|
|
'short_sha': sha[:7] if sha else '',
|
|
'branch': branch,
|
|
}
|
|
|
|
if remote_url:
|
|
result['remote_url'] = remote_url
|
|
|
|
if commit_date_iso:
|
|
result['date_iso'] = commit_date_iso
|
|
result['date'] = self._iso_to_date(commit_date_iso)
|
|
|
|
if signature is not None:
|
|
self._git_info_cache[cache_key] = (signature, result)
|
|
return result
|
|
except subprocess.CalledProcessError as err:
|
|
self.logger.debug(f"Failed to read git info for {plugin_path.name}: {err}")
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.debug(f"Timed out reading git info for {plugin_path.name}")
|
|
|
|
return None
|
|
|
|
def _safe_remove_directory(self, path: Path) -> bool:
|
|
"""
|
|
Safely remove a directory, handling permission errors for root-owned files.
|
|
|
|
Attempts removal in three stages:
|
|
1. Normal shutil.rmtree()
|
|
2. Fix permissions via os.chmod() then retry (works for same-owner files)
|
|
3. Use sudo rm -rf as last resort (works for root-owned __pycache__, etc.)
|
|
|
|
Args:
|
|
path: Path to directory to remove
|
|
|
|
Returns:
|
|
True if directory was removed successfully, False otherwise
|
|
"""
|
|
if not path.exists():
|
|
return True # Already removed
|
|
|
|
# Stage 1: Try normal removal
|
|
try:
|
|
shutil.rmtree(path)
|
|
return True
|
|
except OSError:
|
|
self.logger.warning(f"Permission error removing {path}, attempting chmod fix...")
|
|
|
|
# Stage 2: Try chmod + retry (works when we own the files)
|
|
try:
|
|
for root, _dirs, files in os.walk(path):
|
|
root_path = Path(root)
|
|
try:
|
|
os.chmod(root_path, stat.S_IRWXU)
|
|
except (OSError, PermissionError):
|
|
pass
|
|
for file in files:
|
|
try:
|
|
os.chmod(root_path / file, stat.S_IRWXU)
|
|
except (OSError, PermissionError):
|
|
pass
|
|
shutil.rmtree(path)
|
|
self.logger.info(f"Removed {path} after fixing permissions")
|
|
return True
|
|
except (PermissionError, OSError):
|
|
self.logger.warning(f"chmod fix failed for {path}, attempting sudo removal...")
|
|
|
|
# Stage 3: Use sudo rm -rf (for root-owned __pycache__, data/.cache, etc.)
|
|
if sudo_remove_directory(path):
|
|
return True
|
|
|
|
# Final check — maybe partial removal got everything
|
|
if not path.exists():
|
|
return True
|
|
|
|
self.logger.error(f"All removal strategies failed for {path}")
|
|
return False
|
|
|
|
def _find_plugin_path(self, plugin_id: str) -> Optional[Path]:
|
|
"""
|
|
Find the plugin path by checking the configured directory and standard plugins directory.
|
|
|
|
Args:
|
|
plugin_id: Plugin identifier
|
|
|
|
Returns:
|
|
Path to plugin directory if found, None otherwise
|
|
"""
|
|
# First check the configured plugins directory
|
|
plugin_path = self.plugins_dir / plugin_id
|
|
if plugin_path.exists():
|
|
return plugin_path
|
|
|
|
# Also check the standard 'plugins/' directory if it's different
|
|
# This handles the case where plugins are in plugins/ but config says plugin-repos/
|
|
try:
|
|
if self.plugins_dir.is_absolute():
|
|
project_root = self.plugins_dir.parent
|
|
else:
|
|
project_root = self.plugins_dir.resolve().parent
|
|
|
|
standard_plugins_dir = project_root / 'plugins'
|
|
if standard_plugins_dir.exists() and standard_plugins_dir != self.plugins_dir:
|
|
plugin_path = standard_plugins_dir / plugin_id
|
|
if plugin_path.exists():
|
|
return plugin_path
|
|
except (OSError, ValueError):
|
|
pass
|
|
|
|
return None
|
|
|
|
def uninstall_plugin(self, plugin_id: str) -> bool:
|
|
"""
|
|
Uninstall a plugin by removing its directory.
|
|
|
|
Args:
|
|
plugin_id: Plugin identifier
|
|
|
|
Returns:
|
|
True if uninstalled successfully (or already not installed)
|
|
"""
|
|
plugin_path = self._find_plugin_path(plugin_id)
|
|
|
|
if plugin_path is None or not plugin_path.exists():
|
|
self.logger.info(f"Plugin {plugin_id} not found (already uninstalled)")
|
|
return True # Already uninstalled, consider this success
|
|
|
|
try:
|
|
self.logger.info(f"Uninstalling plugin: {plugin_id}")
|
|
if self._safe_remove_directory(plugin_path):
|
|
self.logger.info(f"Successfully uninstalled plugin: {plugin_id}")
|
|
return True
|
|
else:
|
|
self.logger.error(f"Failed to remove plugin directory: {plugin_path}")
|
|
return False
|
|
except Exception as e:
|
|
self.logger.error(f"Error uninstalling plugin {plugin_id}: {e}")
|
|
return False
|
|
|
|
def update_plugin(self, plugin_id: str) -> bool:
|
|
"""
|
|
Update a plugin to the latest commit on its upstream branch.
|
|
"""
|
|
plugin_path = self._find_plugin_path(plugin_id)
|
|
|
|
if plugin_path is None or not plugin_path.exists():
|
|
self.logger.error(f"Plugin not installed: {plugin_id}")
|
|
return False
|
|
|
|
try:
|
|
self.logger.info(f"Checking for updates to plugin {plugin_id}")
|
|
|
|
# Check if this is a bundled/unmanaged plugin (no registry entry, no git remote)
|
|
# These are plugins shipped with LEDMatrix itself and updated via LEDMatrix updates.
|
|
metadata_path = plugin_path / ".plugin_metadata.json"
|
|
if metadata_path.exists():
|
|
try:
|
|
with open(metadata_path, 'r', encoding='utf-8') as f:
|
|
metadata = json.load(f)
|
|
if metadata.get('install_type') == 'bundled':
|
|
self.logger.info(f"Plugin {plugin_id} is a bundled plugin; updates are delivered via LEDMatrix itself")
|
|
return True
|
|
except (OSError, ValueError) as e:
|
|
self.logger.debug(f"[PluginStore] Could not read metadata for {plugin_id} at {metadata_path}: {e}")
|
|
|
|
# First check if it's a git repository - if so, we can update directly
|
|
git_info = self._get_local_git_info(plugin_path)
|
|
|
|
if git_info:
|
|
# Plugin is a git repository - try to update via git
|
|
local_branch = git_info.get('branch') or 'main'
|
|
local_sha = git_info.get('sha')
|
|
|
|
# Try to get remote info from registry (optional)
|
|
self.fetch_registry(force_refresh=True)
|
|
plugin_info_remote = self.get_plugin_info(plugin_id, fetch_latest_from_github=True, force_refresh=True)
|
|
# Try without 'ledmatrix-' prefix (monorepo migration)
|
|
resolved_id = plugin_id
|
|
if not plugin_info_remote and plugin_id.startswith('ledmatrix-'):
|
|
alt_id = plugin_id[len('ledmatrix-'):]
|
|
plugin_info_remote = self.get_plugin_info(alt_id, fetch_latest_from_github=True, force_refresh=True)
|
|
if plugin_info_remote:
|
|
resolved_id = alt_id
|
|
self.logger.info(f"Plugin {plugin_id} found in registry as {resolved_id}")
|
|
remote_branch = None
|
|
remote_sha = None
|
|
|
|
if plugin_info_remote:
|
|
remote_branch = plugin_info_remote.get('branch') or plugin_info_remote.get('default_branch')
|
|
remote_sha = plugin_info_remote.get('last_commit_sha')
|
|
|
|
# Check if the local git remote still matches the registry repo URL.
|
|
# After monorepo migration, old clones point to archived individual repos
|
|
# while the registry now points to the monorepo. Detect this and reinstall.
|
|
registry_repo = plugin_info_remote.get('repo', '')
|
|
local_remote = git_info.get('remote_url', '')
|
|
if local_remote and registry_repo and self._normalize_repo_url(local_remote) != self._normalize_repo_url(registry_repo):
|
|
self.logger.info(
|
|
f"Plugin {resolved_id} git remote ({local_remote}) differs from registry ({registry_repo}). "
|
|
f"Reinstalling from registry to migrate to new source."
|
|
)
|
|
if not self._safe_remove_directory(plugin_path):
|
|
self.logger.error(f"Failed to remove old plugin directory for {resolved_id}")
|
|
return False
|
|
return self.install_plugin(resolved_id)
|
|
|
|
# Check if already up to date
|
|
if remote_sha and local_sha and remote_sha.startswith(local_sha):
|
|
self.logger.info(f"Plugin {plugin_id} already matches remote commit {remote_sha[:7]}")
|
|
return True
|
|
|
|
# Update via git pull
|
|
self.logger.info(f"Updating {plugin_id} via git pull (local branch: {local_branch})...")
|
|
try:
|
|
# Fetch latest changes first to get all remote branch info
|
|
# If fetch fails, we'll still try to pull (might work with existing remote refs)
|
|
fetch_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'fetch', 'origin'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=60,
|
|
check=False
|
|
)
|
|
if fetch_result.returncode != 0:
|
|
self.logger.warning(f"Git fetch failed for {plugin_id}: {fetch_result.stderr or fetch_result.stdout}. Will still attempt pull.")
|
|
else:
|
|
self.logger.debug(f"Successfully fetched remote changes for {plugin_id}")
|
|
|
|
# Determine which remote branch to pull from
|
|
# Strategy: Use what the local branch is tracking, or find the best match
|
|
remote_pull_branch = None
|
|
|
|
# First, check what the local branch is tracking
|
|
tracking_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'rev-parse', '--abbrev-ref', '--symbolic-full-name', f'{local_branch}@{{upstream}}'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
check=False
|
|
)
|
|
|
|
if tracking_result.returncode == 0 and tracking_result.stdout.strip():
|
|
# Local branch is tracking a remote branch
|
|
tracking_ref = tracking_result.stdout.strip()
|
|
# Extract branch name from refs/remotes/origin/branch-name or origin/branch-name
|
|
if tracking_ref.startswith('refs/remotes/origin/'):
|
|
remote_pull_branch = tracking_ref.replace('refs/remotes/origin/', '')
|
|
self.logger.info(f"Local branch {local_branch} is tracking origin/{remote_pull_branch}")
|
|
elif tracking_ref.startswith('origin/'):
|
|
remote_pull_branch = tracking_ref.replace('origin/', '')
|
|
self.logger.info(f"Local branch {local_branch} is tracking origin/{remote_pull_branch}")
|
|
|
|
# If not tracking anything, try to find the best remote branch match
|
|
if not remote_pull_branch:
|
|
# Check if remote branch from registry exists
|
|
if remote_branch:
|
|
remote_check = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'ls-remote', '--heads', 'origin', remote_branch],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
check=False
|
|
)
|
|
if remote_check.returncode == 0 and remote_check.stdout.strip():
|
|
remote_pull_branch = remote_branch
|
|
self.logger.info(f"Using remote branch {remote_branch} from registry")
|
|
|
|
# If registry branch doesn't exist, check if local branch name exists on remote
|
|
if not remote_pull_branch:
|
|
local_as_remote_check = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'ls-remote', '--heads', 'origin', local_branch],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
check=False
|
|
)
|
|
if local_as_remote_check.returncode == 0 and local_as_remote_check.stdout.strip():
|
|
remote_pull_branch = local_branch
|
|
self.logger.info(f"Using local branch name {local_branch} as remote branch")
|
|
|
|
# Last resort: try to get remote's default branch
|
|
if not remote_pull_branch:
|
|
default_branch_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'symbolic-ref', 'refs/remotes/origin/HEAD'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
check=False
|
|
)
|
|
if default_branch_result.returncode == 0:
|
|
default_ref = default_branch_result.stdout.strip()
|
|
if default_ref.startswith('refs/remotes/origin/'):
|
|
remote_pull_branch = default_ref.replace('refs/remotes/origin/', '')
|
|
self.logger.info(f"Using remote default branch {remote_pull_branch}")
|
|
|
|
# If we still don't have a remote branch, use local branch name (git will handle it)
|
|
if not remote_pull_branch:
|
|
remote_pull_branch = local_branch
|
|
self.logger.info(f"Falling back to local branch name {local_branch} for pull")
|
|
|
|
# Ensure we're on the local branch
|
|
checkout_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'checkout', local_branch],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
check=False
|
|
)
|
|
if checkout_result.returncode != 0:
|
|
self.logger.warning(f"Git checkout to {local_branch} failed for {plugin_id}: {checkout_result.stderr or checkout_result.stdout}. Will still attempt pull.")
|
|
|
|
# Check for local changes and untracked files that might conflict
|
|
# First, check for untracked files that would be overwritten
|
|
try:
|
|
# Check for untracked files
|
|
untracked_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'status', '--porcelain', '--untracked-files=all'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
check=False
|
|
)
|
|
untracked_files = []
|
|
if untracked_result.returncode == 0:
|
|
for line in untracked_result.stdout.strip().split('\n'):
|
|
if line.startswith('??'):
|
|
# Untracked file
|
|
file_path = line[3:].strip()
|
|
untracked_files.append(file_path)
|
|
|
|
# Remove marker files that are safe to delete (they'll be regenerated)
|
|
safe_to_remove = ['.dependencies_installed']
|
|
removed_files = []
|
|
for file_name in safe_to_remove:
|
|
file_path = plugin_path / file_name
|
|
if file_path.exists() and file_name in untracked_files:
|
|
try:
|
|
file_path.unlink()
|
|
removed_files.append(file_name)
|
|
self.logger.info(f"Removed marker file {file_name} from {plugin_id} before update")
|
|
except Exception as e:
|
|
self.logger.warning(f"Could not remove {file_name} from {plugin_id}: {e}")
|
|
|
|
# Check for tracked file changes
|
|
status_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'status', '--porcelain', '--untracked-files=no'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
check=False
|
|
)
|
|
has_changes = bool(status_result.stdout.strip())
|
|
|
|
# If there are remaining untracked files (not safe to remove), stash them
|
|
remaining_untracked = [f for f in untracked_files if f not in removed_files]
|
|
if remaining_untracked:
|
|
self.logger.info(f"Found {len(remaining_untracked)} untracked files in {plugin_id}, will stash them")
|
|
has_changes = True
|
|
except subprocess.TimeoutExpired:
|
|
# If status check times out, assume there might be changes and proceed
|
|
self.logger.warning(f"Git status check timed out for {plugin_id}, proceeding with update")
|
|
has_changes = True
|
|
status_result = type('obj', (object,), {'stdout': '', 'stderr': 'Status check timed out'})()
|
|
|
|
stash_info = ""
|
|
if has_changes:
|
|
self.logger.info(f"Stashing local changes in {plugin_id} before update")
|
|
try:
|
|
# Use -u to include untracked files in stash
|
|
stash_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'stash', 'push', '-u', '-m', f'LEDMatrix auto-stash before update {plugin_id}'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
check=False
|
|
)
|
|
if stash_result.returncode == 0:
|
|
stash_info = " (local changes were stashed)"
|
|
self.logger.info(f"Stashed local changes (including untracked files) for {plugin_id}")
|
|
else:
|
|
self.logger.warning(f"Failed to stash local changes for {plugin_id}: {stash_result.stderr}")
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.warning(f"Stash operation timed out for {plugin_id}, proceeding with pull")
|
|
|
|
# Pull from the determined remote branch
|
|
self.logger.info(f"Pulling from origin/{remote_pull_branch} for {plugin_id}...")
|
|
pull_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'pull', 'origin', remote_pull_branch],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=120,
|
|
check=True
|
|
)
|
|
|
|
pull_message = pull_result.stdout.strip() or f"Pulled latest changes for {plugin_id}"
|
|
if stash_info:
|
|
pull_message += stash_info
|
|
self.logger.info(pull_message)
|
|
|
|
updated_git_info = self._get_local_git_info(plugin_path) or {}
|
|
updated_sha = updated_git_info.get('sha', '')
|
|
if remote_sha and updated_sha and remote_sha.startswith(updated_sha):
|
|
self.logger.info(f"Plugin {plugin_id} now at remote commit {remote_sha[:7]}{stash_info}")
|
|
elif updated_sha:
|
|
self.logger.info(f"Plugin {plugin_id} updated to commit {updated_sha[:7]}{stash_info}")
|
|
|
|
self._install_dependencies(plugin_path)
|
|
return True
|
|
|
|
except subprocess.CalledProcessError as git_error:
|
|
error_output = git_error.stderr or git_error.stdout or "Unknown error"
|
|
cmd_str = ' '.join(git_error.cmd) if hasattr(git_error, 'cmd') else 'unknown'
|
|
self.logger.error(f"Git update failed for {plugin_id}")
|
|
self.logger.error(f"Command: {cmd_str}")
|
|
self.logger.error(f"Return code: {git_error.returncode}")
|
|
self.logger.error(f"Error output: {error_output}")
|
|
|
|
# Check for specific error conditions
|
|
error_lower = error_output.lower()
|
|
if "would be overwritten" in error_output or "local changes" in error_lower:
|
|
self.logger.warning(f"Plugin {plugin_id} has local changes that prevent update. Consider committing or stashing changes manually.")
|
|
elif "refusing to merge unrelated histories" in error_lower:
|
|
self.logger.error(f"Plugin {plugin_id} has unrelated git histories. Plugin may need to be reinstalled.")
|
|
elif "authentication" in error_lower or "permission denied" in error_lower:
|
|
self.logger.error(f"Authentication failed for {plugin_id}. Check git credentials or repository permissions.")
|
|
elif "not found" in error_lower or "does not exist" in error_lower:
|
|
self.logger.error(f"Remote branch or repository not found for {plugin_id}. Check repository URL and branch name.")
|
|
elif "merge conflict" in error_lower or "conflict" in error_lower:
|
|
self.logger.error(f"Merge conflict detected for {plugin_id}. Resolve conflicts manually or reinstall plugin.")
|
|
|
|
return False
|
|
except subprocess.TimeoutExpired:
|
|
self.logger.warning(f"Git update timed out for {plugin_id}")
|
|
return False
|
|
|
|
# Not a git repository - try to get repo URL from git config if it exists
|
|
# (in case .git directory was removed but remote URL is still in config)
|
|
repo_url = None
|
|
try:
|
|
# Use --local to avoid inheriting the parent LEDMatrix repo's git config
|
|
# when the plugin directory lives inside the main repo (e.g. plugin-repos/).
|
|
remote_url_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'config', '--local', '--get', 'remote.origin.url'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
check=False
|
|
)
|
|
if remote_url_result.returncode == 0:
|
|
repo_url = remote_url_result.stdout.strip()
|
|
self.logger.info(f"Found git remote URL for {plugin_id}: {repo_url}")
|
|
except Exception as e:
|
|
self.logger.debug(f"Could not get git remote URL: {e}")
|
|
|
|
# Try registry-based update
|
|
self.logger.info(f"Plugin {plugin_id} is not a git repository, checking registry...")
|
|
self.fetch_registry(force_refresh=True)
|
|
plugin_info_remote = self.get_plugin_info(plugin_id, fetch_latest_from_github=True, force_refresh=True)
|
|
|
|
# If not found, try without 'ledmatrix-' prefix (monorepo migration)
|
|
registry_id = plugin_id
|
|
if not plugin_info_remote and plugin_id.startswith('ledmatrix-'):
|
|
alt_id = plugin_id[len('ledmatrix-'):]
|
|
plugin_info_remote = self.get_plugin_info(alt_id, fetch_latest_from_github=True, force_refresh=True)
|
|
if plugin_info_remote:
|
|
registry_id = alt_id
|
|
self.logger.info(f"Plugin {plugin_id} found in registry as {alt_id}")
|
|
|
|
# If not in registry but we have a repo URL, try reinstalling from that URL
|
|
if not plugin_info_remote and repo_url:
|
|
self.logger.info(f"Plugin {plugin_id} not in registry but has git remote URL. Reinstalling from {repo_url} to enable updates...")
|
|
try:
|
|
# Get current branch if possible
|
|
branch_result = subprocess.run(
|
|
['git', '-C', str(plugin_path), 'rev-parse', '--abbrev-ref', 'HEAD'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
check=False
|
|
)
|
|
branch = branch_result.stdout.strip() if branch_result.returncode == 0 else None
|
|
if branch == 'HEAD' or not branch:
|
|
branch = 'main'
|
|
|
|
# Reinstall from URL
|
|
result = self.install_from_url(repo_url, plugin_id=plugin_id, branch=branch)
|
|
if result.get('success'):
|
|
self.logger.info(f"Successfully reinstalled {plugin_id} from {repo_url} as git repository")
|
|
return True
|
|
else:
|
|
self.logger.warning(f"Failed to reinstall {plugin_id} from {repo_url}: {result.get('error')}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error reinstalling {plugin_id} from URL: {e}")
|
|
|
|
if not plugin_info_remote:
|
|
self.logger.warning(f"Plugin {plugin_id} not found in registry and not a git repository; cannot update automatically")
|
|
if not repo_url:
|
|
self.logger.warning("Plugin may have been installed via ZIP download. Try reinstalling from GitHub URL to enable updates.")
|
|
return False
|
|
|
|
repo_url = plugin_info_remote.get('repo')
|
|
remote_sha = plugin_info_remote.get('last_commit_sha')
|
|
remote_branch = plugin_info_remote.get('branch') or plugin_info_remote.get('default_branch')
|
|
|
|
# Compare local manifest version against registry latest_version
|
|
# to avoid unnecessary reinstalls for monorepo plugins
|
|
try:
|
|
local_manifest_path = plugin_path / "manifest.json"
|
|
if local_manifest_path.exists():
|
|
with open(local_manifest_path, 'r', encoding='utf-8') as f:
|
|
local_manifest = json.load(f)
|
|
local_version = local_manifest.get('version', '')
|
|
remote_version = plugin_info_remote.get('latest_version', '')
|
|
if local_version and remote_version and local_version == remote_version:
|
|
self.logger.info(f"Plugin {plugin_id} already at latest version {local_version}")
|
|
return True
|
|
except Exception as e:
|
|
self.logger.debug(f"Could not compare versions for {plugin_id}: {e}")
|
|
|
|
# Plugin is not a git repo but is in registry and has a newer version - reinstall
|
|
self.logger.info(f"Plugin {plugin_id} not installed via git; re-installing latest archive (registry id: {registry_id})")
|
|
|
|
# Remove directory and reinstall fresh
|
|
if not self._safe_remove_directory(plugin_path):
|
|
self.logger.error(f"Failed to remove old plugin directory for {plugin_id}")
|
|
return False
|
|
return self.install_plugin(registry_id)
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
self.logger.error(f"Error updating plugin {plugin_id}: {e}")
|
|
self.logger.debug(traceback.format_exc())
|
|
return False
|
|
|
|
def list_installed_plugins(self) -> List[str]:
|
|
"""
|
|
Get list of installed plugin IDs.
|
|
|
|
Returns:
|
|
List of plugin IDs
|
|
"""
|
|
if not self.plugins_dir.exists():
|
|
return []
|
|
|
|
installed = []
|
|
for item in self.plugins_dir.iterdir():
|
|
if item.is_dir() and (item / "manifest.json").exists():
|
|
installed.append(item.name)
|
|
|
|
return installed
|
|
|
|
def get_installed_plugin_info(self, plugin_id: str) -> Optional[Dict]:
|
|
"""
|
|
Get manifest information for an installed plugin.
|
|
|
|
Args:
|
|
plugin_id: Plugin identifier
|
|
|
|
Returns:
|
|
Manifest data or None if not found
|
|
"""
|
|
manifest_path = self.plugins_dir / plugin_id / "manifest.json"
|
|
|
|
if not manifest_path.exists():
|
|
return None
|
|
|
|
try:
|
|
with open(manifest_path, 'r') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
self.logger.error(f"Error reading manifest for {plugin_id}: {e}")
|
|
return None
|