Fix backup API 404s, hardware status 500, and HTMX loading race

- Add all backup API routes to api_v3.py: preview, list, export,
  validate, restore (with plugin reinstall), download, delete
- Fix PermissionError on /hardware/status: return graceful 200 instead
  of 500 when the status file is owned by a different user; also fix
  root cause by writing the file world-readable (0o644) in display_manager
- Fix HTMX race: dispatch htmx:ready window event from HTMX onload
  callback; loadTabContent now waits for that event instead of
  immediately falling back to direct fetch (eliminating the
  "HTMX not available" console warning on initial load)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Chuck
2026-05-23 16:16:31 -04:00
parent ddc53ff1e0
commit 745ba8101e
3 changed files with 188 additions and 12 deletions

View File

@@ -190,7 +190,7 @@ class DisplayManager:
json.dump(_hw_status, _f) json.dump(_hw_status, _f)
_f.flush() _f.flush()
os.fsync(_f.fileno()) os.fsync(_f.fileno())
os.chmod(_tmp_path, 0o600) os.chmod(_tmp_path, 0o644)
os.replace(_tmp_path, _status_path) os.replace(_tmp_path, _status_path)
except Exception: except Exception:
try: try:

View File

@@ -1601,9 +1601,12 @@ def get_hardware_status():
return jsonify({"status": "success", "data": hw_data}) return jsonify({"status": "success", "data": hw_data})
except FileNotFoundError: except FileNotFoundError:
return jsonify({"status": "success", "data": {"ok": None, "error": "Display service not yet started"}}) return jsonify({"status": "success", "data": {"ok": None, "error": "Display service not yet started"}})
except (json.JSONDecodeError, PermissionError): except PermissionError:
logger.error("Failed to read hardware status file", exc_info=True) logger.warning("Permission denied reading hardware status file; display service may be running as a different user")
return jsonify({"status": "error", "message": "Unable to read hardware status"}), 500 return jsonify({"status": "success", "data": {"ok": None, "error": "Hardware status temporarily unavailable"}})
except json.JSONDecodeError:
logger.error("Failed to parse hardware status file", exc_info=True)
return jsonify({"status": "success", "data": {"ok": None, "error": "Hardware status file corrupted"}})
except Exception: except Exception:
logger.error("Unexpected error reading hardware status", exc_info=True) logger.error("Unexpected error reading hardware status", exc_info=True)
return jsonify({"status": "error", "message": "Unable to read hardware status"}), 500 return jsonify({"status": "error", "message": "Unable to read hardware status"}), 500
@@ -7064,4 +7067,178 @@ def clear_old_errors():
message="Failed to clear old errors", message="Failed to clear old errors",
details=str(e), details=str(e),
status_code=500 status_code=500
) )
# ---------------------------------------------------------------------------
# Backup / Restore
# ---------------------------------------------------------------------------
_BACKUP_EXPORT_DIR = PROJECT_ROOT / "config" / "backups" / "exports"
def _safe_backup_path(filename: str) -> Path:
"""Resolve a filename to an absolute path inside the export dir,
rejecting any traversal attempts. Returns None if unsafe."""
if not filename or '/' in filename or '\\' in filename or filename.startswith('.'):
return None
path = (_BACKUP_EXPORT_DIR / filename).resolve()
try:
path.relative_to(_BACKUP_EXPORT_DIR.resolve())
except ValueError:
return None
return path
@api_v3.route('/backup/preview', methods=['GET'])
def backup_preview():
"""Return a summary of what a new backup would include."""
try:
from src.backup_manager import preview_backup_contents
data = preview_backup_contents(PROJECT_ROOT)
return jsonify({'status': 'success', 'data': data})
except Exception as e:
logger.error("backup_preview failed: %s", e, exc_info=True)
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/backup/list', methods=['GET'])
def backup_list():
"""List backup ZIPs stored in the export directory."""
try:
_BACKUP_EXPORT_DIR.mkdir(parents=True, exist_ok=True)
entries = []
for p in sorted(_BACKUP_EXPORT_DIR.iterdir(), key=lambda x: x.stat().st_mtime, reverse=True):
if not p.is_file() or p.suffix != '.zip':
continue
st = p.stat()
entries.append({
'filename': p.name,
'size': st.st_size,
'created_at': datetime.fromtimestamp(st.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
})
return jsonify({'status': 'success', 'data': entries})
except Exception as e:
logger.error("backup_list failed: %s", e, exc_info=True)
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/backup/export', methods=['POST'])
def backup_export():
"""Create a new backup ZIP and return its filename."""
try:
from src.backup_manager import create_backup
zip_path = create_backup(PROJECT_ROOT, output_dir=_BACKUP_EXPORT_DIR)
return jsonify({'status': 'success', 'filename': zip_path.name})
except Exception as e:
logger.error("backup_export failed: %s", e, exc_info=True)
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/backup/validate', methods=['POST'])
def backup_validate():
"""Validate an uploaded backup ZIP and return its manifest."""
try:
from src.backup_manager import validate_backup
if 'backup_file' not in request.files:
return jsonify({'status': 'error', 'message': 'No backup_file in request'}), 400
f = request.files['backup_file']
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
tmp_path = tmp.name
f.save(tmp_path)
try:
ok, err_msg, manifest = validate_backup(Path(tmp_path))
finally:
try:
os.unlink(tmp_path)
except OSError:
pass
if not ok:
return jsonify({'status': 'error', 'message': err_msg}), 400
return jsonify({'status': 'success', 'data': manifest})
except Exception as e:
logger.error("backup_validate failed: %s", e, exc_info=True)
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/backup/restore', methods=['POST'])
def backup_restore():
"""Restore a backup ZIP with optional RestoreOptions."""
try:
from src.backup_manager import restore_backup, RestoreOptions
if 'backup_file' not in request.files:
return jsonify({'status': 'error', 'message': 'No backup_file in request'}), 400
f = request.files['backup_file']
options_raw = request.form.get('options', '{}')
try:
opts_dict = json.loads(options_raw)
except json.JSONDecodeError:
opts_dict = {}
options = RestoreOptions(
restore_config=bool(opts_dict.get('restore_config', True)),
restore_secrets=bool(opts_dict.get('restore_secrets', True)),
restore_wifi=bool(opts_dict.get('restore_wifi', True)),
restore_fonts=bool(opts_dict.get('restore_fonts', True)),
restore_plugin_uploads=bool(opts_dict.get('restore_plugin_uploads', True)),
reinstall_plugins=bool(opts_dict.get('reinstall_plugins', True)),
)
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
tmp_path = tmp.name
f.save(tmp_path)
try:
result = restore_backup(Path(tmp_path), PROJECT_ROOT, options)
finally:
try:
os.unlink(tmp_path)
except OSError:
pass
# Reinstall plugins if requested and store manager available
if options.reinstall_plugins and result.plugins_to_install:
psm = getattr(api_v3, 'plugin_store_manager', None) or plugin_store_manager
for plug in result.plugins_to_install:
pid = plug.get('plugin_id')
if not pid:
continue
try:
if psm and hasattr(psm, 'install_plugin'):
ok = psm.install_plugin(pid)
if ok:
result.plugins_installed.append(pid)
else:
result.plugins_failed.append({'plugin_id': pid, 'error': 'install_plugin returned False'})
else:
result.plugins_failed.append({'plugin_id': pid, 'error': 'Store manager unavailable'})
except Exception as pe:
result.plugins_failed.append({'plugin_id': pid, 'error': str(pe)})
data = result.to_dict()
if not result.success:
return jsonify({'status': 'error', 'message': 'Restore had errors', 'data': data}), 500
return jsonify({'status': 'success', 'data': data})
except Exception as e:
logger.error("backup_restore failed: %s", e, exc_info=True)
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/backup/download/<path:filename>', methods=['GET'])
def backup_download(filename):
"""Stream a backup ZIP to the browser."""
from flask import send_file
path = _safe_backup_path(filename)
if path is None or not path.exists():
return jsonify({'status': 'error', 'message': 'Backup not found'}), 404
return send_file(path, as_attachment=True, download_name=path.name)
@api_v3.route('/backup/<path:filename>', methods=['DELETE'])
def backup_delete(filename):
"""Delete a stored backup ZIP."""
path = _safe_backup_path(filename)
if path is None or not path.exists():
return jsonify({'status': 'error', 'message': 'Backup not found'}), 404
try:
path.unlink()
return jsonify({'status': 'success'})
except OSError as e:
return jsonify({'status': 'error', 'message': str(e)}), 500

View File

@@ -152,6 +152,7 @@
} }
} else { } else {
console.log('HTMX loaded successfully'); console.log('HTMX loaded successfully');
window.dispatchEvent(new Event('htmx:ready'));
// Load extensions after core loads // Load extensions after core loads
loadScript(sseSrc, isAPMode ? 'https://unpkg.com/htmx.org/dist/ext/sse.js' : '/static/v3/js/htmx-sse.js'); loadScript(sseSrc, isAPMode ? 'https://unpkg.com/htmx.org/dist/ext/sse.js' : '/static/v3/js/htmx-sse.js');
loadScript(jsonEncSrc, isAPMode ? 'https://unpkg.com/htmx.org/dist/ext/json-enc.js' : '/static/v3/js/htmx-json-enc.js'); loadScript(jsonEncSrc, isAPMode ? 'https://unpkg.com/htmx.org/dist/ext/json-enc.js' : '/static/v3/js/htmx-json-enc.js');
@@ -1836,13 +1837,11 @@
htmx.trigger(contentEl, 'revealed'); htmx.trigger(contentEl, 'revealed');
} }
} else { } else {
// HTMX not available, use direct fetch // HTMX is still loading asynchronously — retry once it signals ready
console.warn('HTMX not available, using direct fetch for tab:', tab); const self = this;
if (tab === 'overview' && typeof loadOverviewDirect === 'function') { window.addEventListener('htmx:ready', function retry() {
loadOverviewDirect(); self.loadTabContent(tab);
} else if (tab === 'wifi' && typeof loadWifiDirect === 'function') { }, { once: true });
loadWifiDirect();
}
} }
}, },