diff --git a/src/backup_manager.py b/src/backup_manager.py new file mode 100644 index 00000000..9c9d15fe --- /dev/null +++ b/src/backup_manager.py @@ -0,0 +1,597 @@ +""" +User configuration backup and restore. + +Packages the user's LEDMatrix configuration, secrets, WiFi settings, +user-uploaded fonts, plugin image uploads, and installed-plugin manifest +into a single ``.zip`` that can be exported from one installation and +imported on a fresh install. + +This module is intentionally Flask-free so it can be unit-tested and +used from scripts. +""" + +from __future__ import annotations + +import io +import json +import logging +import os +import shutil +import socket +import tempfile +import zipfile +from dataclasses import dataclass, field, asdict +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +SCHEMA_VERSION = 1 + +# Filenames shipped with the LEDMatrix repository under ``assets/fonts/``. +# Anything present on disk but NOT in this set is treated as a user upload +# and included in backups. Keep this snapshot in sync with the repo — regenerate +# with:: +# +# ls assets/fonts/ +# +# Tests assert the set matches the checked-in fonts. +BUNDLED_FONTS: frozenset[str] = frozenset({ + "10x20.bdf", + "4x6.bdf", + "4x6-font.ttf", + "5by7.regular.ttf", + "5x7.bdf", + "5x8.bdf", + "6x9.bdf", + "6x10.bdf", + "6x12.bdf", + "6x13.bdf", + "6x13B.bdf", + "6x13O.bdf", + "7x13.bdf", + "7x13B.bdf", + "7x13O.bdf", + "7x14.bdf", + "7x14B.bdf", + "8x13.bdf", + "8x13B.bdf", + "8x13O.bdf", + "9x15.bdf", + "9x15B.bdf", + "9x18.bdf", + "9x18B.bdf", + "AUTHORS", + "bdf_font_guide", + "clR6x12.bdf", + "helvR12.bdf", + "ic8x8u.bdf", + "MatrixChunky8.bdf", + "MatrixChunky8X.bdf", + "MatrixLight6.bdf", + "MatrixLight6X.bdf", + "MatrixLight8X.bdf", + "PressStart2P-Regular.ttf", + "README", + "README.md", + "texgyre-27.bdf", + "tom-thumb.bdf", +}) + +# Relative paths inside the project that the backup knows how to round-trip. +_CONFIG_REL = Path("config/config.json") +_SECRETS_REL = Path("config/config_secrets.json") +_WIFI_REL = Path("config/wifi_config.json") +_FONTS_REL = Path("assets/fonts") +_PLUGIN_UPLOADS_REL = Path("assets/plugins") +_STATE_REL = Path("data/plugin_state.json") + +MANIFEST_NAME = "manifest.json" +PLUGINS_MANIFEST_NAME = "plugins.json" + +# Hard cap on the size of a single file we'll accept inside an uploaded ZIP +# to limit zip-bomb risk. 50 MB matches the existing plugin-image upload cap. +_MAX_MEMBER_BYTES = 50 * 1024 * 1024 +# Hard cap on the total uncompressed size of an uploaded ZIP. +_MAX_TOTAL_BYTES = 200 * 1024 * 1024 + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + + +@dataclass +class RestoreOptions: + """Which sections of a backup should be restored.""" + + restore_config: bool = True + restore_secrets: bool = True + restore_wifi: bool = True + restore_fonts: bool = True + restore_plugin_uploads: bool = True + reinstall_plugins: bool = True + + +@dataclass +class RestoreResult: + """Outcome of a restore operation.""" + + success: bool = False + restored: List[str] = field(default_factory=list) + skipped: List[str] = field(default_factory=list) + plugins_to_install: List[Dict[str, Any]] = field(default_factory=list) + plugins_installed: List[str] = field(default_factory=list) + plugins_failed: List[Dict[str, str]] = field(default_factory=list) + errors: List[str] = field(default_factory=list) + manifest: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + +# --------------------------------------------------------------------------- +# Manifest helpers +# --------------------------------------------------------------------------- + + +def _ledmatrix_version(project_root: Path) -> str: + """Best-effort version string for the current install.""" + version_file = project_root / "VERSION" + if version_file.exists(): + try: + return version_file.read_text(encoding="utf-8").strip() or "unknown" + except OSError: + pass + head_file = project_root / ".git" / "HEAD" + if head_file.exists(): + try: + head = head_file.read_text(encoding="utf-8").strip() + if head.startswith("ref: "): + ref = head[5:] + ref_path = project_root / ".git" / ref + if ref_path.exists(): + return ref_path.read_text(encoding="utf-8").strip()[:12] or "unknown" + return head[:12] or "unknown" + except OSError: + pass + return "unknown" + + +def _build_manifest(contents: List[str], project_root: Path) -> Dict[str, Any]: + return { + "schema_version": SCHEMA_VERSION, + "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), + "ledmatrix_version": _ledmatrix_version(project_root), + "hostname": socket.gethostname(), + "contents": contents, + } + + +# --------------------------------------------------------------------------- +# Installed-plugin enumeration +# --------------------------------------------------------------------------- + + +def list_installed_plugins(project_root: Path) -> List[Dict[str, Any]]: + """ + Return a list of currently-installed plugins suitable for the backup + manifest. Each entry has ``plugin_id`` and ``version``. + + Reads ``data/plugin_state.json`` if present; otherwise walks the plugin + directory and reads each ``manifest.json``. + """ + plugins: Dict[str, Dict[str, Any]] = {} + + state_file = project_root / _STATE_REL + if state_file.exists(): + try: + with state_file.open("r", encoding="utf-8") as f: + state = json.load(f) + raw_plugins = state.get("plugins", {}) if isinstance(state, dict) else {} + if isinstance(raw_plugins, dict): + for plugin_id, info in raw_plugins.items(): + if not isinstance(info, dict): + continue + plugins[plugin_id] = { + "plugin_id": plugin_id, + "version": info.get("version") or "", + "enabled": bool(info.get("enabled", True)), + } + except (OSError, json.JSONDecodeError) as e: + logger.warning("Could not read plugin_state.json: %s", e) + + # Fall back to scanning plugin-repos/ for manifests. + plugins_root = project_root / "plugin-repos" + if plugins_root.exists(): + for entry in sorted(plugins_root.iterdir()): + if not entry.is_dir(): + continue + manifest = entry / "manifest.json" + if not manifest.exists(): + continue + try: + with manifest.open("r", encoding="utf-8") as f: + data = json.load(f) + except (OSError, json.JSONDecodeError): + continue + plugin_id = data.get("id") or entry.name + if plugin_id not in plugins: + plugins[plugin_id] = { + "plugin_id": plugin_id, + "version": data.get("version", ""), + "enabled": True, + } + + return sorted(plugins.values(), key=lambda p: p["plugin_id"]) + + +# --------------------------------------------------------------------------- +# Font filtering +# --------------------------------------------------------------------------- + + +def iter_user_fonts(project_root: Path) -> List[Path]: + """Return absolute paths to user-uploaded fonts (anything in + ``assets/fonts/`` not listed in :data:`BUNDLED_FONTS`).""" + fonts_dir = project_root / _FONTS_REL + if not fonts_dir.exists(): + return [] + user_fonts: List[Path] = [] + for entry in sorted(fonts_dir.iterdir()): + if entry.is_file() and entry.name not in BUNDLED_FONTS: + user_fonts.append(entry) + return user_fonts + + +def iter_plugin_uploads(project_root: Path) -> List[Path]: + """Return every file under ``assets/plugins/*/uploads/`` (recursive).""" + plugin_root = project_root / _PLUGIN_UPLOADS_REL + if not plugin_root.exists(): + return [] + out: List[Path] = [] + for plugin_dir in sorted(plugin_root.iterdir()): + if not plugin_dir.is_dir(): + continue + uploads = plugin_dir / "uploads" + if not uploads.exists(): + continue + for root, _dirs, files in os.walk(uploads): + for name in sorted(files): + out.append(Path(root) / name) + return out + + +# --------------------------------------------------------------------------- +# Export +# --------------------------------------------------------------------------- + + +def create_backup( + project_root: Path, + output_dir: Optional[Path] = None, +) -> Path: + """ + Build a backup ZIP and write it into ``output_dir`` (defaults to + ``/config/backups/exports/``). Returns the path to the + created file. + """ + project_root = Path(project_root).resolve() + if output_dir is None: + output_dir = project_root / "config" / "backups" / "exports" + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + hostname = socket.gethostname() or "ledmatrix" + safe_host = "".join(c for c in hostname if c.isalnum() or c in "-_") or "ledmatrix" + zip_name = f"ledmatrix-backup-{safe_host}-{timestamp}.zip" + zip_path = output_dir / zip_name + + contents: List[str] = [] + + # Build bundle in memory first, then atomically write to final path. + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf: + # Config files. + if (project_root / _CONFIG_REL).exists(): + zf.write(project_root / _CONFIG_REL, _CONFIG_REL.as_posix()) + contents.append("config") + if (project_root / _SECRETS_REL).exists(): + zf.write(project_root / _SECRETS_REL, _SECRETS_REL.as_posix()) + contents.append("secrets") + if (project_root / _WIFI_REL).exists(): + zf.write(project_root / _WIFI_REL, _WIFI_REL.as_posix()) + contents.append("wifi") + + # User-uploaded fonts. + user_fonts = iter_user_fonts(project_root) + if user_fonts: + for font in user_fonts: + arcname = font.relative_to(project_root).as_posix() + zf.write(font, arcname) + contents.append("fonts") + + # Plugin uploads. + plugin_uploads = iter_plugin_uploads(project_root) + if plugin_uploads: + for upload in plugin_uploads: + arcname = upload.relative_to(project_root).as_posix() + zf.write(upload, arcname) + contents.append("plugin_uploads") + + # Installed plugins manifest. + plugins = list_installed_plugins(project_root) + if plugins: + zf.writestr( + PLUGINS_MANIFEST_NAME, + json.dumps(plugins, indent=2), + ) + contents.append("plugins") + + # Manifest goes last so that `contents` reflects what we actually wrote. + manifest = _build_manifest(contents, project_root) + zf.writestr(MANIFEST_NAME, json.dumps(manifest, indent=2)) + + # Write atomically. + tmp_path = zip_path.with_suffix(".zip.tmp") + tmp_path.write_bytes(buffer.getvalue()) + os.replace(tmp_path, zip_path) + logger.info("Created backup %s (%d bytes)", zip_path, zip_path.stat().st_size) + return zip_path + + +def preview_backup_contents(project_root: Path) -> Dict[str, Any]: + """Return a summary of what ``create_backup`` would include.""" + project_root = Path(project_root).resolve() + return { + "has_config": (project_root / _CONFIG_REL).exists(), + "has_secrets": (project_root / _SECRETS_REL).exists(), + "has_wifi": (project_root / _WIFI_REL).exists(), + "user_fonts": [p.name for p in iter_user_fonts(project_root)], + "plugin_uploads": len(iter_plugin_uploads(project_root)), + "plugins": list_installed_plugins(project_root), + } + + +# --------------------------------------------------------------------------- +# Validate +# --------------------------------------------------------------------------- + + +def _safe_extract_path(base_dir: Path, member_name: str) -> Optional[Path]: + """Resolve a ZIP member name against ``base_dir`` and reject anything + that escapes it. Returns the resolved absolute path, or ``None`` if the + name is unsafe.""" + # Reject absolute paths and Windows-style drives outright. + if member_name.startswith(("/", "\\")) or (len(member_name) >= 2 and member_name[1] == ":"): + return None + target = (base_dir / member_name).resolve() + try: + target.relative_to(base_dir.resolve()) + except ValueError: + return None + return target + + +def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]: + """ + Inspect a backup ZIP without extracting to disk. + + Returns ``(ok, error_message, manifest_dict)``. ``manifest_dict`` contains + the parsed manifest plus diagnostic fields: + - ``detected_contents``: list of section names present in the archive + - ``plugins``: parsed plugins.json if present + - ``total_uncompressed``: sum of uncompressed sizes + """ + zip_path = Path(zip_path) + if not zip_path.exists(): + return False, f"Backup file not found: {zip_path}", {} + + try: + with zipfile.ZipFile(zip_path, "r") as zf: + names = zf.namelist() + if MANIFEST_NAME not in names: + return False, "Backup is missing manifest.json", {} + + total = 0 + for info in zf.infolist(): + if info.file_size > _MAX_MEMBER_BYTES: + return False, f"Member {info.filename} is too large", {} + total += info.file_size + if total > _MAX_TOTAL_BYTES: + return False, "Backup exceeds maximum allowed size", {} + # Safety: reject members with unsafe paths up front. + if _safe_extract_path(Path("/tmp/_zip_check"), info.filename) is None: + return False, f"Unsafe path in backup: {info.filename}", {} + + try: + manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8") + manifest = json.loads(manifest_raw) + except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e: + return False, f"Invalid manifest.json: {e}", {} + + if not isinstance(manifest, dict) or "schema_version" not in manifest: + return False, "Invalid manifest structure", {} + if manifest.get("schema_version") != SCHEMA_VERSION: + return ( + False, + f"Unsupported backup schema version: {manifest.get('schema_version')}", + {}, + ) + + detected: List[str] = [] + if _CONFIG_REL.as_posix() in names: + detected.append("config") + if _SECRETS_REL.as_posix() in names: + detected.append("secrets") + if _WIFI_REL.as_posix() in names: + detected.append("wifi") + if any(n.startswith(_FONTS_REL.as_posix() + "/") for n in names): + detected.append("fonts") + if any(n.startswith(_PLUGIN_UPLOADS_REL.as_posix() + "/") for n in names): + detected.append("plugin_uploads") + + plugins: List[Dict[str, Any]] = [] + if PLUGINS_MANIFEST_NAME in names: + try: + plugins = json.loads(zf.read(PLUGINS_MANIFEST_NAME).decode("utf-8")) + if not isinstance(plugins, list): + plugins = [] + else: + detected.append("plugins") + except (OSError, UnicodeDecodeError, json.JSONDecodeError): + plugins = [] + + result_manifest = dict(manifest) + result_manifest["detected_contents"] = detected + result_manifest["plugins"] = plugins + result_manifest["total_uncompressed"] = total + result_manifest["file_count"] = len(names) + return True, "", result_manifest + except zipfile.BadZipFile: + return False, "File is not a valid ZIP archive", {} + except OSError as e: + return False, f"Could not read backup: {e}", {} + + +# --------------------------------------------------------------------------- +# Restore +# --------------------------------------------------------------------------- + + +def _extract_zip_safe(zip_path: Path, dest_dir: Path) -> None: + """Extract ``zip_path`` into ``dest_dir`` rejecting any unsafe members.""" + with zipfile.ZipFile(zip_path, "r") as zf: + for info in zf.infolist(): + target = _safe_extract_path(dest_dir, info.filename) + if target is None: + raise ValueError(f"Unsafe path in backup: {info.filename}") + if info.is_dir(): + target.mkdir(parents=True, exist_ok=True) + continue + target.parent.mkdir(parents=True, exist_ok=True) + with zf.open(info, "r") as src, open(target, "wb") as dst: + shutil.copyfileobj(src, dst, length=64 * 1024) + + +def _copy_file(src: Path, dst: Path) -> None: + dst.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src, dst) + + +def restore_backup( + zip_path: Path, + project_root: Path, + options: Optional[RestoreOptions] = None, +) -> RestoreResult: + """ + Restore ``zip_path`` into ``project_root`` according to ``options``. + + Plugin reinstalls are NOT performed here — the caller is responsible for + walking ``result.plugins_to_install`` and calling the store manager. This + keeps this module Flask-free and side-effect free beyond the filesystem. + """ + if options is None: + options = RestoreOptions() + project_root = Path(project_root).resolve() + result = RestoreResult() + + ok, err, manifest = validate_backup(zip_path) + if not ok: + result.errors.append(err) + return result + result.manifest = manifest + + with tempfile.TemporaryDirectory(prefix="ledmatrix_restore_") as tmp: + tmp_dir = Path(tmp) + try: + _extract_zip_safe(Path(zip_path), tmp_dir) + except (ValueError, zipfile.BadZipFile, OSError) as e: + result.errors.append(f"Failed to extract backup: {e}") + return result + + # Main config. + if options.restore_config and (tmp_dir / _CONFIG_REL).exists(): + try: + _copy_file(tmp_dir / _CONFIG_REL, project_root / _CONFIG_REL) + result.restored.append("config") + except OSError as e: + result.errors.append(f"Failed to restore config.json: {e}") + elif (tmp_dir / _CONFIG_REL).exists(): + result.skipped.append("config") + + # Secrets. + if options.restore_secrets and (tmp_dir / _SECRETS_REL).exists(): + try: + _copy_file(tmp_dir / _SECRETS_REL, project_root / _SECRETS_REL) + result.restored.append("secrets") + except OSError as e: + result.errors.append(f"Failed to restore config_secrets.json: {e}") + elif (tmp_dir / _SECRETS_REL).exists(): + result.skipped.append("secrets") + + # WiFi. + if options.restore_wifi and (tmp_dir / _WIFI_REL).exists(): + try: + _copy_file(tmp_dir / _WIFI_REL, project_root / _WIFI_REL) + result.restored.append("wifi") + except OSError as e: + result.errors.append(f"Failed to restore wifi_config.json: {e}") + elif (tmp_dir / _WIFI_REL).exists(): + result.skipped.append("wifi") + + # User fonts — skip anything that collides with a bundled font. + tmp_fonts = tmp_dir / _FONTS_REL + if options.restore_fonts and tmp_fonts.exists(): + restored_count = 0 + for font in sorted(tmp_fonts.iterdir()): + if not font.is_file(): + continue + if font.name in BUNDLED_FONTS: + result.skipped.append(f"font:{font.name} (bundled)") + continue + try: + _copy_file(font, project_root / _FONTS_REL / font.name) + restored_count += 1 + except OSError as e: + result.errors.append(f"Failed to restore font {font.name}: {e}") + if restored_count: + result.restored.append(f"fonts ({restored_count})") + elif tmp_fonts.exists(): + result.skipped.append("fonts") + + # Plugin uploads. + tmp_uploads = tmp_dir / _PLUGIN_UPLOADS_REL + if options.restore_plugin_uploads and tmp_uploads.exists(): + count = 0 + for root, _dirs, files in os.walk(tmp_uploads): + for name in files: + src = Path(root) / name + rel = src.relative_to(tmp_dir) + try: + _copy_file(src, project_root / rel) + count += 1 + except OSError as e: + result.errors.append(f"Failed to restore {rel}: {e}") + if count: + result.restored.append(f"plugin_uploads ({count})") + elif tmp_uploads.exists(): + result.skipped.append("plugin_uploads") + + # Plugins list (for caller to reinstall). + if options.reinstall_plugins and (tmp_dir / PLUGINS_MANIFEST_NAME).exists(): + try: + with (tmp_dir / PLUGINS_MANIFEST_NAME).open("r", encoding="utf-8") as f: + plugins = json.load(f) + if isinstance(plugins, list): + result.plugins_to_install = [ + {"plugin_id": p.get("plugin_id"), "version": p.get("version", "")} + for p in plugins + if isinstance(p, dict) and p.get("plugin_id") + ] + except (OSError, json.JSONDecodeError) as e: + result.errors.append(f"Could not read plugins.json: {e}") + + result.success = not result.errors + return result diff --git a/test/test_backup_manager.py b/test/test_backup_manager.py new file mode 100644 index 00000000..e8287b14 --- /dev/null +++ b/test/test_backup_manager.py @@ -0,0 +1,283 @@ +"""Tests for src.backup_manager.""" + +from __future__ import annotations + +import io +import json +import zipfile +from pathlib import Path + +import pytest + +from src import backup_manager +from src.backup_manager import ( + BUNDLED_FONTS, + RestoreOptions, + create_backup, + list_installed_plugins, + preview_backup_contents, + restore_backup, + validate_backup, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +def _make_project(root: Path) -> Path: + """Build a minimal fake project tree under ``root``.""" + (root / "config").mkdir(parents=True) + (root / "config" / "config.json").write_text( + json.dumps({"web_ui": {"port": 8080}, "my-plugin": {"enabled": True, "favorites": ["A", "B"]}}), + encoding="utf-8", + ) + (root / "config" / "config_secrets.json").write_text( + json.dumps({"ledmatrix-weather": {"api_key": "SECRET"}}), + encoding="utf-8", + ) + (root / "config" / "wifi_config.json").write_text( + json.dumps({"ap_mode": {"ssid": "LEDMatrix"}}), + encoding="utf-8", + ) + + fonts = root / "assets" / "fonts" + fonts.mkdir(parents=True) + # One bundled font (should be excluded) and one user-uploaded font. + (fonts / "5x7.bdf").write_text("BUNDLED", encoding="utf-8") + (fonts / "my-custom-font.ttf").write_bytes(b"\x00\x01USER") + + uploads = root / "assets" / "plugins" / "static-image" / "uploads" + uploads.mkdir(parents=True) + (uploads / "image_1.png").write_bytes(b"\x89PNG\r\n\x1a\nfake") + (uploads / ".metadata.json").write_text(json.dumps({"a": 1}), encoding="utf-8") + + # plugin-repos for installed-plugin enumeration. + plugin_dir = root / "plugin-repos" / "my-plugin" + plugin_dir.mkdir(parents=True) + (plugin_dir / "manifest.json").write_text( + json.dumps({"id": "my-plugin", "version": "1.2.3"}), + encoding="utf-8", + ) + + # plugin_state.json + (root / "data").mkdir() + (root / "data" / "plugin_state.json").write_text( + json.dumps( + { + "plugins": { + "my-plugin": {"version": "1.2.3", "enabled": True}, + "other-plugin": {"version": "0.1.0", "enabled": False}, + } + } + ), + encoding="utf-8", + ) + return root + + +@pytest.fixture +def project(tmp_path: Path) -> Path: + return _make_project(tmp_path / "src_project") + + +@pytest.fixture +def empty_project(tmp_path: Path) -> Path: + root = tmp_path / "dst_project" + root.mkdir() + # Pre-seed only the bundled font to simulate a fresh install. + (root / "assets" / "fonts").mkdir(parents=True) + (root / "assets" / "fonts" / "5x7.bdf").write_text("BUNDLED", encoding="utf-8") + return root + + +# --------------------------------------------------------------------------- +# BUNDLED_FONTS sanity +# --------------------------------------------------------------------------- + + +def test_bundled_fonts_matches_repo() -> None: + """Every entry in BUNDLED_FONTS must exist on disk in assets/fonts/. + + The reverse direction is intentionally not checked: real installations + have user-uploaded fonts in the same directory, and they should be + treated as user data (not bundled). + """ + repo_fonts = Path(__file__).resolve().parent.parent / "assets" / "fonts" + if not repo_fonts.exists(): + pytest.skip("assets/fonts not present in test env") + on_disk = {p.name for p in repo_fonts.iterdir() if p.is_file()} + missing = set(BUNDLED_FONTS) - on_disk + assert not missing, f"BUNDLED_FONTS references files not in assets/fonts/: {missing}" + + +# --------------------------------------------------------------------------- +# Preview / enumeration +# --------------------------------------------------------------------------- + + +def test_list_installed_plugins(project: Path) -> None: + plugins = list_installed_plugins(project) + ids = [p["plugin_id"] for p in plugins] + assert "my-plugin" in ids + assert "other-plugin" in ids + my = next(p for p in plugins if p["plugin_id"] == "my-plugin") + assert my["version"] == "1.2.3" + + +def test_preview_backup_contents(project: Path) -> None: + preview = preview_backup_contents(project) + assert preview["has_config"] is True + assert preview["has_secrets"] is True + assert preview["has_wifi"] is True + assert preview["user_fonts"] == ["my-custom-font.ttf"] + assert preview["plugin_uploads"] >= 2 + assert any(p["plugin_id"] == "my-plugin" for p in preview["plugins"]) + + +# --------------------------------------------------------------------------- +# Export +# --------------------------------------------------------------------------- + + +def test_create_backup_contents(project: Path, tmp_path: Path) -> None: + out_dir = tmp_path / "exports" + zip_path = create_backup(project, output_dir=out_dir) + assert zip_path.exists() + assert zip_path.parent == out_dir + with zipfile.ZipFile(zip_path) as zf: + names = set(zf.namelist()) + assert "manifest.json" in names + assert "config/config.json" in names + assert "config/config_secrets.json" in names + assert "config/wifi_config.json" in names + assert "assets/fonts/my-custom-font.ttf" in names + # Bundled font must NOT be included. + assert "assets/fonts/5x7.bdf" not in names + assert "assets/plugins/static-image/uploads/image_1.png" in names + assert "plugins.json" in names + + +def test_create_backup_manifest(project: Path, tmp_path: Path) -> None: + zip_path = create_backup(project, output_dir=tmp_path / "exports") + with zipfile.ZipFile(zip_path) as zf: + manifest = json.loads(zf.read("manifest.json")) + assert manifest["schema_version"] == backup_manager.SCHEMA_VERSION + assert "created_at" in manifest + assert set(manifest["contents"]) >= {"config", "secrets", "wifi", "fonts", "plugin_uploads", "plugins"} + + +# --------------------------------------------------------------------------- +# Validate +# --------------------------------------------------------------------------- + + +def test_validate_backup_ok(project: Path, tmp_path: Path) -> None: + zip_path = create_backup(project, output_dir=tmp_path / "exports") + ok, err, manifest = validate_backup(zip_path) + assert ok, err + assert err == "" + assert "config" in manifest["detected_contents"] + assert "secrets" in manifest["detected_contents"] + assert any(p["plugin_id"] == "my-plugin" for p in manifest["plugins"]) + + +def test_validate_backup_missing_manifest(tmp_path: Path) -> None: + zip_path = tmp_path / "bad.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("config/config.json", "{}") + ok, err, _ = validate_backup(zip_path) + assert not ok + assert "manifest" in err.lower() + + +def test_validate_backup_bad_schema_version(tmp_path: Path) -> None: + zip_path = tmp_path / "bad.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("manifest.json", json.dumps({"schema_version": 999})) + ok, err, _ = validate_backup(zip_path) + assert not ok + assert "schema" in err.lower() + + +def test_validate_backup_rejects_zip_traversal(tmp_path: Path) -> None: + zip_path = tmp_path / "malicious.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("manifest.json", json.dumps({"schema_version": 1, "contents": []})) + zf.writestr("../../etc/passwd", "x") + ok, err, _ = validate_backup(zip_path) + assert not ok + assert "unsafe" in err.lower() + + +def test_validate_backup_not_a_zip(tmp_path: Path) -> None: + p = tmp_path / "nope.zip" + p.write_text("hello", encoding="utf-8") + ok, err, _ = validate_backup(p) + assert not ok + + +# --------------------------------------------------------------------------- +# Restore +# --------------------------------------------------------------------------- + + +def test_restore_roundtrip(project: Path, empty_project: Path, tmp_path: Path) -> None: + zip_path = create_backup(project, output_dir=tmp_path / "exports") + result = restore_backup(zip_path, empty_project, RestoreOptions()) + + assert result.success, result.errors + assert "config" in result.restored + assert "secrets" in result.restored + assert "wifi" in result.restored + + # Files exist with correct contents. + restored_config = json.loads((empty_project / "config" / "config.json").read_text()) + assert restored_config["my-plugin"]["favorites"] == ["A", "B"] + + restored_secrets = json.loads((empty_project / "config" / "config_secrets.json").read_text()) + assert restored_secrets["ledmatrix-weather"]["api_key"] == "SECRET" + + # User font restored, bundled font untouched. + assert (empty_project / "assets" / "fonts" / "my-custom-font.ttf").read_bytes() == b"\x00\x01USER" + assert (empty_project / "assets" / "fonts" / "5x7.bdf").read_text() == "BUNDLED" + + # Plugin uploads restored. + assert (empty_project / "assets" / "plugins" / "static-image" / "uploads" / "image_1.png").exists() + + # Plugins to install surfaced for the caller. + plugin_ids = {p["plugin_id"] for p in result.plugins_to_install} + assert "my-plugin" in plugin_ids + + +def test_restore_honors_options(project: Path, empty_project: Path, tmp_path: Path) -> None: + zip_path = create_backup(project, output_dir=tmp_path / "exports") + opts = RestoreOptions( + restore_config=True, + restore_secrets=False, + restore_wifi=False, + restore_fonts=False, + restore_plugin_uploads=False, + reinstall_plugins=False, + ) + result = restore_backup(zip_path, empty_project, opts) + assert result.success, result.errors + assert (empty_project / "config" / "config.json").exists() + assert not (empty_project / "config" / "config_secrets.json").exists() + assert not (empty_project / "config" / "wifi_config.json").exists() + assert not (empty_project / "assets" / "fonts" / "my-custom-font.ttf").exists() + assert result.plugins_to_install == [] + assert "secrets" in result.skipped + assert "wifi" in result.skipped + + +def test_restore_rejects_malicious_zip(empty_project: Path, tmp_path: Path) -> None: + zip_path = tmp_path / "bad.zip" + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("manifest.json", json.dumps({"schema_version": 1, "contents": []})) + zf.writestr("../escape.txt", "x") + result = restore_backup(zip_path, empty_project, RestoreOptions()) + # validate_backup catches it before extraction. + assert not result.success + assert any("unsafe" in e.lower() for e in result.errors) diff --git a/web_interface/blueprints/api_v3.py b/web_interface/blueprints/api_v3.py index 7fe9c7b3..5de684bc 100644 --- a/web_interface/blueprints/api_v3.py +++ b/web_interface/blueprints/api_v3.py @@ -1105,6 +1105,267 @@ def save_raw_secrets_config(): status_code=500 ) + +# --------------------------------------------------------------------------- +# Backup & Restore +# --------------------------------------------------------------------------- + +_BACKUP_FILENAME_RE = re.compile(r'^ledmatrix-backup-[A-Za-z0-9_-]+-\d{8}_\d{6}\.zip$') + + +def _backup_exports_dir() -> Path: + """Directory where user-downloadable backup ZIPs are stored.""" + d = PROJECT_ROOT / 'config' / 'backups' / 'exports' + d.mkdir(parents=True, exist_ok=True) + return d + + +def _is_safe_backup_filename(name: str) -> bool: + """Allow-list filter for backup filenames used in download/delete.""" + return bool(_BACKUP_FILENAME_RE.match(name)) + + +@api_v3.route('/backup/preview', methods=['GET']) +def backup_preview(): + """Return a summary of what a new backup would include.""" + try: + from src import backup_manager + preview = backup_manager.preview_backup_contents(PROJECT_ROOT) + return jsonify({'status': 'success', 'data': preview}) + except Exception: + logger.exception("[Backup] preview failed") + return jsonify({'status': 'error', 'message': 'Failed to compute backup preview'}), 500 + + +@api_v3.route('/backup/export', methods=['POST']) +def backup_export(): + """Create a new backup ZIP and return its filename.""" + try: + from src import backup_manager + zip_path = backup_manager.create_backup(PROJECT_ROOT, output_dir=_backup_exports_dir()) + return jsonify({ + 'status': 'success', + 'filename': zip_path.name, + 'size': zip_path.stat().st_size, + 'created_at': datetime.utcnow().isoformat() + 'Z', + }) + except Exception: + logger.exception("[Backup] export failed") + return jsonify({'status': 'error', 'message': 'Failed to create backup'}), 500 + + +@api_v3.route('/backup/list', methods=['GET']) +def backup_list(): + """List backup ZIPs currently stored on disk.""" + try: + exports = _backup_exports_dir() + entries = [] + for path in sorted(exports.glob('ledmatrix-backup-*.zip'), reverse=True): + if not _is_safe_backup_filename(path.name): + continue + stat = path.stat() + entries.append({ + 'filename': path.name, + 'size': stat.st_size, + 'created_at': datetime.utcfromtimestamp(stat.st_mtime).isoformat() + 'Z', + }) + return jsonify({'status': 'success', 'data': entries}) + except Exception: + logger.exception("[Backup] list failed") + return jsonify({'status': 'error', 'message': 'Failed to list backups'}), 500 + + +@api_v3.route('/backup/download/', methods=['GET']) +def backup_download(filename): + """Stream a previously-created backup ZIP to the browser.""" + try: + if not _is_safe_backup_filename(filename): + return jsonify({'status': 'error', 'message': 'Invalid backup filename'}), 400 + exports = _backup_exports_dir() + target = exports / filename + if not target.exists(): + return jsonify({'status': 'error', 'message': 'Backup not found'}), 404 + return send_from_directory( + str(exports), + filename, + as_attachment=True, + mimetype='application/zip', + ) + except Exception: + logger.exception("[Backup] download failed") + return jsonify({'status': 'error', 'message': 'Failed to download backup'}), 500 + + +@api_v3.route('/backup/', methods=['DELETE']) +def backup_delete(filename): + """Delete a stored backup ZIP.""" + try: + if not _is_safe_backup_filename(filename): + return jsonify({'status': 'error', 'message': 'Invalid backup filename'}), 400 + target = _backup_exports_dir() / filename + if not target.exists(): + return jsonify({'status': 'error', 'message': 'Backup not found'}), 404 + target.unlink() + return jsonify({'status': 'success', 'message': f'Deleted {filename}'}) + except Exception: + logger.exception("[Backup] delete failed") + return jsonify({'status': 'error', 'message': 'Failed to delete backup'}), 500 + + +def _save_uploaded_backup_to_temp() -> Tuple[Optional[Path], Optional[Tuple[Response, int]]]: + """Shared upload handler for validate/restore endpoints. Returns + ``(temp_path, None)`` on success or ``(None, error_response)`` on failure. + The caller is responsible for deleting the returned temp file.""" + import tempfile as _tempfile + if 'backup_file' not in request.files: + return None, (jsonify({'status': 'error', 'message': 'No backup file provided'}), 400) + upload = request.files['backup_file'] + if not upload.filename: + return None, (jsonify({'status': 'error', 'message': 'No file selected'}), 400) + is_valid, err = validate_file_upload( + upload.filename, + max_size_mb=200, + allowed_extensions=['.zip'], + ) + if not is_valid: + return None, (jsonify({'status': 'error', 'message': err}), 400) + fd, tmp_name = _tempfile.mkstemp(prefix='ledmatrix_upload_', suffix='.zip') + os.close(fd) + tmp_path = Path(tmp_name) + try: + upload.save(str(tmp_path)) + except Exception: + tmp_path.unlink(missing_ok=True) + logger.exception("[Backup] Failed to save uploaded backup") + return None, (jsonify({'status': 'error', 'message': 'Failed to read uploaded file'}), 500) + return tmp_path, None + + +@api_v3.route('/backup/validate', methods=['POST']) +def backup_validate(): + """Inspect an uploaded backup without applying it.""" + tmp_path, err = _save_uploaded_backup_to_temp() + if err is not None: + return err + try: + from src import backup_manager + ok, error, manifest = backup_manager.validate_backup(tmp_path) + if not ok: + return jsonify({'status': 'error', 'message': error}), 400 + return jsonify({'status': 'success', 'data': manifest}) + except Exception: + logger.exception("[Backup] validate failed") + return jsonify({'status': 'error', 'message': 'Failed to validate backup'}), 500 + finally: + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + + +@api_v3.route('/backup/restore', methods=['POST']) +def backup_restore(): + """ + Restore an uploaded backup into the running installation. + + The request is multipart/form-data with: + - ``backup_file``: the ZIP upload + - ``options``: JSON string with RestoreOptions fields (all boolean) + """ + tmp_path, err = _save_uploaded_backup_to_temp() + if err is not None: + return err + try: + from src import backup_manager + + # Parse options (all optional; default is "restore everything"). + raw_opts = request.form.get('options') or '{}' + try: + opts_dict = json.loads(raw_opts) + except json.JSONDecodeError: + return jsonify({'status': 'error', 'message': 'Invalid options JSON'}), 400 + + opts = backup_manager.RestoreOptions( + restore_config=bool(opts_dict.get('restore_config', True)), + restore_secrets=bool(opts_dict.get('restore_secrets', True)), + restore_wifi=bool(opts_dict.get('restore_wifi', True)), + restore_fonts=bool(opts_dict.get('restore_fonts', True)), + restore_plugin_uploads=bool(opts_dict.get('restore_plugin_uploads', True)), + reinstall_plugins=bool(opts_dict.get('reinstall_plugins', True)), + ) + + # Snapshot current config through the atomic manager so the pre-restore + # state is recoverable via the existing rollback_config() path. + if api_v3.config_manager and opts.restore_config: + try: + current = api_v3.config_manager.load_config() + _save_config_atomic(api_v3.config_manager, current, create_backup=True) + except Exception: + logger.warning("[Backup] Pre-restore snapshot failed (continuing)", exc_info=True) + + result = backup_manager.restore_backup(tmp_path, PROJECT_ROOT, opts) + + # Reinstall plugins via the store manager, one at a time. + if opts.reinstall_plugins and api_v3.plugin_store_manager and result.plugins_to_install: + installed_names = set() + if api_v3.plugin_manager: + try: + existing = api_v3.plugin_manager.get_all_plugin_info() or [] + installed_names = {p.get('id') for p in existing if p.get('id')} + except Exception: + installed_names = set() + for entry in result.plugins_to_install: + plugin_id = entry.get('plugin_id') + if not plugin_id: + continue + if plugin_id in installed_names: + result.plugins_installed.append(plugin_id) + continue + try: + ok = api_v3.plugin_store_manager.install_plugin(plugin_id) + if ok: + result.plugins_installed.append(plugin_id) + else: + result.plugins_failed.append({'plugin_id': plugin_id, 'error': 'install returned False'}) + except Exception as install_err: + logger.exception("[Backup] plugin reinstall failed for %s", plugin_id) + result.plugins_failed.append({'plugin_id': plugin_id, 'error': str(install_err)}) + + # Clear font catalog cache so restored fonts show up. + if 'fonts' in ' '.join(result.restored): + try: + from web_interface.cache import delete_cached + delete_cached('fonts_catalog') + except Exception: + pass + + # Reload config_manager state so the UI picks up the new values + # without a full service restart. + if api_v3.config_manager and opts.restore_config: + try: + api_v3.config_manager.load_config(force_reload=True) + except TypeError: + try: + api_v3.config_manager.load_config() + except Exception: + pass + except Exception: + logger.warning("[Backup] Could not reload config after restore", exc_info=True) + + return jsonify({ + 'status': 'success' if result.success else 'partial', + 'data': result.to_dict(), + }) + except Exception: + logger.exception("[Backup] restore failed") + return jsonify({'status': 'error', 'message': 'Failed to restore backup'}), 500 + finally: + try: + tmp_path.unlink(missing_ok=True) + except OSError: + pass + + @api_v3.route('/system/status', methods=['GET']) def get_system_status(): """Get system status""" diff --git a/web_interface/blueprints/pages_v3.py b/web_interface/blueprints/pages_v3.py index 209d1470..a01f827a 100644 --- a/web_interface/blueprints/pages_v3.py +++ b/web_interface/blueprints/pages_v3.py @@ -76,6 +76,8 @@ def load_partial(partial_name): return _load_logs_partial() elif partial_name == 'raw-json': return _load_raw_json_partial() + elif partial_name == 'backup-restore': + return _load_backup_restore_partial() elif partial_name == 'wifi': return _load_wifi_partial() elif partial_name == 'cache': @@ -296,6 +298,13 @@ def _load_raw_json_partial(): except Exception as e: return f"Error: {str(e)}", 500 +def _load_backup_restore_partial(): + """Load backup & restore partial.""" + try: + return render_template('v3/partials/backup_restore.html') + except Exception as e: + return f"Error: {str(e)}", 500 + @pages_v3.route('/setup') def captive_setup(): """Lightweight captive portal setup page — self-contained, no frameworks.""" diff --git a/web_interface/templates/v3/base.html b/web_interface/templates/v3/base.html index e3da43d3..88409c84 100644 --- a/web_interface/templates/v3/base.html +++ b/web_interface/templates/v3/base.html @@ -968,6 +968,11 @@ class="nav-tab"> Config Editor + + + +
+
Loading summary…
+
+ + + +
+
+

Restore from backup

+

+ Upload a backup ZIP exported from this or another LEDMatrix install. + You'll see a summary before anything is written to disk. +

+
+ +
+
+ + +
+ +
+ +
+ + + + +
+
+ + +
+
+
+
+

Backup history

+

Previously exported backups stored on this device.

+
+ +
+
+
Loading…
+
+ + + +