mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-06-19 11:08:39 +00:00
* fix(web-ui): fix quick actions not firing, add toast feedback, suppress install handler warning - base.html: add htmx:afterSettle listener to set data-loaded on tab containers after HTMX swaps their content, preventing the overview partial from being re-fetched (and handlers lost) on every tab switch - base.html: call htmx.process() in loadOverviewDirect/loadPluginsDirect fallbacks so buttons get HTMX handlers even if HTMX finished its initial body scan before the fallback fetch completed - overview.html + index.html (11 buttons): replace event.detail.xhr.responseJSON (undefined in HTMX 1.9.x) with JSON.parse(event.detail.xhr.responseText) so quick action toast notifications actually fire - plugins_manager.js: add guarded htmx:afterSettle listener that only calls attachInstallButtonHandler when #install-plugin-from-url is in the DOM, eliminating the spurious console warning on non-plugin tab loads Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(web-ui): ensure quick-action toasts always fire even on xhr/parse failure Replace silent catch(e){} in all 11 hx-on:htmx:after-request handlers with a pattern that sets default message/status before the try block and calls showNotification(m,s) unconditionally after it, so a fallback toast is shown whenever xhr is absent or responseText is not valid JSON. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(web-ui): show error toast on non-JSON 4xx/5xx quick-action responses In the catch block of all 11 hx-on:htmx:after-request handlers, check xhr.status >= 400 and downgrade s to 'error' so a failed action that returns an HTML error page (or other non-JSON body) surfaces as an error toast instead of the optimistic 'success'/'info' default. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(web-ui): guard setTimeout fallback for attachInstallButtonHandler The 500ms fallback setTimeout was calling attachInstallButtonHandler() unconditionally even when the plugins partial wasn't in the DOM, causing a spurious console.warn on every page load. Add the same element-existence check already present on the htmx:afterSettle listener. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix backup API 404s, hardware status 500, and HTMX loading race - Add all backup API routes to api_v3.py: preview, list, export, validate, restore (with plugin reinstall), download, delete - Fix PermissionError on /hardware/status: return graceful 200 instead of 500 when the status file is owned by a different user; also fix root cause by writing the file world-readable (0o644) in display_manager - Fix HTMX race: dispatch htmx:ready window event from HTMX onload callback; loadTabContent now waits for that event instead of immediately falling back to direct fetch (eliminating the "HTMX not available" console warning on initial load) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Cancel HTMX fallback timers when htmx:ready fires The 5-second setTimeout fallbacks for plugins and overview were firing before the htmx:ready event arrived, logging spurious warnings. Each timer now self-cancels via htmx:ready so the fallback only triggers when HTMX genuinely fails to load. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Address review feedback: error leaks, ok:false, htmx:ready coverage - Backup endpoints: replace raw str(e) in user-facing responses with a generic message; full exception still logged via exc_info=True - hardware/status: change ok:null to ok:false for PermissionError and json.JSONDecodeError so the UI's hw.ok===false check triggers correctly - base.html: dispatch htmx:ready from the fallback load path so any deferred listeners fire on CDN-fallback loads too - loadTabContent: also listen for htmx-load-failed so overview/wifi/plugins fall back to direct fetch when HTMX is completely unavailable Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Treat system-managed pip packages as satisfied for dependency marker When a plugin's requirements.txt includes a package installed via the system package manager (dnf/apt), pip fails with 'uninstall-no-record-file' because it can't replace the system-tracked copy. The package is present and functional, but the missing marker caused the install to be retried on every service restart. Detect this specific error pattern: if the only pip failure is uninstall-no-record-file, write the .dependencies_installed marker and log a warning instead of returning False, suppressing the repeated warning. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix uninstall-no-record-file detection condition The previous check used a string replacement that left 'error:' in the remaining text, causing the condition to always evaluate false. Simplify to a direct substring check: if 'uninstall-no-record-file' appears in pip stderr the affected package is installed at the system level and we write the marker, suppressing the repeated warning on every restart. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Resolve CodeQL security findings in backup API Path traversal (CWE-22): - backup_download: switch from send_file(user-tainted-path) to send_from_directory(_BACKUP_EXPORT_DIR, filename); Flask uses werkzeug safe_join internally which CodeQL recognises as a sanitizer - backup_delete: enumerate the export directory and match by name so entry.unlink() operates on a filesystem-derived Path rather than one constructed from user input; _safe_backup_path still guards first Information exposure through exceptions (CWE-209): - backup_validate: err_msg from validate_backup() can embed exception strings containing temp-file paths; log the detail, return a generic 'Invalid or corrupted backup file' to the client - Other backup endpoints: already fixed (str(e) -> generic message); CodeQL alerts will clear on next scan plugin_loader.py:185 (path traversal): false positive — requirements_file is constructed from plugin_dir returned by find_plugin_directory() (a filesystem scan), not from raw HTTP request input; no change needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix pre-existing information exposure in version and action endpoints - get_system_version (alert #218): replaced str(e) with generic message; exception still logged via logger.error(exc_info=True) - execute_system_action (alert #216): removed str(e) and full traceback.format_exc() from the HTTP response — the full stack trace was being sent directly to clients; replaced with generic message and proper logger.error call Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix remaining GitHub CodeQL security alerts - py/stack-trace-exposure: Remove str(e) and traceback.format_exc() from all HTTP responses across api_v3.py, pages_v3.py, and app.py; replace with generic messages and logger.error(exc_info=True) - py/reflective-xss: Escape partial_name via markupsafe.escape in the load_partial 404 response - py/path-injection: Add regex validation of plugin_id before filesystem use in _load_plugin_config_partial - py/incomplete-url-substring-sanitization: Replace 'github.com' in substring checks with urlparse hostname comparison in store_manager.py - py/clear-text-logging-sensitive-data: Remove football-scoreboard debug prints and sensitive request-body prints from update endpoint - js/bad-tag-filter: Replace script-only regex in BaseWidget.sanitizeValue with DOM-based textContent stripping that removes all HTML - js/incomplete-sanitization: Fix escapeAttr to properly encode &, ", ', <, > using HTML entities instead of backslash escaping - js/prototype-pollution-utility: Add __proto__/constructor/prototype key guards to deepMerge function in plugins_manager.js - app.py error handlers: Always return generic messages; remove debug-mode branches that could expose tracebacks in production Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix three remaining CodeQL path-injection and info-exposure alerts - plugin_loader.py: resolve plugin_dir with strict=True and validate marker_path with relative_to() before any filesystem writes, giving CodeQL the positive sanitization pattern it requires (py/path-injection) - api_v3.py _safe_backup_path: replace substring negative checks with a strict positive regex (^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$) that CodeQL recognises as sanitising the user-supplied filename (py/path-injection) - api_v3.py backup_validate: whitelist known-safe manifest fields before returning JSON, preventing any exception strings captured inside validate_backup() from reaching the HTTP response (py/stack-trace-exposure) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Resolve 29 open CodeQL security alerts across 5 files py/flask-debug (#214): - debug_web_manual.py: read debug mode from LEDMATRIX_FLASK_DEBUG env var instead of hardcoded True py/stack-trace-exposure (#216, #218): - api_v3.py execute_system_action: remove subprocess stdout/stderr from HTTP responses; log via logger instead - api_v3.py get_git_version: validate output matches safe ref format (^[a-zA-Z0-9._-]+$) before including in response - api_v3.py: remove all remaining traceback.format_exc() dead variables and print() debug calls (replaced with logger.debug/warning) py/reflective-xss (#207, #208, #209, #210, #211, #212): - api_v3.py: remove plugin_id from all error/success response messages (uninstall, install, update, health, not-found responses) - pages_v3.py load_partial: return static "Partial not found" message instead of echoing partial_name - pages_v3.py _load_starlark_config_partial: add app_id regex validation, use static error messages instead of f-strings with app_id py/path-injection (#187–#206): - pages_v3.py _load_plugin_config_partial: resolve plugins_base and validate _plugin_dir with relative_to() before all file operations; same for assets metadata directory - pages_v3.py _load_starlark_config_partial: resolve starlark_base and validate schema_file/config_file paths with relative_to() - plugin_loader.py _find_plugin_directory: resolve plugins_dir and validate strategy-2 candidates with relative_to() - plugin_loader.py install_dependencies: resolve plugin_dir first, then construct requirements_file and marker_path from resolved base - plugin_loader.py load_module: resolve plugin_dir with strict=True and validate entry_file with relative_to() before exec_module Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix 15 remaining CodeQL path-injection and stack-trace-exposure alerts Switch from resolve()+relative_to() to os.path.basename() reassignment, which CodeQL recognizes as a path sanitizer that breaks the taint chain. Also remove exception objects from backup_manager validate_backup return strings to eliminate the stack-trace-exposure taint source. Fixes alerts #227, #233, #234, #235, #237, #238, #239, #240, #241, #242, #243, #244, #245, #246, #247. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Fix broken logger format string and leaked exception in config save error - pages_v3.py: plain string was used instead of %-style substitution, so every manifest-read failure logged the literal "{plugin_id}" - api_v3.py save_main_config: exception message was still leaking through the error response; replace with generic message (consistent with the rest of the CodeQL sweep in this PR) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Chuck <chuck@example.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
606 lines
22 KiB
Python
606 lines
22 KiB
Python
"""
|
|
User configuration backup and restore.
|
|
|
|
Packages the user's LEDMatrix configuration, secrets, WiFi settings,
|
|
user-uploaded fonts, plugin image uploads, and installed-plugin manifest
|
|
into a single ``.zip`` that can be exported from one installation and
|
|
imported on a fresh install.
|
|
|
|
This module is intentionally Flask-free so it can be unit-tested and
|
|
used from scripts.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import socket
|
|
import tempfile
|
|
import zipfile
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SCHEMA_VERSION = 1
|
|
|
|
# Filenames shipped with the LEDMatrix repository under ``assets/fonts/``.
|
|
# Anything present on disk but NOT in this set is treated as a user upload
|
|
# and included in backups. Keep this snapshot in sync with the repo — regenerate
|
|
# with::
|
|
#
|
|
# ls assets/fonts/
|
|
#
|
|
# Tests assert the set matches the checked-in fonts.
|
|
BUNDLED_FONTS: frozenset[str] = frozenset({
|
|
"10x20.bdf",
|
|
"4x6.bdf",
|
|
"4x6-font.ttf",
|
|
"5by7.regular.ttf",
|
|
"5x7.bdf",
|
|
"5x8.bdf",
|
|
"6x9.bdf",
|
|
"6x10.bdf",
|
|
"6x12.bdf",
|
|
"6x13.bdf",
|
|
"6x13B.bdf",
|
|
"6x13O.bdf",
|
|
"7x13.bdf",
|
|
"7x13B.bdf",
|
|
"7x13O.bdf",
|
|
"7x14.bdf",
|
|
"7x14B.bdf",
|
|
"8x13.bdf",
|
|
"8x13B.bdf",
|
|
"8x13O.bdf",
|
|
"9x15.bdf",
|
|
"9x15B.bdf",
|
|
"9x18.bdf",
|
|
"9x18B.bdf",
|
|
"AUTHORS",
|
|
"bdf_font_guide",
|
|
"clR6x12.bdf",
|
|
"helvR12.bdf",
|
|
"ic8x8u.bdf",
|
|
"MatrixChunky8.bdf",
|
|
"MatrixChunky8X.bdf",
|
|
"MatrixLight6.bdf",
|
|
"MatrixLight6X.bdf",
|
|
"MatrixLight8X.bdf",
|
|
"PressStart2P-Regular.ttf",
|
|
"README",
|
|
"README.md",
|
|
"texgyre-27.bdf",
|
|
"tom-thumb.bdf",
|
|
})
|
|
|
|
# Relative paths inside the project that the backup knows how to round-trip.
|
|
_CONFIG_REL = Path("config/config.json")
|
|
_SECRETS_REL = Path("config/config_secrets.json")
|
|
_WIFI_REL = Path("config/wifi_config.json")
|
|
_FONTS_REL = Path("assets/fonts")
|
|
_PLUGIN_UPLOADS_REL = Path("assets/plugins")
|
|
_STATE_REL = Path("data/plugin_state.json")
|
|
|
|
MANIFEST_NAME = "manifest.json"
|
|
PLUGINS_MANIFEST_NAME = "plugins.json"
|
|
|
|
# Hard cap on the size of a single file we'll accept inside an uploaded ZIP
|
|
# to limit zip-bomb risk. 50 MB matches the existing plugin-image upload cap.
|
|
_MAX_MEMBER_BYTES = 50 * 1024 * 1024
|
|
# Hard cap on the total uncompressed size of an uploaded ZIP.
|
|
_MAX_TOTAL_BYTES = 200 * 1024 * 1024
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
|
|
class RestoreOptions:
|
|
"""Which sections of a backup should be restored."""
|
|
|
|
restore_config: bool = True
|
|
restore_secrets: bool = True
|
|
restore_wifi: bool = True
|
|
restore_fonts: bool = True
|
|
restore_plugin_uploads: bool = True
|
|
reinstall_plugins: bool = True
|
|
|
|
|
|
@dataclass
|
|
class RestoreResult:
|
|
"""Outcome of a restore operation."""
|
|
|
|
success: bool = False
|
|
restored: List[str] = field(default_factory=list)
|
|
skipped: List[str] = field(default_factory=list)
|
|
plugins_to_install: List[Dict[str, Any]] = field(default_factory=list)
|
|
plugins_installed: List[str] = field(default_factory=list)
|
|
plugins_failed: List[Dict[str, str]] = field(default_factory=list)
|
|
errors: List[str] = field(default_factory=list)
|
|
manifest: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return asdict(self)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Manifest helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _ledmatrix_version(project_root: Path) -> str:
|
|
"""Best-effort version string for the current install."""
|
|
version_file = project_root / "VERSION"
|
|
if version_file.exists():
|
|
try:
|
|
return version_file.read_text(encoding="utf-8").strip() or "unknown"
|
|
except OSError:
|
|
pass
|
|
head_file = project_root / ".git" / "HEAD"
|
|
if head_file.exists():
|
|
try:
|
|
head = head_file.read_text(encoding="utf-8").strip()
|
|
if head.startswith("ref: "):
|
|
ref = head[5:]
|
|
ref_path = project_root / ".git" / ref
|
|
if ref_path.exists():
|
|
return ref_path.read_text(encoding="utf-8").strip()[:12] or "unknown"
|
|
return head[:12] or "unknown"
|
|
except OSError:
|
|
pass
|
|
return "unknown"
|
|
|
|
|
|
def _build_manifest(contents: List[str], project_root: Path) -> Dict[str, Any]:
|
|
return {
|
|
"schema_version": SCHEMA_VERSION,
|
|
"created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
|
|
"ledmatrix_version": _ledmatrix_version(project_root),
|
|
"hostname": socket.gethostname(),
|
|
"contents": contents,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Installed-plugin enumeration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def list_installed_plugins(project_root: Path) -> List[Dict[str, Any]]:
|
|
"""
|
|
Return a list of currently-installed plugins suitable for the backup
|
|
manifest. Each entry has ``plugin_id`` and ``version``.
|
|
|
|
Reads ``data/plugin_state.json`` if present; otherwise walks the plugin
|
|
directory and reads each ``manifest.json``.
|
|
"""
|
|
plugins: Dict[str, Dict[str, Any]] = {}
|
|
|
|
state_file = project_root / _STATE_REL
|
|
if state_file.exists():
|
|
try:
|
|
with state_file.open("r", encoding="utf-8") as f:
|
|
state = json.load(f)
|
|
raw_plugins = state.get("states", {}) if isinstance(state, dict) else {}
|
|
if isinstance(raw_plugins, dict):
|
|
for plugin_id, info in raw_plugins.items():
|
|
if not isinstance(info, dict):
|
|
continue
|
|
plugins[plugin_id] = {
|
|
"plugin_id": plugin_id,
|
|
"version": info.get("version") or "",
|
|
"enabled": bool(info.get("enabled", True)),
|
|
}
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
logger.warning("Could not read plugin_state.json: %s", e)
|
|
|
|
# Fall back to scanning plugin-repos/ for manifests.
|
|
plugins_root = project_root / "plugin-repos"
|
|
if plugins_root.exists():
|
|
for entry in sorted(plugins_root.iterdir()):
|
|
if not entry.is_dir():
|
|
continue
|
|
manifest = entry / "manifest.json"
|
|
if not manifest.exists():
|
|
continue
|
|
try:
|
|
with manifest.open("r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
except (OSError, json.JSONDecodeError):
|
|
continue
|
|
plugin_id = data.get("id") or entry.name
|
|
if plugin_id not in plugins:
|
|
plugins[plugin_id] = {
|
|
"plugin_id": plugin_id,
|
|
"version": data.get("version", ""),
|
|
"enabled": True,
|
|
}
|
|
|
|
return sorted(plugins.values(), key=lambda p: p["plugin_id"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Font filtering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def iter_user_fonts(project_root: Path) -> List[Path]:
|
|
"""Return absolute paths to user-uploaded fonts (anything in
|
|
``assets/fonts/`` not listed in :data:`BUNDLED_FONTS`)."""
|
|
fonts_dir = project_root / _FONTS_REL
|
|
if not fonts_dir.exists():
|
|
return []
|
|
user_fonts: List[Path] = []
|
|
for entry in sorted(fonts_dir.iterdir()):
|
|
if entry.is_file() and entry.name not in BUNDLED_FONTS:
|
|
user_fonts.append(entry)
|
|
return user_fonts
|
|
|
|
|
|
def iter_plugin_uploads(project_root: Path) -> List[Path]:
|
|
"""Return every file under ``assets/plugins/*/uploads/`` (recursive)."""
|
|
plugin_root = project_root / _PLUGIN_UPLOADS_REL
|
|
if not plugin_root.exists():
|
|
return []
|
|
out: List[Path] = []
|
|
for plugin_dir in sorted(plugin_root.iterdir()):
|
|
if not plugin_dir.is_dir():
|
|
continue
|
|
uploads = plugin_dir / "uploads"
|
|
if not uploads.exists():
|
|
continue
|
|
for root, _dirs, files in os.walk(uploads):
|
|
for name in sorted(files):
|
|
out.append(Path(root) / name)
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Export
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def create_backup(
|
|
project_root: Path,
|
|
output_dir: Optional[Path] = None,
|
|
) -> Path:
|
|
"""
|
|
Build a backup ZIP and write it into ``output_dir`` (defaults to
|
|
``<project_root>/config/backups/exports/``). Returns the path to the
|
|
created file.
|
|
"""
|
|
project_root = Path(project_root).resolve()
|
|
if output_dir is None:
|
|
output_dir = project_root / "config" / "backups" / "exports"
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
hostname = socket.gethostname() or "ledmatrix"
|
|
safe_host = "".join(c for c in hostname if c.isalnum() or c in "-_") or "ledmatrix"
|
|
zip_name = f"ledmatrix-backup-{safe_host}-{timestamp}.zip"
|
|
zip_path = output_dir / zip_name
|
|
|
|
contents: List[str] = []
|
|
|
|
# Stream directly to a temp file so we never hold the whole ZIP in memory.
|
|
tmp_path = zip_path.with_suffix(".zip.tmp")
|
|
try:
|
|
with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
|
|
# Config files.
|
|
if (project_root / _CONFIG_REL).exists():
|
|
zf.write(project_root / _CONFIG_REL, _CONFIG_REL.as_posix())
|
|
contents.append("config")
|
|
if (project_root / _SECRETS_REL).exists():
|
|
zf.write(project_root / _SECRETS_REL, _SECRETS_REL.as_posix())
|
|
contents.append("secrets")
|
|
if (project_root / _WIFI_REL).exists():
|
|
zf.write(project_root / _WIFI_REL, _WIFI_REL.as_posix())
|
|
contents.append("wifi")
|
|
|
|
# User-uploaded fonts.
|
|
user_fonts = iter_user_fonts(project_root)
|
|
if user_fonts:
|
|
for font in user_fonts:
|
|
arcname = font.relative_to(project_root).as_posix()
|
|
zf.write(font, arcname)
|
|
contents.append("fonts")
|
|
|
|
# Plugin uploads.
|
|
plugin_uploads = iter_plugin_uploads(project_root)
|
|
if plugin_uploads:
|
|
for upload in plugin_uploads:
|
|
arcname = upload.relative_to(project_root).as_posix()
|
|
zf.write(upload, arcname)
|
|
contents.append("plugin_uploads")
|
|
|
|
# Installed plugins manifest.
|
|
plugins = list_installed_plugins(project_root)
|
|
if plugins:
|
|
zf.writestr(
|
|
PLUGINS_MANIFEST_NAME,
|
|
json.dumps(plugins, indent=2),
|
|
)
|
|
contents.append("plugins")
|
|
|
|
# Manifest goes last so that `contents` reflects what we actually wrote.
|
|
manifest = _build_manifest(contents, project_root)
|
|
zf.writestr(MANIFEST_NAME, json.dumps(manifest, indent=2))
|
|
|
|
os.replace(tmp_path, zip_path)
|
|
except Exception:
|
|
tmp_path.unlink(missing_ok=True)
|
|
raise
|
|
logger.info("Created backup %s (%d bytes)", zip_path, zip_path.stat().st_size)
|
|
return zip_path
|
|
|
|
|
|
def preview_backup_contents(project_root: Path) -> Dict[str, Any]:
|
|
"""Return a summary of what ``create_backup`` would include."""
|
|
project_root = Path(project_root).resolve()
|
|
return {
|
|
"has_config": (project_root / _CONFIG_REL).exists(),
|
|
"has_secrets": (project_root / _SECRETS_REL).exists(),
|
|
"has_wifi": (project_root / _WIFI_REL).exists(),
|
|
"user_fonts": [p.name for p in iter_user_fonts(project_root)],
|
|
"plugin_uploads": len(iter_plugin_uploads(project_root)),
|
|
"plugins": list_installed_plugins(project_root),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Validate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _safe_extract_path(base_dir: Path, member_name: str) -> Optional[Path]:
|
|
"""Resolve a ZIP member name against ``base_dir`` and reject anything
|
|
that escapes it. Returns the resolved absolute path, or ``None`` if the
|
|
name is unsafe."""
|
|
# Reject absolute paths and Windows-style drives outright.
|
|
if member_name.startswith(("/", "\\")) or (len(member_name) >= 2 and member_name[1] == ":"):
|
|
return None
|
|
target = (base_dir / member_name).resolve()
|
|
try:
|
|
target.relative_to(base_dir.resolve())
|
|
except ValueError:
|
|
return None
|
|
return target
|
|
|
|
|
|
def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
|
|
"""
|
|
Inspect a backup ZIP without extracting to disk.
|
|
|
|
Returns ``(ok, error_message, manifest_dict)``. ``manifest_dict`` contains
|
|
the parsed manifest plus diagnostic fields:
|
|
- ``detected_contents``: list of section names present in the archive
|
|
- ``plugins``: parsed plugins.json if present
|
|
- ``total_uncompressed``: sum of uncompressed sizes
|
|
"""
|
|
zip_path = Path(zip_path)
|
|
if not zip_path.exists():
|
|
return False, f"Backup file not found: {zip_path}", {}
|
|
|
|
try:
|
|
with zipfile.ZipFile(zip_path, "r") as zf:
|
|
names = zf.namelist()
|
|
if MANIFEST_NAME not in names:
|
|
return False, "Backup is missing manifest.json", {}
|
|
|
|
total = 0
|
|
with tempfile.TemporaryDirectory() as _sandbox:
|
|
sandbox = Path(_sandbox)
|
|
for info in zf.infolist():
|
|
if info.file_size > _MAX_MEMBER_BYTES:
|
|
return False, f"Member {info.filename} is too large", {}
|
|
total += info.file_size
|
|
if total > _MAX_TOTAL_BYTES:
|
|
return False, "Backup exceeds maximum allowed size", {}
|
|
# Safety: reject members with unsafe paths up front.
|
|
if _safe_extract_path(sandbox, info.filename) is None:
|
|
return False, f"Unsafe path in backup: {info.filename}", {}
|
|
|
|
try:
|
|
manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8")
|
|
manifest = json.loads(manifest_raw)
|
|
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
|
|
return False, "Invalid manifest.json", {}
|
|
|
|
if not isinstance(manifest, dict) or "schema_version" not in manifest:
|
|
return False, "Invalid manifest structure", {}
|
|
if manifest.get("schema_version") != SCHEMA_VERSION:
|
|
return (
|
|
False,
|
|
f"Unsupported backup schema version: {manifest.get('schema_version')}",
|
|
{},
|
|
)
|
|
|
|
detected: List[str] = []
|
|
if _CONFIG_REL.as_posix() in names:
|
|
detected.append("config")
|
|
if _SECRETS_REL.as_posix() in names:
|
|
detected.append("secrets")
|
|
if _WIFI_REL.as_posix() in names:
|
|
detected.append("wifi")
|
|
if any(n.startswith(_FONTS_REL.as_posix() + "/") for n in names):
|
|
detected.append("fonts")
|
|
if any(
|
|
n.startswith(_PLUGIN_UPLOADS_REL.as_posix() + "/") and "/uploads/" in n
|
|
for n in names
|
|
):
|
|
detected.append("plugin_uploads")
|
|
|
|
plugins: List[Dict[str, Any]] = []
|
|
if PLUGINS_MANIFEST_NAME in names:
|
|
try:
|
|
plugins = json.loads(zf.read(PLUGINS_MANIFEST_NAME).decode("utf-8"))
|
|
if not isinstance(plugins, list):
|
|
plugins = []
|
|
else:
|
|
detected.append("plugins")
|
|
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
|
|
plugins = []
|
|
|
|
result_manifest = dict(manifest)
|
|
result_manifest["detected_contents"] = detected
|
|
result_manifest["plugins"] = plugins
|
|
result_manifest["total_uncompressed"] = total
|
|
result_manifest["file_count"] = len(names)
|
|
return True, "", result_manifest
|
|
except zipfile.BadZipFile:
|
|
return False, "File is not a valid ZIP archive", {}
|
|
except OSError:
|
|
return False, "Could not read backup", {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Restore
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _extract_zip_safe(zip_path: Path, dest_dir: Path) -> None:
|
|
"""Extract ``zip_path`` into ``dest_dir`` rejecting any unsafe members."""
|
|
with zipfile.ZipFile(zip_path, "r") as zf:
|
|
for info in zf.infolist():
|
|
target = _safe_extract_path(dest_dir, info.filename)
|
|
if target is None:
|
|
raise ValueError(f"Unsafe path in backup: {info.filename}")
|
|
if info.is_dir():
|
|
target.mkdir(parents=True, exist_ok=True)
|
|
continue
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
with zf.open(info, "r") as src, open(target, "wb") as dst:
|
|
shutil.copyfileobj(src, dst, length=64 * 1024)
|
|
|
|
|
|
def _copy_file(src: Path, dst: Path) -> None:
|
|
dst.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.copy2(src, dst)
|
|
|
|
|
|
def restore_backup(
|
|
zip_path: Path,
|
|
project_root: Path,
|
|
options: Optional[RestoreOptions] = None,
|
|
) -> RestoreResult:
|
|
"""
|
|
Restore ``zip_path`` into ``project_root`` according to ``options``.
|
|
|
|
Plugin reinstalls are NOT performed here — the caller is responsible for
|
|
walking ``result.plugins_to_install`` and calling the store manager. This
|
|
keeps this module Flask-free and side-effect free beyond the filesystem.
|
|
"""
|
|
if options is None:
|
|
options = RestoreOptions()
|
|
project_root = Path(project_root).resolve()
|
|
result = RestoreResult()
|
|
|
|
ok, err, manifest = validate_backup(zip_path)
|
|
if not ok:
|
|
result.errors.append(err)
|
|
return result
|
|
result.manifest = manifest
|
|
|
|
with tempfile.TemporaryDirectory(prefix="ledmatrix_restore_") as tmp:
|
|
tmp_dir = Path(tmp)
|
|
try:
|
|
_extract_zip_safe(Path(zip_path), tmp_dir)
|
|
except (ValueError, zipfile.BadZipFile, OSError) as e:
|
|
result.errors.append(f"Failed to extract backup: {e}")
|
|
return result
|
|
|
|
# Main config.
|
|
if options.restore_config and (tmp_dir / _CONFIG_REL).exists():
|
|
try:
|
|
_copy_file(tmp_dir / _CONFIG_REL, project_root / _CONFIG_REL)
|
|
result.restored.append("config")
|
|
except OSError as e:
|
|
result.errors.append(f"Failed to restore config.json: {e}")
|
|
elif (tmp_dir / _CONFIG_REL).exists():
|
|
result.skipped.append("config")
|
|
|
|
# Secrets.
|
|
if options.restore_secrets and (tmp_dir / _SECRETS_REL).exists():
|
|
try:
|
|
_copy_file(tmp_dir / _SECRETS_REL, project_root / _SECRETS_REL)
|
|
result.restored.append("secrets")
|
|
except OSError as e:
|
|
result.errors.append(f"Failed to restore config_secrets.json: {e}")
|
|
elif (tmp_dir / _SECRETS_REL).exists():
|
|
result.skipped.append("secrets")
|
|
|
|
# WiFi.
|
|
if options.restore_wifi and (tmp_dir / _WIFI_REL).exists():
|
|
try:
|
|
_copy_file(tmp_dir / _WIFI_REL, project_root / _WIFI_REL)
|
|
result.restored.append("wifi")
|
|
except OSError as e:
|
|
result.errors.append(f"Failed to restore wifi_config.json: {e}")
|
|
elif (tmp_dir / _WIFI_REL).exists():
|
|
result.skipped.append("wifi")
|
|
|
|
# User fonts — skip anything that collides with a bundled font.
|
|
tmp_fonts = tmp_dir / _FONTS_REL
|
|
if options.restore_fonts and tmp_fonts.exists():
|
|
restored_count = 0
|
|
for font in sorted(tmp_fonts.iterdir()):
|
|
if not font.is_file():
|
|
continue
|
|
if font.name in BUNDLED_FONTS:
|
|
result.skipped.append(f"font:{font.name} (bundled)")
|
|
continue
|
|
try:
|
|
_copy_file(font, project_root / _FONTS_REL / font.name)
|
|
restored_count += 1
|
|
except OSError as e:
|
|
result.errors.append(f"Failed to restore font {font.name}: {e}")
|
|
if restored_count:
|
|
result.restored.append(f"fonts ({restored_count})")
|
|
elif tmp_fonts.exists():
|
|
result.skipped.append("fonts")
|
|
|
|
# Plugin uploads.
|
|
tmp_uploads = tmp_dir / _PLUGIN_UPLOADS_REL
|
|
if options.restore_plugin_uploads and tmp_uploads.exists():
|
|
count = 0
|
|
for root, _dirs, files in os.walk(tmp_uploads):
|
|
for name in files:
|
|
src = Path(root) / name
|
|
rel = src.relative_to(tmp_dir)
|
|
if "/uploads/" not in rel.as_posix():
|
|
result.errors.append(f"Rejected unexpected plugin path: {rel}")
|
|
continue
|
|
try:
|
|
_copy_file(src, project_root / rel)
|
|
count += 1
|
|
except OSError as e:
|
|
result.errors.append(f"Failed to restore {rel}: {e}")
|
|
if count:
|
|
result.restored.append(f"plugin_uploads ({count})")
|
|
elif tmp_uploads.exists():
|
|
result.skipped.append("plugin_uploads")
|
|
|
|
# Plugins list (for caller to reinstall).
|
|
if options.reinstall_plugins and (tmp_dir / PLUGINS_MANIFEST_NAME).exists():
|
|
try:
|
|
with (tmp_dir / PLUGINS_MANIFEST_NAME).open("r", encoding="utf-8") as f:
|
|
plugins = json.load(f)
|
|
if isinstance(plugins, list):
|
|
result.plugins_to_install = [
|
|
{"plugin_id": p.get("plugin_id"), "version": p.get("version", "")}
|
|
for p in plugins
|
|
if isinstance(p, dict) and p.get("plugin_id")
|
|
]
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
result.errors.append(f"Could not read plugins.json: {e}")
|
|
|
|
result.success = not result.errors
|
|
return result
|