6 Commits

Author SHA1 Message Date
Chuck
c70183fdb6 fix(install): remove weather and music credential stubs from secrets template
config_secrets.template.json shipped ledmatrix-weather and music as
top-level keys; config_manager deep-merges secrets into the main config
on load, so the reconciler treated them as plugin config entries and
auto-installed both plugins on first web UI visit after a fresh install.

Remove both keys from the template and clear the inline fallback block
in first_time_install.sh so new installs start clean.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 18:33:53 -04:00
Chuck
54b131c9f4 fix(plugin_manager): apply recovery logic to update_all_plugins; extract helper
update_all_plugins still set PluginState.ERROR on both failure paths, leaving
it inconsistent with the run_scheduled_updates fix from the same PR.

Extract _record_update_failure(plugin_id, exc=None) to hold all shared failure
logic: capture actual failure time, build structured error_info, log the retry
warning, stamp plugin_last_update, call set_state_with_error(ENABLED), and
forward to health_tracker. Replace all four failure sites (two in
run_scheduled_updates, two in update_all_plugins) with calls to this helper.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 14:53:23 -04:00
Chuck
7b50a2db59 fix(plugin_state): lock _error_info accesses and store defensive copies
Three verified issues:

- set_error_info wrote _error_info without holding _lock and stored the
  caller's dict by reference, allowing races and post-write mutation
- set_state_with_error stored error_info by reference (lock was already held)
- get_error_info read _error_info without _lock and returned the live
  reference, letting callers mutate the stored snapshot

Implicit fourth fix: set_state also wrote _error_info without _lock; locking
get_error_info while leaving that writer unguarded would have created a new
race, so set_state is now wrapped in _lock too for consistency.

Changes:
- set_state: wrap entire body in self._lock (covers _states, _state_history,
  and _error_info writes atomically; ERROR-path _error_info value was already
  a fresh dict literal so no copy needed)
- set_error_info: acquire self._lock + store dict(error_info) shallow copy
- set_state_with_error: store dict(error_info) shallow copy (lock already held)
- get_error_info: acquire self._lock + return dict(info) copy or None

All stored values are flat dicts of strings/floats/bools, so shallow copy
is sufficient — deepcopy is not needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 11:50:54 -04:00
Chuck
09c7940986 fix(plugin_manager): atomic state+error write via set_state_with_error
The two-step set_state() / set_error_info() sequence left a window where
readers could observe ENABLED state without the accompanying error context.

Add threading.RLock to PluginStateManager and a new set_state_with_error()
method that holds the lock for both the state-transition write and the
_error_info write together. The method inlines the state-transition logic
rather than calling set_state() internally to intentionally skip the
"clear _error_info for non-ERROR states" side effect — the recoverable
error dict is exactly what we want stored.

Replace both paired set_state / set_error_info call sites in
run_scheduled_updates() with the single atomic method.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-29 09:31:04 -04:00
Chuck
37566d93ac fix(plugin_manager): address PR review — failure timestamp and error context
- Use time.time() at the point of failure instead of reusing current_time
  (captured before execution), so the full retry interval always elapses
  after a timeout rather than one execution-duration shorter

- Add PluginStateManager.set_error_info() to persist structured error context
  without changing plugin state; call it in both failure branches so
  get_error_info() / get_state_info() surface recoverable errors alongside
  ERROR-state errors

- Add warning log on the success=False branch (was previously silent)

- Pass a descriptive Exception (not a generic "Plugin execution failed") to
  health_tracker.record_failure() in the timeout/executor-error path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 14:53:23 -04:00
Chuck
d0969ad57a fix(plugin_manager): prevent permanent ERROR state after update timeout
When execute_update() fails (timeout or unhandled exception), the plugin
state was set to ERROR with no recovery path. can_execute() returns False
for ERROR state, so the plugin's update() was never called again, leaving
it showing stale data indefinitely.

Instead, update plugin_last_update so the plugin waits one configured
interval before retrying, and keep the state ENABLED so recovery is
automatic on the next cycle.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 09:35:38 -04:00
7 changed files with 10 additions and 1697 deletions

View File

@@ -1,605 +0,0 @@
"""
User configuration backup and restore.
Packages the user's LEDMatrix configuration, secrets, WiFi settings,
user-uploaded fonts, plugin image uploads, and installed-plugin manifest
into a single ``.zip`` that can be exported from one installation and
imported on a fresh install.
This module is intentionally Flask-free so it can be unit-tested and
used from scripts.
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import socket
import tempfile
import zipfile
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
SCHEMA_VERSION = 1
# Filenames shipped with the LEDMatrix repository under ``assets/fonts/``.
# Anything present on disk but NOT in this set is treated as a user upload
# and included in backups. Keep this snapshot in sync with the repo — regenerate
# with::
#
# ls assets/fonts/
#
# Tests assert the set matches the checked-in fonts.
BUNDLED_FONTS: frozenset[str] = frozenset({
"10x20.bdf",
"4x6.bdf",
"4x6-font.ttf",
"5by7.regular.ttf",
"5x7.bdf",
"5x8.bdf",
"6x9.bdf",
"6x10.bdf",
"6x12.bdf",
"6x13.bdf",
"6x13B.bdf",
"6x13O.bdf",
"7x13.bdf",
"7x13B.bdf",
"7x13O.bdf",
"7x14.bdf",
"7x14B.bdf",
"8x13.bdf",
"8x13B.bdf",
"8x13O.bdf",
"9x15.bdf",
"9x15B.bdf",
"9x18.bdf",
"9x18B.bdf",
"AUTHORS",
"bdf_font_guide",
"clR6x12.bdf",
"helvR12.bdf",
"ic8x8u.bdf",
"MatrixChunky8.bdf",
"MatrixChunky8X.bdf",
"MatrixLight6.bdf",
"MatrixLight6X.bdf",
"MatrixLight8X.bdf",
"PressStart2P-Regular.ttf",
"README",
"README.md",
"texgyre-27.bdf",
"tom-thumb.bdf",
})
# Relative paths inside the project that the backup knows how to round-trip.
_CONFIG_REL = Path("config/config.json")
_SECRETS_REL = Path("config/config_secrets.json")
_WIFI_REL = Path("config/wifi_config.json")
_FONTS_REL = Path("assets/fonts")
_PLUGIN_UPLOADS_REL = Path("assets/plugins")
_STATE_REL = Path("data/plugin_state.json")
MANIFEST_NAME = "manifest.json"
PLUGINS_MANIFEST_NAME = "plugins.json"
# Hard cap on the size of a single file we'll accept inside an uploaded ZIP
# to limit zip-bomb risk. 50 MB matches the existing plugin-image upload cap.
_MAX_MEMBER_BYTES = 50 * 1024 * 1024
# Hard cap on the total uncompressed size of an uploaded ZIP.
_MAX_TOTAL_BYTES = 200 * 1024 * 1024
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class RestoreOptions:
"""Which sections of a backup should be restored."""
restore_config: bool = True
restore_secrets: bool = True
restore_wifi: bool = True
restore_fonts: bool = True
restore_plugin_uploads: bool = True
reinstall_plugins: bool = True
@dataclass
class RestoreResult:
"""Outcome of a restore operation."""
success: bool = False
restored: List[str] = field(default_factory=list)
skipped: List[str] = field(default_factory=list)
plugins_to_install: List[Dict[str, Any]] = field(default_factory=list)
plugins_installed: List[str] = field(default_factory=list)
plugins_failed: List[Dict[str, str]] = field(default_factory=list)
errors: List[str] = field(default_factory=list)
manifest: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
# ---------------------------------------------------------------------------
# Manifest helpers
# ---------------------------------------------------------------------------
def _ledmatrix_version(project_root: Path) -> str:
"""Best-effort version string for the current install."""
version_file = project_root / "VERSION"
if version_file.exists():
try:
return version_file.read_text(encoding="utf-8").strip() or "unknown"
except OSError:
pass
head_file = project_root / ".git" / "HEAD"
if head_file.exists():
try:
head = head_file.read_text(encoding="utf-8").strip()
if head.startswith("ref: "):
ref = head[5:]
ref_path = project_root / ".git" / ref
if ref_path.exists():
return ref_path.read_text(encoding="utf-8").strip()[:12] or "unknown"
return head[:12] or "unknown"
except OSError:
pass
return "unknown"
def _build_manifest(contents: List[str], project_root: Path) -> Dict[str, Any]:
return {
"schema_version": SCHEMA_VERSION,
"created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
"ledmatrix_version": _ledmatrix_version(project_root),
"hostname": socket.gethostname(),
"contents": contents,
}
# ---------------------------------------------------------------------------
# Installed-plugin enumeration
# ---------------------------------------------------------------------------
def list_installed_plugins(project_root: Path) -> List[Dict[str, Any]]:
"""
Return a list of currently-installed plugins suitable for the backup
manifest. Each entry has ``plugin_id`` and ``version``.
Reads ``data/plugin_state.json`` if present; otherwise walks the plugin
directory and reads each ``manifest.json``.
"""
plugins: Dict[str, Dict[str, Any]] = {}
state_file = project_root / _STATE_REL
if state_file.exists():
try:
with state_file.open("r", encoding="utf-8") as f:
state = json.load(f)
raw_plugins = state.get("states", {}) if isinstance(state, dict) else {}
if isinstance(raw_plugins, dict):
for plugin_id, info in raw_plugins.items():
if not isinstance(info, dict):
continue
plugins[plugin_id] = {
"plugin_id": plugin_id,
"version": info.get("version") or "",
"enabled": bool(info.get("enabled", True)),
}
except (OSError, json.JSONDecodeError) as e:
logger.warning("Could not read plugin_state.json: %s", e)
# Fall back to scanning plugin-repos/ for manifests.
plugins_root = project_root / "plugin-repos"
if plugins_root.exists():
for entry in sorted(plugins_root.iterdir()):
if not entry.is_dir():
continue
manifest = entry / "manifest.json"
if not manifest.exists():
continue
try:
with manifest.open("r", encoding="utf-8") as f:
data = json.load(f)
except (OSError, json.JSONDecodeError):
continue
plugin_id = data.get("id") or entry.name
if plugin_id not in plugins:
plugins[plugin_id] = {
"plugin_id": plugin_id,
"version": data.get("version", ""),
"enabled": True,
}
return sorted(plugins.values(), key=lambda p: p["plugin_id"])
# ---------------------------------------------------------------------------
# Font filtering
# ---------------------------------------------------------------------------
def iter_user_fonts(project_root: Path) -> List[Path]:
"""Return absolute paths to user-uploaded fonts (anything in
``assets/fonts/`` not listed in :data:`BUNDLED_FONTS`)."""
fonts_dir = project_root / _FONTS_REL
if not fonts_dir.exists():
return []
user_fonts: List[Path] = []
for entry in sorted(fonts_dir.iterdir()):
if entry.is_file() and entry.name not in BUNDLED_FONTS:
user_fonts.append(entry)
return user_fonts
def iter_plugin_uploads(project_root: Path) -> List[Path]:
"""Return every file under ``assets/plugins/*/uploads/`` (recursive)."""
plugin_root = project_root / _PLUGIN_UPLOADS_REL
if not plugin_root.exists():
return []
out: List[Path] = []
for plugin_dir in sorted(plugin_root.iterdir()):
if not plugin_dir.is_dir():
continue
uploads = plugin_dir / "uploads"
if not uploads.exists():
continue
for root, _dirs, files in os.walk(uploads):
for name in sorted(files):
out.append(Path(root) / name)
return out
# ---------------------------------------------------------------------------
# Export
# ---------------------------------------------------------------------------
def create_backup(
project_root: Path,
output_dir: Optional[Path] = None,
) -> Path:
"""
Build a backup ZIP and write it into ``output_dir`` (defaults to
``<project_root>/config/backups/exports/``). Returns the path to the
created file.
"""
project_root = Path(project_root).resolve()
if output_dir is None:
output_dir = project_root / "config" / "backups" / "exports"
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
hostname = socket.gethostname() or "ledmatrix"
safe_host = "".join(c for c in hostname if c.isalnum() or c in "-_") or "ledmatrix"
zip_name = f"ledmatrix-backup-{safe_host}-{timestamp}.zip"
zip_path = output_dir / zip_name
contents: List[str] = []
# Stream directly to a temp file so we never hold the whole ZIP in memory.
tmp_path = zip_path.with_suffix(".zip.tmp")
try:
with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
# Config files.
if (project_root / _CONFIG_REL).exists():
zf.write(project_root / _CONFIG_REL, _CONFIG_REL.as_posix())
contents.append("config")
if (project_root / _SECRETS_REL).exists():
zf.write(project_root / _SECRETS_REL, _SECRETS_REL.as_posix())
contents.append("secrets")
if (project_root / _WIFI_REL).exists():
zf.write(project_root / _WIFI_REL, _WIFI_REL.as_posix())
contents.append("wifi")
# User-uploaded fonts.
user_fonts = iter_user_fonts(project_root)
if user_fonts:
for font in user_fonts:
arcname = font.relative_to(project_root).as_posix()
zf.write(font, arcname)
contents.append("fonts")
# Plugin uploads.
plugin_uploads = iter_plugin_uploads(project_root)
if plugin_uploads:
for upload in plugin_uploads:
arcname = upload.relative_to(project_root).as_posix()
zf.write(upload, arcname)
contents.append("plugin_uploads")
# Installed plugins manifest.
plugins = list_installed_plugins(project_root)
if plugins:
zf.writestr(
PLUGINS_MANIFEST_NAME,
json.dumps(plugins, indent=2),
)
contents.append("plugins")
# Manifest goes last so that `contents` reflects what we actually wrote.
manifest = _build_manifest(contents, project_root)
zf.writestr(MANIFEST_NAME, json.dumps(manifest, indent=2))
os.replace(tmp_path, zip_path)
except Exception:
tmp_path.unlink(missing_ok=True)
raise
logger.info("Created backup %s (%d bytes)", zip_path, zip_path.stat().st_size)
return zip_path
def preview_backup_contents(project_root: Path) -> Dict[str, Any]:
"""Return a summary of what ``create_backup`` would include."""
project_root = Path(project_root).resolve()
return {
"has_config": (project_root / _CONFIG_REL).exists(),
"has_secrets": (project_root / _SECRETS_REL).exists(),
"has_wifi": (project_root / _WIFI_REL).exists(),
"user_fonts": [p.name for p in iter_user_fonts(project_root)],
"plugin_uploads": len(iter_plugin_uploads(project_root)),
"plugins": list_installed_plugins(project_root),
}
# ---------------------------------------------------------------------------
# Validate
# ---------------------------------------------------------------------------
def _safe_extract_path(base_dir: Path, member_name: str) -> Optional[Path]:
"""Resolve a ZIP member name against ``base_dir`` and reject anything
that escapes it. Returns the resolved absolute path, or ``None`` if the
name is unsafe."""
# Reject absolute paths and Windows-style drives outright.
if member_name.startswith(("/", "\\")) or (len(member_name) >= 2 and member_name[1] == ":"):
return None
target = (base_dir / member_name).resolve()
try:
target.relative_to(base_dir.resolve())
except ValueError:
return None
return target
def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
"""
Inspect a backup ZIP without extracting to disk.
Returns ``(ok, error_message, manifest_dict)``. ``manifest_dict`` contains
the parsed manifest plus diagnostic fields:
- ``detected_contents``: list of section names present in the archive
- ``plugins``: parsed plugins.json if present
- ``total_uncompressed``: sum of uncompressed sizes
"""
zip_path = Path(zip_path)
if not zip_path.exists():
return False, f"Backup file not found: {zip_path}", {}
try:
with zipfile.ZipFile(zip_path, "r") as zf:
names = zf.namelist()
if MANIFEST_NAME not in names:
return False, "Backup is missing manifest.json", {}
total = 0
with tempfile.TemporaryDirectory() as _sandbox:
sandbox = Path(_sandbox)
for info in zf.infolist():
if info.file_size > _MAX_MEMBER_BYTES:
return False, f"Member {info.filename} is too large", {}
total += info.file_size
if total > _MAX_TOTAL_BYTES:
return False, "Backup exceeds maximum allowed size", {}
# Safety: reject members with unsafe paths up front.
if _safe_extract_path(sandbox, info.filename) is None:
return False, f"Unsafe path in backup: {info.filename}", {}
try:
manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8")
manifest = json.loads(manifest_raw)
except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
return False, f"Invalid manifest.json: {e}", {}
if not isinstance(manifest, dict) or "schema_version" not in manifest:
return False, "Invalid manifest structure", {}
if manifest.get("schema_version") != SCHEMA_VERSION:
return (
False,
f"Unsupported backup schema version: {manifest.get('schema_version')}",
{},
)
detected: List[str] = []
if _CONFIG_REL.as_posix() in names:
detected.append("config")
if _SECRETS_REL.as_posix() in names:
detected.append("secrets")
if _WIFI_REL.as_posix() in names:
detected.append("wifi")
if any(n.startswith(_FONTS_REL.as_posix() + "/") for n in names):
detected.append("fonts")
if any(
n.startswith(_PLUGIN_UPLOADS_REL.as_posix() + "/") and "/uploads/" in n
for n in names
):
detected.append("plugin_uploads")
plugins: List[Dict[str, Any]] = []
if PLUGINS_MANIFEST_NAME in names:
try:
plugins = json.loads(zf.read(PLUGINS_MANIFEST_NAME).decode("utf-8"))
if not isinstance(plugins, list):
plugins = []
else:
detected.append("plugins")
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
plugins = []
result_manifest = dict(manifest)
result_manifest["detected_contents"] = detected
result_manifest["plugins"] = plugins
result_manifest["total_uncompressed"] = total
result_manifest["file_count"] = len(names)
return True, "", result_manifest
except zipfile.BadZipFile:
return False, "File is not a valid ZIP archive", {}
except OSError as e:
return False, f"Could not read backup: {e}", {}
# ---------------------------------------------------------------------------
# Restore
# ---------------------------------------------------------------------------
def _extract_zip_safe(zip_path: Path, dest_dir: Path) -> None:
"""Extract ``zip_path`` into ``dest_dir`` rejecting any unsafe members."""
with zipfile.ZipFile(zip_path, "r") as zf:
for info in zf.infolist():
target = _safe_extract_path(dest_dir, info.filename)
if target is None:
raise ValueError(f"Unsafe path in backup: {info.filename}")
if info.is_dir():
target.mkdir(parents=True, exist_ok=True)
continue
target.parent.mkdir(parents=True, exist_ok=True)
with zf.open(info, "r") as src, open(target, "wb") as dst:
shutil.copyfileobj(src, dst, length=64 * 1024)
def _copy_file(src: Path, dst: Path) -> None:
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
def restore_backup(
zip_path: Path,
project_root: Path,
options: Optional[RestoreOptions] = None,
) -> RestoreResult:
"""
Restore ``zip_path`` into ``project_root`` according to ``options``.
Plugin reinstalls are NOT performed here — the caller is responsible for
walking ``result.plugins_to_install`` and calling the store manager. This
keeps this module Flask-free and side-effect free beyond the filesystem.
"""
if options is None:
options = RestoreOptions()
project_root = Path(project_root).resolve()
result = RestoreResult()
ok, err, manifest = validate_backup(zip_path)
if not ok:
result.errors.append(err)
return result
result.manifest = manifest
with tempfile.TemporaryDirectory(prefix="ledmatrix_restore_") as tmp:
tmp_dir = Path(tmp)
try:
_extract_zip_safe(Path(zip_path), tmp_dir)
except (ValueError, zipfile.BadZipFile, OSError) as e:
result.errors.append(f"Failed to extract backup: {e}")
return result
# Main config.
if options.restore_config and (tmp_dir / _CONFIG_REL).exists():
try:
_copy_file(tmp_dir / _CONFIG_REL, project_root / _CONFIG_REL)
result.restored.append("config")
except OSError as e:
result.errors.append(f"Failed to restore config.json: {e}")
elif (tmp_dir / _CONFIG_REL).exists():
result.skipped.append("config")
# Secrets.
if options.restore_secrets and (tmp_dir / _SECRETS_REL).exists():
try:
_copy_file(tmp_dir / _SECRETS_REL, project_root / _SECRETS_REL)
result.restored.append("secrets")
except OSError as e:
result.errors.append(f"Failed to restore config_secrets.json: {e}")
elif (tmp_dir / _SECRETS_REL).exists():
result.skipped.append("secrets")
# WiFi.
if options.restore_wifi and (tmp_dir / _WIFI_REL).exists():
try:
_copy_file(tmp_dir / _WIFI_REL, project_root / _WIFI_REL)
result.restored.append("wifi")
except OSError as e:
result.errors.append(f"Failed to restore wifi_config.json: {e}")
elif (tmp_dir / _WIFI_REL).exists():
result.skipped.append("wifi")
# User fonts — skip anything that collides with a bundled font.
tmp_fonts = tmp_dir / _FONTS_REL
if options.restore_fonts and tmp_fonts.exists():
restored_count = 0
for font in sorted(tmp_fonts.iterdir()):
if not font.is_file():
continue
if font.name in BUNDLED_FONTS:
result.skipped.append(f"font:{font.name} (bundled)")
continue
try:
_copy_file(font, project_root / _FONTS_REL / font.name)
restored_count += 1
except OSError as e:
result.errors.append(f"Failed to restore font {font.name}: {e}")
if restored_count:
result.restored.append(f"fonts ({restored_count})")
elif tmp_fonts.exists():
result.skipped.append("fonts")
# Plugin uploads.
tmp_uploads = tmp_dir / _PLUGIN_UPLOADS_REL
if options.restore_plugin_uploads and tmp_uploads.exists():
count = 0
for root, _dirs, files in os.walk(tmp_uploads):
for name in files:
src = Path(root) / name
rel = src.relative_to(tmp_dir)
if "/uploads/" not in rel.as_posix():
result.errors.append(f"Rejected unexpected plugin path: {rel}")
continue
try:
_copy_file(src, project_root / rel)
count += 1
except OSError as e:
result.errors.append(f"Failed to restore {rel}: {e}")
if count:
result.restored.append(f"plugin_uploads ({count})")
elif tmp_uploads.exists():
result.skipped.append("plugin_uploads")
# Plugins list (for caller to reinstall).
if options.reinstall_plugins and (tmp_dir / PLUGINS_MANIFEST_NAME).exists():
try:
with (tmp_dir / PLUGINS_MANIFEST_NAME).open("r", encoding="utf-8") as f:
plugins = json.load(f)
if isinstance(plugins, list):
result.plugins_to_install = [
{"plugin_id": p.get("plugin_id"), "version": p.get("version", "")}
for p in plugins
if isinstance(p, dict) and p.get("plugin_id")
]
except (OSError, json.JSONDecodeError) as e:
result.errors.append(f"Could not read plugins.json: {e}")
result.success = not result.errors
return result

View File

@@ -1,284 +0,0 @@
"""Tests for src.backup_manager."""
from __future__ import annotations
import json
import zipfile
from pathlib import Path
import pytest
from src import backup_manager
from src.backup_manager import (
BUNDLED_FONTS,
SCHEMA_VERSION,
RestoreOptions,
create_backup,
list_installed_plugins,
preview_backup_contents,
restore_backup,
validate_backup,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_project(root: Path) -> Path:
"""Build a minimal fake project tree under ``root``."""
(root / "config").mkdir(parents=True)
(root / "config" / "config.json").write_text(
json.dumps({"web_ui": {"port": 8080}, "my-plugin": {"enabled": True, "favorites": ["A", "B"]}}),
encoding="utf-8",
)
(root / "config" / "config_secrets.json").write_text(
json.dumps({"ledmatrix-weather": {"api_key": "SECRET"}}),
encoding="utf-8",
)
(root / "config" / "wifi_config.json").write_text(
json.dumps({"ap_mode": {"ssid": "LEDMatrix"}}),
encoding="utf-8",
)
fonts = root / "assets" / "fonts"
fonts.mkdir(parents=True)
# One bundled font (should be excluded) and one user-uploaded font.
(fonts / "5x7.bdf").write_text("BUNDLED", encoding="utf-8")
(fonts / "my-custom-font.ttf").write_bytes(b"\x00\x01USER")
uploads = root / "assets" / "plugins" / "static-image" / "uploads"
uploads.mkdir(parents=True)
(uploads / "image_1.png").write_bytes(b"\x89PNG\r\n\x1a\nfake")
(uploads / ".metadata.json").write_text(json.dumps({"a": 1}), encoding="utf-8")
# plugin-repos for installed-plugin enumeration.
plugin_dir = root / "plugin-repos" / "my-plugin"
plugin_dir.mkdir(parents=True)
(plugin_dir / "manifest.json").write_text(
json.dumps({"id": "my-plugin", "version": "1.2.3"}),
encoding="utf-8",
)
# plugin_state.json
(root / "data").mkdir()
(root / "data" / "plugin_state.json").write_text(
json.dumps(
{
"version": 1,
"states": {
"my-plugin": {"version": "1.2.3", "enabled": True},
"other-plugin": {"version": "0.1.0", "enabled": False},
},
}
),
encoding="utf-8",
)
return root
@pytest.fixture
def project(tmp_path: Path) -> Path:
return _make_project(tmp_path / "src_project")
@pytest.fixture
def empty_project(tmp_path: Path) -> Path:
root = tmp_path / "dst_project"
root.mkdir()
# Pre-seed only the bundled font to simulate a fresh install.
(root / "assets" / "fonts").mkdir(parents=True)
(root / "assets" / "fonts" / "5x7.bdf").write_text("BUNDLED", encoding="utf-8")
return root
# ---------------------------------------------------------------------------
# BUNDLED_FONTS sanity
# ---------------------------------------------------------------------------
def test_bundled_fonts_matches_repo() -> None:
"""Every entry in BUNDLED_FONTS must exist on disk in assets/fonts/.
The reverse direction is intentionally not checked: real installations
have user-uploaded fonts in the same directory, and they should be
treated as user data (not bundled).
"""
repo_fonts = Path(__file__).resolve().parent.parent / "assets" / "fonts"
if not repo_fonts.exists():
pytest.skip("assets/fonts not present in test env")
on_disk = {p.name for p in repo_fonts.iterdir() if p.is_file()}
missing = set(BUNDLED_FONTS) - on_disk
assert not missing, f"BUNDLED_FONTS references files not in assets/fonts/: {missing}"
# ---------------------------------------------------------------------------
# Preview / enumeration
# ---------------------------------------------------------------------------
def test_list_installed_plugins(project: Path) -> None:
plugins = list_installed_plugins(project)
ids = [p["plugin_id"] for p in plugins]
assert "my-plugin" in ids
assert "other-plugin" in ids
my = next(p for p in plugins if p["plugin_id"] == "my-plugin")
assert my["version"] == "1.2.3"
def test_preview_backup_contents(project: Path) -> None:
preview = preview_backup_contents(project)
assert preview["has_config"] is True
assert preview["has_secrets"] is True
assert preview["has_wifi"] is True
assert preview["user_fonts"] == ["my-custom-font.ttf"]
assert preview["plugin_uploads"] >= 2
assert any(p["plugin_id"] == "my-plugin" for p in preview["plugins"])
# ---------------------------------------------------------------------------
# Export
# ---------------------------------------------------------------------------
def test_create_backup_contents(project: Path, tmp_path: Path) -> None:
out_dir = tmp_path / "exports"
zip_path = create_backup(project, output_dir=out_dir)
assert zip_path.exists()
assert zip_path.parent == out_dir
with zipfile.ZipFile(zip_path) as zf:
names = set(zf.namelist())
assert "manifest.json" in names
assert "config/config.json" in names
assert "config/config_secrets.json" in names
assert "config/wifi_config.json" in names
assert "assets/fonts/my-custom-font.ttf" in names
# Bundled font must NOT be included.
assert "assets/fonts/5x7.bdf" not in names
assert "assets/plugins/static-image/uploads/image_1.png" in names
assert "plugins.json" in names
def test_create_backup_manifest(project: Path, tmp_path: Path) -> None:
zip_path = create_backup(project, output_dir=tmp_path / "exports")
with zipfile.ZipFile(zip_path) as zf:
manifest = json.loads(zf.read("manifest.json"))
assert manifest["schema_version"] == backup_manager.SCHEMA_VERSION
assert "created_at" in manifest
assert set(manifest["contents"]) >= {"config", "secrets", "wifi", "fonts", "plugin_uploads", "plugins"}
# ---------------------------------------------------------------------------
# Validate
# ---------------------------------------------------------------------------
def test_validate_backup_ok(project: Path, tmp_path: Path) -> None:
zip_path = create_backup(project, output_dir=tmp_path / "exports")
ok, err, manifest = validate_backup(zip_path)
assert ok, err
assert err == ""
assert "config" in manifest["detected_contents"]
assert "secrets" in manifest["detected_contents"]
assert any(p["plugin_id"] == "my-plugin" for p in manifest["plugins"])
def test_validate_backup_missing_manifest(tmp_path: Path) -> None:
zip_path = tmp_path / "bad.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("config/config.json", "{}")
ok, err, _ = validate_backup(zip_path)
assert not ok
assert "manifest" in err.lower()
def test_validate_backup_bad_schema_version(tmp_path: Path) -> None:
zip_path = tmp_path / "bad.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("manifest.json", json.dumps({"schema_version": 999}))
ok, err, _ = validate_backup(zip_path)
assert not ok
assert "schema" in err.lower()
def test_validate_backup_rejects_zip_traversal(tmp_path: Path) -> None:
zip_path = tmp_path / "malicious.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("manifest.json", json.dumps({"schema_version": SCHEMA_VERSION, "contents": []}))
zf.writestr("../../etc/passwd", "x")
ok, err, _ = validate_backup(zip_path)
assert not ok
assert "unsafe" in err.lower()
def test_validate_backup_not_a_zip(tmp_path: Path) -> None:
p = tmp_path / "nope.zip"
p.write_text("hello", encoding="utf-8")
ok, _err, _ = validate_backup(p)
assert not ok
# ---------------------------------------------------------------------------
# Restore
# ---------------------------------------------------------------------------
def test_restore_roundtrip(project: Path, empty_project: Path, tmp_path: Path) -> None:
zip_path = create_backup(project, output_dir=tmp_path / "exports")
result = restore_backup(zip_path, empty_project, RestoreOptions())
assert result.success, result.errors
assert "config" in result.restored
assert "secrets" in result.restored
assert "wifi" in result.restored
# Files exist with correct contents.
restored_config = json.loads((empty_project / "config" / "config.json").read_text())
assert restored_config["my-plugin"]["favorites"] == ["A", "B"]
restored_secrets = json.loads((empty_project / "config" / "config_secrets.json").read_text())
assert restored_secrets["ledmatrix-weather"]["api_key"] == "SECRET"
# User font restored, bundled font untouched.
assert (empty_project / "assets" / "fonts" / "my-custom-font.ttf").read_bytes() == b"\x00\x01USER"
assert (empty_project / "assets" / "fonts" / "5x7.bdf").read_text() == "BUNDLED"
# Plugin uploads restored.
assert (empty_project / "assets" / "plugins" / "static-image" / "uploads" / "image_1.png").exists()
# Plugins to install surfaced for the caller.
plugin_ids = {p["plugin_id"] for p in result.plugins_to_install}
assert "my-plugin" in plugin_ids
def test_restore_honors_options(project: Path, empty_project: Path, tmp_path: Path) -> None:
zip_path = create_backup(project, output_dir=tmp_path / "exports")
opts = RestoreOptions(
restore_config=True,
restore_secrets=False,
restore_wifi=False,
restore_fonts=False,
restore_plugin_uploads=False,
reinstall_plugins=False,
)
result = restore_backup(zip_path, empty_project, opts)
assert result.success, result.errors
assert (empty_project / "config" / "config.json").exists()
assert not (empty_project / "config" / "config_secrets.json").exists()
assert not (empty_project / "config" / "wifi_config.json").exists()
assert not (empty_project / "assets" / "fonts" / "my-custom-font.ttf").exists()
assert result.plugins_to_install == []
assert "secrets" in result.skipped
assert "wifi" in result.skipped
def test_restore_rejects_malicious_zip(empty_project: Path, tmp_path: Path) -> None:
zip_path = tmp_path / "bad.zip"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("manifest.json", json.dumps({"schema_version": SCHEMA_VERSION, "contents": []}))
zf.writestr("../escape.txt", "x")
result = restore_backup(zip_path, empty_project, RestoreOptions())
# validate_backup catches it before extraction.
assert not result.success
assert any("unsafe" in e.lower() for e in result.errors)

View File

@@ -2,14 +2,13 @@ from flask import Blueprint, request, jsonify, Response, send_from_directory
import json
import os
import re
import socket
import sys
import subprocess
import time
import hashlib
import uuid
import logging
from datetime import datetime, timezone
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple, Dict, Any, Type
@@ -1106,290 +1105,6 @@ def save_raw_secrets_config():
status_code=500
)
# ---------------------------------------------------------------------------
# Backup & Restore
# ---------------------------------------------------------------------------
_BACKUP_FILENAME_RE = re.compile(r'^ledmatrix-backup-[A-Za-z0-9_-]+-\d{8}_\d{6}\.zip$')
def _backup_exports_dir() -> Path:
"""Directory where user-downloadable backup ZIPs are stored."""
d = PROJECT_ROOT / 'config' / 'backups' / 'exports'
d.mkdir(parents=True, exist_ok=True)
return d
def _is_safe_backup_filename(name: str) -> bool:
"""Allow-list filter for backup filenames used in download/delete."""
return bool(_BACKUP_FILENAME_RE.match(name))
@api_v3.route('/backup/preview', methods=['GET'])
def backup_preview():
"""Return a summary of what a new backup would include."""
try:
from src import backup_manager
preview = backup_manager.preview_backup_contents(PROJECT_ROOT)
return jsonify({'status': 'success', 'data': preview})
except Exception:
logger.exception("[Backup] preview failed")
return jsonify({'status': 'error', 'message': 'Failed to compute backup preview'}), 500
@api_v3.route('/backup/export', methods=['POST'])
def backup_export():
"""Create a new backup ZIP and return its filename."""
try:
from src import backup_manager
zip_path = backup_manager.create_backup(PROJECT_ROOT, output_dir=_backup_exports_dir())
return jsonify({
'status': 'success',
'filename': zip_path.name,
'size': zip_path.stat().st_size,
'created_at': datetime.now(timezone.utc).isoformat(),
})
except Exception:
logger.exception("[Backup] export failed")
return jsonify({'status': 'error', 'message': 'Failed to create backup'}), 500
@api_v3.route('/backup/list', methods=['GET'])
def backup_list():
"""List backup ZIPs currently stored on disk."""
try:
exports = _backup_exports_dir()
entries = []
for path in sorted(exports.glob('ledmatrix-backup-*.zip'), reverse=True):
if not _is_safe_backup_filename(path.name):
continue
stat = path.stat()
entries.append({
'filename': path.name,
'size': stat.st_size,
'created_at': datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat(),
})
return jsonify({'status': 'success', 'data': entries})
except Exception:
logger.exception("[Backup] list failed")
return jsonify({'status': 'error', 'message': 'Failed to list backups'}), 500
@api_v3.route('/backup/download/<path:filename>', methods=['GET'])
def backup_download(filename):
"""Stream a previously-created backup ZIP to the browser."""
try:
if not _is_safe_backup_filename(filename):
return jsonify({'status': 'error', 'message': 'Invalid backup filename'}), 400
exports = _backup_exports_dir()
target = exports / filename
if not target.exists():
return jsonify({'status': 'error', 'message': 'Backup not found'}), 404
return send_from_directory(
str(exports),
filename,
as_attachment=True,
mimetype='application/zip',
)
except Exception:
logger.exception("[Backup] download failed")
return jsonify({'status': 'error', 'message': 'Failed to download backup'}), 500
@api_v3.route('/backup/<path:filename>', methods=['DELETE'])
def backup_delete(filename):
"""Delete a stored backup ZIP."""
try:
if not _is_safe_backup_filename(filename):
return jsonify({'status': 'error', 'message': 'Invalid backup filename'}), 400
target = _backup_exports_dir() / filename
if not target.exists():
return jsonify({'status': 'error', 'message': 'Backup not found'}), 404
target.unlink()
return jsonify({'status': 'success', 'message': f'Deleted {filename}'})
except Exception:
logger.exception("[Backup] delete failed")
return jsonify({'status': 'error', 'message': 'Failed to delete backup'}), 500
def _save_uploaded_backup_to_temp() -> Tuple[Optional[Path], Optional[Tuple[Response, int]]]:
"""Shared upload handler for validate/restore endpoints. Returns
``(temp_path, None)`` on success or ``(None, error_response)`` on failure.
The caller is responsible for deleting the returned temp file."""
import tempfile as _tempfile
if 'backup_file' not in request.files:
return None, (jsonify({'status': 'error', 'message': 'No backup file provided'}), 400)
upload = request.files['backup_file']
if not upload.filename:
return None, (jsonify({'status': 'error', 'message': 'No file selected'}), 400)
is_valid, err = validate_file_upload(
upload.filename,
max_size_mb=200,
allowed_extensions=['.zip'],
)
if not is_valid:
return None, (jsonify({'status': 'error', 'message': err}), 400)
fd, tmp_name = _tempfile.mkstemp(prefix='ledmatrix_upload_', suffix='.zip')
os.close(fd)
tmp_path = Path(tmp_name)
max_bytes = 200 * 1024 * 1024
try:
written = 0
with open(tmp_path, 'wb') as fh:
while True:
chunk = upload.stream.read(65536)
if not chunk:
break
written += len(chunk)
if written > max_bytes:
fh.close()
tmp_path.unlink(missing_ok=True)
return None, (jsonify({'status': 'error', 'message': 'Backup file exceeds 200 MB limit'}), 413)
fh.write(chunk)
except Exception:
tmp_path.unlink(missing_ok=True)
logger.exception("[Backup] Failed to save uploaded backup")
return None, (jsonify({'status': 'error', 'message': 'Failed to read uploaded file'}), 500)
return tmp_path, None
@api_v3.route('/backup/validate', methods=['POST'])
def backup_validate():
"""Inspect an uploaded backup without applying it."""
tmp_path, err = _save_uploaded_backup_to_temp()
if err is not None:
return err
try:
from src import backup_manager
ok, error, manifest = backup_manager.validate_backup(tmp_path)
if not ok:
return jsonify({'status': 'error', 'message': error}), 400
return jsonify({'status': 'success', 'data': manifest})
except Exception:
logger.exception("[Backup] validate failed")
return jsonify({'status': 'error', 'message': 'Failed to validate backup'}), 500
finally:
try:
tmp_path.unlink(missing_ok=True)
except OSError:
pass
@api_v3.route('/backup/restore', methods=['POST'])
def backup_restore():
"""
Restore an uploaded backup into the running installation.
The request is multipart/form-data with:
- ``backup_file``: the ZIP upload
- ``options``: JSON string with RestoreOptions fields (all boolean)
"""
tmp_path, err = _save_uploaded_backup_to_temp()
if err is not None:
return err
try:
from src import backup_manager
# Parse options (all optional; default is "restore everything").
raw_opts = request.form.get('options') or '{}'
try:
opts_dict = json.loads(raw_opts)
except json.JSONDecodeError:
return jsonify({'status': 'error', 'message': 'Invalid options JSON'}), 400
if not isinstance(opts_dict, dict):
return jsonify({'status': 'error', 'message': 'options must be an object'}), 400
opts = backup_manager.RestoreOptions(
restore_config=_coerce_to_bool(opts_dict.get('restore_config', True)),
restore_secrets=_coerce_to_bool(opts_dict.get('restore_secrets', True)),
restore_wifi=_coerce_to_bool(opts_dict.get('restore_wifi', True)),
restore_fonts=_coerce_to_bool(opts_dict.get('restore_fonts', True)),
restore_plugin_uploads=_coerce_to_bool(opts_dict.get('restore_plugin_uploads', True)),
reinstall_plugins=_coerce_to_bool(opts_dict.get('reinstall_plugins', True)),
)
# Snapshot current config through the atomic manager so the pre-restore
# state is recoverable via the existing rollback_config() path.
if api_v3.config_manager and opts.restore_config:
try:
current = api_v3.config_manager.load_config()
snapshot_ok, snapshot_err = _save_config_atomic(api_v3.config_manager, current, create_backup=True)
if not snapshot_ok:
logger.warning("[Backup] Pre-restore snapshot failed: %s (continuing)", snapshot_err)
except Exception:
logger.warning("[Backup] Pre-restore snapshot failed (continuing)", exc_info=True)
result = backup_manager.restore_backup(tmp_path, PROJECT_ROOT, opts)
# Reinstall plugins via the store manager, one at a time.
if opts.reinstall_plugins and api_v3.plugin_store_manager and result.plugins_to_install:
installed_names = set()
if api_v3.plugin_manager:
try:
existing = api_v3.plugin_manager.get_all_plugin_info() or []
installed_names = {p.get('id') for p in existing if p.get('id')}
except Exception:
installed_names = set()
for entry in result.plugins_to_install:
plugin_id = entry.get('plugin_id')
if not plugin_id:
continue
if plugin_id in installed_names:
result.plugins_installed.append(plugin_id)
continue
try:
ok = api_v3.plugin_store_manager.install_plugin(plugin_id)
if ok:
if api_v3.schema_manager:
api_v3.schema_manager.invalidate_cache(plugin_id)
if api_v3.plugin_manager:
api_v3.plugin_manager.discover_plugins()
api_v3.plugin_manager.load_plugin(plugin_id)
if api_v3.plugin_state_manager:
api_v3.plugin_state_manager.set_plugin_installed(plugin_id)
result.plugins_installed.append(plugin_id)
else:
result.plugins_failed.append({'plugin_id': plugin_id, 'error': 'install returned False'})
except Exception as install_err:
logger.exception("[Backup] plugin reinstall failed for %s", plugin_id)
result.plugins_failed.append({'plugin_id': plugin_id, 'error': str(install_err)})
# Clear font catalog cache so restored fonts show up.
if any(r.startswith("fonts") for r in result.restored):
try:
from web_interface.cache import delete_cached
delete_cached('fonts_catalog')
except Exception:
logger.warning("[Backup] Failed to clear font cache", exc_info=True)
# Reload config_manager state so the UI picks up the new values
# without a full service restart.
if api_v3.config_manager and opts.restore_config:
try:
api_v3.config_manager.load_config(force_reload=True)
except TypeError:
try:
api_v3.config_manager.load_config()
except Exception:
logger.warning("[Backup] Could not reload config after restore", exc_info=True)
except Exception:
logger.warning("[Backup] Could not reload config after restore", exc_info=True)
return jsonify({
'status': 'success' if result.success else 'partial',
'data': result.to_dict(),
})
except Exception:
logger.exception("[Backup] restore failed")
return jsonify({'status': 'error', 'message': 'Failed to restore backup'}), 500
finally:
try:
tmp_path.unlink(missing_ok=True)
except OSError:
pass
@api_v3.route('/system/status', methods=['GET'])
def get_system_status():
"""Get system status"""
@@ -6650,146 +6365,18 @@ def upload_calendar_credentials():
logger.exception("[PluginConfig] upload_calendar_credentials failed")
return jsonify({'status': 'error', 'message': 'Failed to upload calendar credentials'}), 500
def _load_calendar_plugin_dir():
"""Resolve the calendar plugin's on-disk directory without requiring a running instance.
The web service and display service are separate processes — the web
process discovers plugins but does not instantiate them, so
plugin_manager.get_plugin('calendar') is typically None here.
"""
plugin_id = 'calendar'
if api_v3.plugin_manager:
plugin_dir = api_v3.plugin_manager.get_plugin_directory(plugin_id)
if plugin_dir and Path(plugin_dir).exists():
return Path(plugin_dir)
fallback = PROJECT_ROOT / 'plugins' / plugin_id
return fallback if fallback.exists() else None
_GOOGLE_API_TIMEOUT_SECONDS = 15
def _load_calendar_credentials(token_path):
"""Load OAuth credentials from the plugin's token file.
The calendar plugin historically persists credentials with pickle
(``token.pickle``). pickle.load is only applied to this specific file,
which is owned by the same user as the web service, chmod 0600, and
located inside the plugin install directory — it is not user-supplied
input. We still constrain the unpickle to a reasonable size to reduce
blast radius. New installs may use a JSON token (``token.json``)
written via google-auth's safe serializer; prefer that when present.
"""
json_path = token_path.with_suffix('.json')
if json_path.exists():
from google.oauth2.credentials import Credentials
return Credentials.from_authorized_user_file(str(json_path))
# Fall back to the pickle token the plugin writes today.
# nosemgrep: python.lang.security.audit.avoid-pickle.avoid-pickle
import pickle # noqa: S403
try:
size = token_path.stat().st_size
except OSError as e:
raise RuntimeError(f'Cannot stat token file: {e}') from e
if size > 64 * 1024:
raise RuntimeError('Token file is unexpectedly large; refusing to load.')
with open(token_path, 'rb') as f:
return pickle.load(f) # noqa: S301 # trusted file, owner-only perms
def _list_google_calendars_from_disk():
"""List calendars using the plugin's stored OAuth token.
Returns (calendars, error_message). calendars is a list of raw Google
calendarList items on success; on failure calendars is None and
error_message describes the problem.
Refreshed credentials are intentionally not persisted back to disk
from this request path — the display service owns token.pickle and
concurrent writes across processes could corrupt it. If refresh is
needed, it happens only in memory for the duration of this request.
"""
try:
import google_auth_httplib2
import httplib2
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
except ImportError:
return None, 'Google API libraries not installed on this host.'
plugin_dir = _load_calendar_plugin_dir()
if plugin_dir is None:
return None, 'Calendar plugin directory not found.'
token_path = plugin_dir / 'token.pickle'
if not token_path.exists() and not (plugin_dir / 'token.json').exists():
return None, 'Not authenticated yet — complete the Google authentication step first.'
try:
creds = _load_calendar_credentials(token_path)
except Exception as e:
logger.exception('list_calendar_calendars: failed to load stored credentials')
return None, f'Failed to load stored authentication: {e}'
if not creds or not getattr(creds, 'valid', False):
if creds and getattr(creds, 'expired', False) and getattr(creds, 'refresh_token', None):
try:
# In-memory refresh only; do not write back to shared token file.
creds.refresh(Request(timeout=_GOOGLE_API_TIMEOUT_SECONDS))
except (socket.timeout, TimeoutError) as e:
logger.warning('list_calendar_calendars: token refresh timed out: %s', e)
return None, 'Token refresh timed out. Please try again.'
except Exception as e:
logger.exception('list_calendar_calendars: token refresh failed')
return None, f'Stored authentication expired and refresh failed: {e}. Re-run the Google authentication step.'
else:
return None, 'Stored authentication is invalid. Re-run the Google authentication step.'
try:
# Build an Http with an explicit socket timeout so API calls cannot
# hang the Flask worker on flaky connectivity.
authed_http = google_auth_httplib2.AuthorizedHttp(
creds, http=httplib2.Http(timeout=_GOOGLE_API_TIMEOUT_SECONDS)
)
service = build('calendar', 'v3', http=authed_http, cache_discovery=False)
items = []
page_token = None
while True:
response = service.calendarList().list(pageToken=page_token).execute(
num_retries=1
)
items.extend(response.get('items', []))
page_token = response.get('nextPageToken')
if not page_token:
break
return items, None
except (socket.timeout, TimeoutError) as e:
logger.warning('list_calendar_calendars: Google API call timed out: %s', e)
return None, 'Google Calendar request timed out. Please try again.'
except Exception as e:
logger.exception('list_calendar_calendars: Google API call failed')
return None, f'Google Calendar API call failed: {e}'
@api_v3.route('/plugins/calendar/list-calendars', methods=['GET'])
def list_calendar_calendars():
"""Return Google Calendars accessible with the currently authenticated credentials.
Reads credentials from the plugin directory directly so this works from the
web process (which does not instantiate plugins).
"""
# Prefer a live plugin instance if one happens to exist (e.g. local dev where
# web and display share a process); otherwise fall back to on-disk credentials.
plugin = api_v3.plugin_manager.get_plugin('calendar') if api_v3.plugin_manager else None
"""Return Google Calendars accessible with the currently authenticated credentials."""
if not api_v3.plugin_manager:
return jsonify({'status': 'error', 'message': 'Plugin manager not available'}), 500
plugin = api_v3.plugin_manager.get_plugin('calendar')
if not plugin:
return jsonify({'status': 'error', 'message': 'Calendar plugin is not running. Enable it and save config first.'}), 404
if not hasattr(plugin, 'get_calendars'):
return jsonify({'status': 'error', 'message': 'Installed plugin version does not support calendar listing — update the plugin.'}), 400
try:
if plugin is not None and hasattr(plugin, 'get_calendars'):
raw = plugin.get_calendars()
else:
raw, err = _list_google_calendars_from_disk()
if raw is None:
return jsonify({'status': 'error', 'message': err}), 400
import collections.abc
if not isinstance(raw, (list, tuple)):
logger.error('list_calendar_calendars: get_calendars() returned non-sequence type %r', type(raw))

View File

@@ -76,8 +76,6 @@ def load_partial(partial_name):
return _load_logs_partial()
elif partial_name == 'raw-json':
return _load_raw_json_partial()
elif partial_name == 'backup-restore':
return _load_backup_restore_partial()
elif partial_name == 'wifi':
return _load_wifi_partial()
elif partial_name == 'cache':
@@ -298,13 +296,6 @@ def _load_raw_json_partial():
except Exception as e:
return f"Error: {str(e)}", 500
def _load_backup_restore_partial():
"""Load backup & restore partial."""
try:
return render_template('v3/partials/backup_restore.html')
except Exception as e:
return f"Error: {str(e)}", 500
@pages_v3.route('/setup')
def captive_setup():
"""Lightweight captive portal setup page — self-contained, no frameworks."""

View File

@@ -221,7 +221,6 @@
}
notifyFn(`Upload error: ${error.message}`, 'error');
} finally {
const fileInput = document.getElementById(`${fieldId}_file_input`);
if (fileInput) fileInput.value = '';
}
};

View File

@@ -937,11 +937,6 @@
class="nav-tab">
<i class="fas fa-file-code"></i>Config Editor
</button>
<button @click="activeTab = 'backup-restore'"
:class="activeTab === 'backup-restore' ? 'nav-tab-active' : ''"
class="nav-tab">
<i class="fas fa-save"></i>Backup &amp; Restore
</button>
<button @click="activeTab = 'fonts'"
:class="activeTab === 'fonts' ? 'nav-tab-active' : ''"
class="nav-tab">
@@ -1144,18 +1139,6 @@
</div>
</div>
<!-- Backup & Restore tab -->
<div x-show="activeTab === 'backup-restore'" x-transition>
<div id="backup-restore-content" hx-get="/v3/partials/backup-restore" hx-trigger="revealed" hx-swap="innerHTML">
<div class="animate-pulse">
<div class="bg-white rounded-lg shadow p-6">
<div class="h-4 bg-gray-200 rounded w-1/4 mb-4"></div>
<div class="h-32 bg-gray-200 rounded"></div>
</div>
</div>
</div>
</div>
<!-- Config Editor tab -->
<div x-show="activeTab === 'config-editor'" x-transition>
<div id="config-editor-content" hx-get="/v3/partials/raw-json" hx-trigger="revealed" hx-swap="innerHTML">

View File

@@ -1,358 +0,0 @@
<div class="space-y-6" id="backup-restore-root">
<!-- Security warning -->
<div class="bg-red-50 border border-red-200 rounded-lg p-4">
<div class="flex">
<div class="flex-shrink-0">
<i class="fas fa-exclamation-triangle text-red-600"></i>
</div>
<div class="ml-3">
<h3 class="text-sm font-medium text-red-800">Backup files contain secrets in plaintext</h3>
<div class="mt-1 text-sm text-red-700">
Your API keys (weather, Spotify, YouTube, GitHub, etc.) and any saved WiFi passwords
are stored inside the backup ZIP as plain text. Treat the file like a password —
store it somewhere private and delete it when you no longer need it.
</div>
</div>
</div>
</div>
<!-- Export card -->
<div class="bg-white rounded-lg shadow p-6">
<div class="border-b border-gray-200 pb-4 mb-6">
<div class="flex items-center justify-between">
<div>
<h2 class="text-lg font-semibold text-gray-900">Export backup</h2>
<p class="mt-1 text-sm text-gray-600">
Download a single ZIP with all of your settings so you can restore it later.
</p>
</div>
<button onclick="exportBackup()" id="export-backup-btn"
class="inline-flex items-center px-4 py-2 border border-transparent text-sm font-medium rounded-md text-white bg-blue-600 hover:bg-blue-700">
<i class="fas fa-download mr-2"></i>
Download backup
</button>
</div>
</div>
<div id="export-preview" class="text-sm text-gray-600">
<div class="animate-pulse">Loading summary…</div>
</div>
</div>
<!-- Restore card -->
<div class="bg-white rounded-lg shadow p-6">
<div class="border-b border-gray-200 pb-4 mb-6">
<h2 class="text-lg font-semibold text-gray-900">Restore from backup</h2>
<p class="mt-1 text-sm text-gray-600">
Upload a backup ZIP exported from this or another LEDMatrix install.
You'll see a summary before anything is written to disk.
</p>
</div>
<div class="space-y-4">
<div>
<label class="block text-sm font-medium text-gray-700 mb-2">Backup file</label>
<input type="file" id="restore-file-input" accept=".zip"
class="block w-full text-sm text-gray-700 file:mr-4 file:py-2 file:px-4 file:rounded-md file:border-0 file:text-sm file:font-medium file:bg-blue-50 file:text-blue-700 hover:file:bg-blue-100">
</div>
<div>
<button onclick="validateRestoreFile()" id="validate-restore-btn"
class="inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-check-circle mr-2"></i>
Inspect file
</button>
</div>
<div id="restore-preview" class="hidden bg-gray-50 border border-gray-200 rounded-md p-4">
<h3 class="text-sm font-medium text-gray-900 mb-2">Backup contents</h3>
<dl id="restore-preview-body" class="text-sm text-gray-700 space-y-1"></dl>
<h3 class="text-sm font-medium text-gray-900 mt-4 mb-2">Choose what to restore</h3>
<div class="grid grid-cols-1 md:grid-cols-2 gap-2 text-sm text-gray-700">
<label class="flex items-center gap-2"><input type="checkbox" id="opt-config" checked> <span>Main configuration</span></label>
<label class="flex items-center gap-2"><input type="checkbox" id="opt-secrets" checked> <span>API keys (secrets)</span></label>
<label class="flex items-center gap-2"><input type="checkbox" id="opt-wifi" checked> <span>WiFi configuration</span></label>
<label class="flex items-center gap-2"><input type="checkbox" id="opt-fonts" checked> <span>User-uploaded fonts</span></label>
<label class="flex items-center gap-2"><input type="checkbox" id="opt-plugin-uploads" checked> <span>Plugin image uploads</span></label>
<label class="flex items-center gap-2"><input type="checkbox" id="opt-reinstall" checked> <span>Reinstall missing plugins</span></label>
</div>
<div class="mt-4 flex gap-2">
<button onclick="runRestore()" id="run-restore-btn"
class="inline-flex items-center px-4 py-2 border border-transparent text-sm font-medium rounded-md text-white bg-green-600 hover:bg-green-700">
<i class="fas fa-upload mr-2"></i>
Restore now
</button>
<button onclick="clearRestore()"
class="inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
Cancel
</button>
</div>
</div>
<div id="restore-result" class="hidden"></div>
</div>
</div>
<!-- History card -->
<div class="bg-white rounded-lg shadow p-6">
<div class="border-b border-gray-200 pb-4 mb-6">
<div class="flex items-center justify-between">
<div>
<h2 class="text-lg font-semibold text-gray-900">Backup history</h2>
<p class="mt-1 text-sm text-gray-600">Previously exported backups stored on this device.</p>
</div>
<button onclick="loadBackupList()"
class="inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-sync-alt mr-2"></i>
Refresh
</button>
</div>
</div>
<div id="backup-history" class="text-sm text-gray-600">Loading…</div>
</div>
</div>
<script>
(function () {
let inspectedFile = null;
function notify(message, kind) {
if (typeof showNotification === 'function') {
showNotification(message, kind || 'info');
} else {
console.log('[backup]', kind || 'info', message);
}
}
function formatSize(bytes) {
if (!bytes) return '0 B';
const units = ['B', 'KB', 'MB', 'GB'];
let i = 0, size = bytes;
while (size >= 1024 && i < units.length - 1) { size /= 1024; i++; }
return size.toFixed(i === 0 ? 0 : 1) + ' ' + units[i];
}
function escapeHtml(value) {
return String(value == null ? '' : value).replace(/[&<>"']/g, function (c) {
return ({ '&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;' })[c];
});
}
async function loadPreview() {
const el = document.getElementById('export-preview');
try {
const res = await fetch('/api/v3/backup/preview');
const payload = await res.json();
if (payload.status !== 'success') throw new Error(payload.message || 'Preview failed');
const d = payload.data || {};
el.innerHTML = `
<ul class="list-disc pl-5 space-y-1">
<li>Main config: <strong>${d.has_config ? 'yes' : 'no'}</strong></li>
<li>Secrets: <strong>${d.has_secrets ? 'yes' : 'no'}</strong></li>
<li>WiFi config: <strong>${d.has_wifi ? 'yes' : 'no'}</strong></li>
<li>User fonts: <strong>${(d.user_fonts || []).length}</strong> ${d.user_fonts && d.user_fonts.length ? '(' + d.user_fonts.map(escapeHtml).join(', ') + ')' : ''}</li>
<li>Plugin image uploads: <strong>${d.plugin_uploads || 0}</strong> file(s)</li>
<li>Installed plugins: <strong>${(d.plugins || []).length}</strong></li>
</ul>`;
} catch (err) {
el.textContent = 'Could not load preview: ' + err.message;
}
}
async function loadBackupList() {
const el = document.getElementById('backup-history');
el.textContent = 'Loading…';
try {
const res = await fetch('/api/v3/backup/list');
const payload = await res.json();
if (payload.status !== 'success') throw new Error(payload.message || 'List failed');
const entries = payload.data || [];
if (!entries.length) {
el.innerHTML = '<p>No backups have been created yet.</p>';
return;
}
el.innerHTML = `
<table class="min-w-full divide-y divide-gray-200">
<thead>
<tr>
<th class="text-left py-2">Filename</th>
<th class="text-left py-2">Size</th>
<th class="text-left py-2">Created</th>
<th></th>
</tr>
</thead>
<tbody class="divide-y divide-gray-100">
${entries.map(e => `
<tr>
<td class="py-2 font-mono text-xs">${escapeHtml(e.filename)}</td>
<td class="py-2">${formatSize(e.size)}</td>
<td class="py-2">${escapeHtml(e.created_at)}</td>
<td class="py-2 text-right space-x-2">
<a href="/api/v3/backup/download/${encodeURIComponent(e.filename)}"
class="text-blue-600 hover:underline">Download</a>
<button data-filename="${escapeHtml(e.filename)}"
class="text-red-600 hover:underline backup-delete-btn">Delete</button>
</td>
</tr>
`).join('')}
</tbody>
</table>`;
el.querySelectorAll('.backup-delete-btn').forEach(btn => {
btn.addEventListener('click', () => deleteBackup(btn.dataset.filename));
});
} catch (err) {
el.textContent = 'Could not load backups: ' + err.message;
}
}
async function exportBackup() {
const btn = document.getElementById('export-backup-btn');
btn.disabled = true;
btn.innerHTML = '<i class="fas fa-spinner fa-spin mr-2"></i>Creating…';
try {
const res = await fetch('/api/v3/backup/export', { method: 'POST' });
const payload = await res.json();
if (payload.status !== 'success') throw new Error(payload.message || 'Export failed');
notify('Backup created: ' + payload.filename, 'success');
// Trigger browser download immediately.
window.location.href = '/api/v3/backup/download/' + encodeURIComponent(payload.filename);
await loadBackupList();
} catch (err) {
notify('Export failed: ' + err.message, 'error');
} finally {
btn.disabled = false;
btn.innerHTML = '<i class="fas fa-download mr-2"></i>Download backup';
}
}
async function deleteBackup(filename) {
if (!confirm('Delete ' + filename + '?')) return;
try {
const res = await fetch('/api/v3/backup/' + encodeURIComponent(filename), { method: 'DELETE' });
const payload = await res.json();
if (payload.status !== 'success') throw new Error(payload.message || 'Delete failed');
notify('Backup deleted', 'success');
await loadBackupList();
} catch (err) {
notify('Delete failed: ' + err.message, 'error');
}
}
async function validateRestoreFile() {
const input = document.getElementById('restore-file-input');
if (!input.files || !input.files[0]) {
notify('Choose a backup file first', 'error');
return;
}
const file = input.files[0];
const fd = new FormData();
fd.append('backup_file', file);
try {
const res = await fetch('/api/v3/backup/validate', { method: 'POST', body: fd });
const payload = await res.json();
if (payload.status !== 'success') throw new Error(payload.message || 'Validation failed');
inspectedFile = file;
renderRestorePreview(payload.data);
} catch (err) {
notify('Invalid backup: ' + err.message, 'error');
}
}
function renderRestorePreview(manifest) {
const wrap = document.getElementById('restore-preview');
const body = document.getElementById('restore-preview-body');
const detected = manifest.detected_contents || [];
const plugins = manifest.plugins || [];
body.innerHTML = `
<div><strong>Created:</strong> ${escapeHtml(manifest.created_at || 'unknown')}</div>
<div><strong>Source host:</strong> ${escapeHtml(manifest.hostname || 'unknown')}</div>
<div><strong>LEDMatrix version:</strong> ${escapeHtml(manifest.ledmatrix_version || 'unknown')}</div>
<div><strong>Includes:</strong> ${detected.length ? detected.map(escapeHtml).join(', ') : '(nothing detected)'}</div>
<div><strong>Plugins referenced:</strong> ${plugins.length ? plugins.map(p => escapeHtml(p.plugin_id)).join(', ') : 'none'}</div>
`;
wrap.classList.remove('hidden');
}
function clearRestore() {
inspectedFile = null;
document.getElementById('restore-preview').classList.add('hidden');
document.getElementById('restore-result').classList.add('hidden');
document.getElementById('restore-file-input').value = '';
}
async function runRestore() {
if (!inspectedFile) {
notify('Inspect the file before restoring', 'error');
return;
}
if (!confirm('Restore from this backup? Current configuration will be overwritten.')) return;
const options = {
restore_config: document.getElementById('opt-config').checked,
restore_secrets: document.getElementById('opt-secrets').checked,
restore_wifi: document.getElementById('opt-wifi').checked,
restore_fonts: document.getElementById('opt-fonts').checked,
restore_plugin_uploads: document.getElementById('opt-plugin-uploads').checked,
reinstall_plugins: document.getElementById('opt-reinstall').checked,
};
const fd = new FormData();
fd.append('backup_file', inspectedFile);
fd.append('options', JSON.stringify(options));
const btn = document.getElementById('run-restore-btn');
btn.disabled = true;
btn.innerHTML = '<i class="fas fa-spinner fa-spin mr-2"></i>Restoring…';
try {
const res = await fetch('/api/v3/backup/restore', { method: 'POST', body: fd });
const payload = await res.json();
if (payload.status !== 'success') {
const msgs = (payload.data?.errors || []).join('; ');
throw new Error(payload.message || msgs || 'Restore had errors');
}
const data = payload.data || {};
const hasPartial = (data.plugins_failed || []).length > 0 || (data.errors || []).length > 0;
const result = document.getElementById('restore-result');
result.className = (hasPartial
? 'bg-yellow-50 border-yellow-200 text-yellow-800'
: 'bg-green-50 border-green-200 text-green-800') + ' border rounded-md p-4';
result.classList.remove('hidden');
result.innerHTML = `
<h3 class="font-medium mb-2">${hasPartial ? 'Restore complete with warnings' : 'Restore complete'}</h3>
<div><strong>Restored:</strong> ${(data.restored || []).map(escapeHtml).join(', ') || 'none'}</div>
<div><strong>Skipped:</strong> ${(data.skipped || []).map(escapeHtml).join(', ') || 'none'}</div>
<div><strong>Plugins installed:</strong> ${(data.plugins_installed || []).map(escapeHtml).join(', ') || 'none'}</div>
<div><strong>Plugins failed:</strong> ${(data.plugins_failed || []).map(p => escapeHtml(p.plugin_id + ' (' + p.error + ')')).join(', ') || 'none'}</div>
<div><strong>Errors:</strong> ${(data.errors || []).map(escapeHtml).join('; ') || 'none'}</div>
${((data.restored || []).length || (data.plugins_installed || []).length) ? '<p class="mt-2">Restart the display service to apply all changes.</p>' : ''}
`;
notify(hasPartial ? 'Restore complete with warnings' : 'Restore complete', hasPartial ? 'warning' : 'success');
} catch (err) {
notify('Restore failed: ' + err.message, 'error');
} finally {
btn.disabled = false;
btn.innerHTML = '<i class="fas fa-upload mr-2"></i>Restore now';
}
}
// Expose handlers to inline onclick attributes.
window.exportBackup = exportBackup;
window.loadBackupList = loadBackupList;
window.validateRestoreFile = validateRestoreFile;
window.clearRestore = clearRestore;
window.runRestore = runRestore;
// Clear inspection state whenever the user picks a new file.
document.getElementById('restore-file-input').addEventListener('change', function () {
inspectedFile = null;
document.getElementById('restore-preview').classList.add('hidden');
document.getElementById('restore-result').classList.add('hidden');
});
// Initial load.
loadPreview();
loadBackupList();
})();
</script>