14 Commits

Author SHA1 Message Date
Chuck
b2524e918d feat(web): add Tools tab and row address type setting
Adds a Tools/Utilities tab to the web interface with one-click
maintenance buttons that previously required SSH:
- Git status panel (branch, dirty state, recent commits)
- Pull latest (rebase) and force reset to origin/main
- Reinstall base requirements (pip, with output)
- Reinstall per-plugin requirements (pass/fail per plugin)
- Clear __pycache__ directories
- Quick-access restart for display and web services

Also exposes the hzeller row_address_type option (0–4) in the
Display settings tab. The backend already read this value from
config; the UI, API field list, and validation were missing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-16 09:41:33 -04:00
Chuck
eb6687ceca fix(install): correctly detect already-installed dateutil/websocket-client
Address remaining coderabbitai findings on PR #369:
- check_package_installed() did __import__(package_name) directly, but
  python-dateutil and websocket-client import as dateutil/websocket. Both
  always failed the "already installed" check and were reinstalled on
  every run. Add an IMPORT_NAME_MAP for the mismatched names.
- _run() still read the entire temp file into memory before slicing the
  tail. Stream it line-by-line into a deque(maxlen=ERROR_TAIL_LINES)
  instead so memory use stays bounded for very chatty commands.
2026-06-11 16:21:56 -04:00
Chuck
60b64144a5 fix(install): suppress remaining Codacy subprocess false-positive
Codacy's Semgrep-based check still flagged the cmd-built subprocess.run
call as "without a static string" even with the Bandit nosec applied.
Add a nosemgrep marker alongside it - cmd is always a hardcoded
apt/pip argument list, never user input.
2026-06-11 13:01:33 -04:00
Chuck
5a1a095e16 fix(install): bound subprocess output and dedupe apt update in dependency installer
Address coderabbitai review on PR #369:
- _run() now streams combined stdout/stderr to a temp file and returns
  only the last ERROR_TAIL_LINES lines, instead of buffering full
  output in memory (Codacy also flagged the previous capture_output
  call as a subprocess-without-static-string security issue; the new
  call is annotated as safe since cmd is built from hardcoded args).
- `apt update` now runs once in main() instead of once per package
  needing an apt fallback.
2026-06-11 12:57:30 -04:00
Chuck
6aec2d9b78 feat(install): harden first-time install against common Pi failure modes
- wait_for_apt_lock: apt_update/apt_install now wait (up to 3min) for
  unattended-upgrades to release the dpkg lock instead of failing
  outright with "Command failed after 3 attempts" right after first boot.
- check_disk_space: new pre-flight check (Step 1) so a full SD card fails
  fast with a clear message instead of a cryptic mid-build error.
- Step 6: wrap rpi-rgb-led-matrix git clone/submodule operations in retry
  for resilience to transient network issues.
- Step 6: capture `pip install .` build output and print the last 50
  lines on failure, so the actual cmake/compiler error is visible instead
  of just "Failed to install rpi-rgb-led-matrix Python package".
2026-06-11 12:57:30 -04:00
Chuck
6c9a3510ee feat(install): surface root cause of web dependency install failures
install_dependencies_apt.py previously reported only which packages
failed, not why - the actual apt/pip error was discarded (apt) or
could scroll out of the on_error log tail (pip), leaving "Step 7:
Install web interface dependencies (line 915)" as the only visible
detail.

Capture command output for each install attempt and print a compact
DEPENDENCY INSTALLATION FAILURES summary with the last lines of error
output per package. Also run the installer with `python3 -u` for
real-time, correctly-ordered logging, and widen the on_error tail from
50 to 100 lines so the summary isn't cut off.
2026-06-11 12:57:29 -04:00
Chuck
6ea9862c14 fix(install): don't let outer ERR trap mask first_time_install.sh failures
set +e alone doesn't suppress bash's ERR trap, so any non-zero exit from
first_time_install.sh inside the one-shot installer immediately triggered
the outer on_error handler with a generic "Main installation, line 370"
message — before the script could report the real exit code or point to
logs/. Suspend the trap for that block so the existing if/else handling
runs instead.
2026-06-11 12:57:29 -04:00
Chuck
cf28a8c0d5 fix(display): restore early-continue guard for mid-loop mode changes (#367)
When the display loop breaks early because current_display_mode changed
(on-demand activation, live priority, etc.), it would fall through to the
"honour minimum duration" sleep for the *previous* mode — blocking for up
to that mode's full display_duration (default 30s) without polling
on-demand requests or re-checking the mode. New modes could sit unrendered
for up to 30s, or get clobbered by a queued stop request before ever
displaying.

This guard was added in #298 to fix #196 (live priority not interrupting
long display durations) and was accidentally dropped in #330 as collateral
damage of an unrelated time.monotonic() -> time.time() cleanup in the same
diff hunk. Restoring it fixes both the original #196 regression and a new
symptom found via the on-air MQTT plugin, where ON/OFF toggles could be
delayed by up to 30s or missed entirely depending on timing within the
previous mode's display cycle.

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-10 10:16:30 -04:00
Chuck
a06682981c fix(web): allow up to 24 panels in chain length config (#366)
Raises the Chain Length input's max from 8 to 24 to support longer
LED panel strings.

Co-authored-by: Chuck <chuck@example.com>
2026-06-09 21:31:07 -04:00
Ron Pierce
bc027c921d fix: check_plugin.py honors per-plugin test/harness.json (#365)
check_one() always compares the render against committed golden images, but
the CLI never loaded the plugin's test/harness.json — so the deterministic
settings the goldens were generated with (config, mock data, frozen time,
sizes) weren't applied. For any time/data-dependent plugin this means the CLI
(and the plugins-repo CI workflow that calls it) renders live data and the
golden drifts on every run, even with no real regression. The pytest matrix
path already reads harness.json via load_harness_spec; the CLI now does too.

- check_one loads load_harness_spec(plugin_dir) and layers it under explicit
  CLI flags: config = schema defaults < harness.json < --config; sizes =
  --sizes > LEDMATRIX_TEST_SIZES env > harness.json > default sample;
  mock_data/freeze_time/skip_update fall back to harness.json when not given
  on the CLI.
- parse_sizes returns None (not DEFAULT_TEST_SIZES) when --sizes is omitted,
  so the env/harness.json/default fallback chain in resolve_test_sizes applies.
- Regression tests: harness.json supplies render settings, and CLI flags
  override it. Use a temp fixture plugin so they run in core CI (no plugins).

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 12:52:33 -04:00
Ron Pierce
e0bd7088fa fix: make requirements-test.txt installable alongside requirements.txt (#364)
requirements.txt already pins pytest>=9.0.3,<10 (from #331), but
requirements-test.txt re-pinned pytest>=7.4,<9. The two ranges are
disjoint, so `pip install -r requirements.txt -r requirements-test.txt`
fails with ResolutionImpossible — breaking the core test workflow and the
plugins-repo safety workflow that installs both files.

pytest, pytest-cov, pytest-mock, and jsonschema are all already pinned
with major-version caps in requirements.txt, so drop them from
requirements-test.txt and keep only freezegun (the one test dep
requirements.txt doesn't provide).

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 10:32:05 -04:00
Ron Pierce
313e35a98f Add cross-size/cross-screen plugin safety harness (#361)
* feat(testing): add cross-size/cross-screen plugin safety harness

Render every plugin across all supported matrix sizes (64x32, 128x32,
128x64, 256x32) and every declared screen, failing on crashes, content
drawn past the panel edge, or visual drift vs committed golden images.

- BoundsCheckingDisplayManager: oversized-canvas overflow detection
- harness.py: multi-size/multi-screen render engine + golden compare
- scripts/check_plugin.py: CLI (functional+bounds, --out-dir, --update-golden,
  --freeze-time); render_plugin.py refactored onto shared loading helpers
- test/plugins/test_harness.py + test_plugin_matrix.py (parametrized,
  honors per-plugin test/harness.json; skips when no plugins present)
- MockCacheManager.cache_dir so cache-dir-using plugins load headlessly
- .github/workflows/test.yml + docs/plugin-safety-harness.md

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(testing): address PR review feedback on plugin safety harness

- check_plugin: friendly error for non-numeric --sizes; reject non-object
  --config / --mock-data JSON; sanitize plugin mode before using as a
  filename; stop --update-golden from masking crash/overflow failures
- bounds_display_manager: pad the canvas out to the largest supported panel
  (not a fixed 16px) so far-overshoot coordinates are caught, not clipped
- harness: merge config_schema defaults inside render_plugin_matrix; surface
  update() failures as a non-fatal warning + result field instead of a debug
  log; sanitize mode in golden_path
- loading: fail fast when harness.json references a missing mock_data fixture
- mocks: clean up the per-instance temp cache dir via weakref.finalize
- test_plugin_matrix: add a discovery guard that fails when
  LEDMATRIX_REQUIRE_PLUGINS=1 but none found (still skips locally); type hints
- bound test deps with upper version pins for deterministic CI

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* feat(testing): render plugins across arbitrary panel sizes, not a fixed list

Addresses maintainer feedback that there is no canonical set of supported
panel sizes — a build can be any size/configuration (square, 2x2, 4x4, 8x2,
long strips, tall stacks).

- sizes.py: SUPPORTED_SIZES -> DEFAULT_TEST_SIZES (back-compat alias kept),
  reframed as a representative SAMPLE of real panel-grid arrangements rather
  than an authoritative list; add parse_size_token / coerce_sizes /
  resolve_test_sizes helpers
- sizes are now fully overridable: LEDMATRIX_TEST_SIZES env (global, e.g. test
  on your exact hardware) > per-plugin harness.json "sizes" > default sample;
  CLI --sizes unchanged
- bounds_display_manager: pad the canvas to the largest panel IN THE CURRENT
  RUN (via overflow_extent) instead of a hardcoded max, so cross-size overflow
  detection scales to whatever sizes a run uses
- harness: compute per-run extent and thread it into the bounds manager
- tests: arbitrary-shape + size-parsing/precedence coverage
- docs: rewrite "Supported sizes" -> "Sizes: a sample, not a fixed list"

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(testing): fail the harness on non-connectivity update() errors

Addresses the remaining review thread: recording every update() exception as a
non-fatal warning still let a real update() regression pass green as long as
display() survived. Now update() failures are classified — a tolerated set of
connectivity errors (ConnectionError/TimeoutError/socket/ssl/urllib/http/
requests) is recorded non-fatally (expected with no network in CI), while any
other exception is treated as a genuine bug and fails that render.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* ci(security): pin actions to SHAs and disable checkout credential persistence

Addresses the CodeRabbit/zizmor workflow-hardening finding: pin
actions/checkout and actions/setup-python to full commit SHAs and set
persist-credentials: false on checkout to reduce supply-chain and
token-exposure risk.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(testing): validate positive sizes; narrow requests import except

Two review findings:
- sizes.py: parse_size_token / coerce_sizes now reject non-positive
  dimensions (0x32, -64x32) with a clear message instead of passing invalid
  sizes downstream (CodeRabbit).
- harness.py: the optional `requests` import now catches ImportError
  specifically and logs instead of `except Exception: pass`, clearing the
  Codacy medium "Try, Except, Pass" (harness.py L52) and Ruff S110/BLE001.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-05 14:32:52 -04:00
Ron Pierce
122e6d6863 fix(web): use fully-qualified .service unit names for privileged systemctl (#360)
The web interface runs headless, so every privileged systemctl call must be
covered by a NOPASSWD rule in /etc/sudoers.d/ledmatrix_web. The sudo command
matches the command line exactly, but the code called 'systemctl start
ledmatrix' while configure_web_sudo.sh grants 'systemctl start
ledmatrix.service'. The rule never matched, so start/stop/enable/disable/
restart fell back to a password prompt and failed with 'a terminal is
required to read the password'.

Align all privileged systemctl calls on the fully-qualified unit names the
sudoers grants use. Add a regression test that cross-checks api_v3.py calls
against the grants in configure_web_sudo.sh.

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 15:17:00 -04:00
Chuck
d488e8a2ad fix(api): don't coerce all-digit strings to int when schema type is string (#363)
* docs(core): add module and class docstrings to the 5 undocumented core files

Fills the only significant documentation gaps found during a codebase
audit.  All other core files (plugin_system/, logging_config.py, etc.)
already have complete module, class, and function docstrings.

Files changed (documentation only — zero logic changes):

  display_controller.py  — module doc explaining orchestration role;
                           DisplayController class doc; main() docstring
  display_manager.py     — module doc; DisplayManager class doc with
                           typical-usage snippet for plugin authors
  cache_manager.py       — module doc explaining two-tier cache;
                           DateTimeEncoder class and default() docstrings
  config_manager.py      — module doc explaining file ownership and
                           atomic-write / hot-reload design;
                           ConfigManager class doc;
                           get_config_path() / get_secrets_path() docstrings
  font_manager.py        — module doc (class docstring already existed)

Also noted (but not changed to avoid behaviour risk):
  display_manager.py and font_manager.py use logging.getLogger() directly
  instead of the project's get_logger() wrapper.  display_manager.py also
  calls setLevel(logging.INFO) immediately after, which would be lost if
  switched to get_logger().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* perf(display_controller): three targeted hot-path optimizations

Opt 1 — cache inspect.signature() per plugin_id
  inspect.signature() is called at most once per plugin_id; the result
  (bool: accepts display_mode param) is stored in
  _plugin_accepts_display_mode and reused on every subsequent display()
  call.  Eliminates all reflection from the display path at runtime.
  Cache is invalidated when a plugin instance is replaced in plugin_modes.

Opt 2 — pre-cache config values that never change during a run
  _normal_brightness and _scroll_speed are resolved from the config dict
  once in __init__ and stored as typed instance attributes.
  - Removes 2+ chained dict.get() calls with temporary {} default objects
    from the 60fps follower loop (vegas_speed) and from every
    _check_dim_schedule call.
  - current_brightness init now uses _normal_brightness directly.

Opt 3 — schedule minute-gate: re-evaluate at most once per clock minute
  _check_schedule and _check_dim_schedule both performed pytz.timezone(),
  datetime.now(), strftime(), and datetime.strptime() on every outer loop
  call.  Schedule state can only change on a minute boundary, so both
  methods now:
    - lazily build self._tz once and reuse it
    - skip the full re-parse when (hour, minute) matches the last
      evaluated key (_schedule_checked_minute / _dim_checked_minute)
    - _check_dim_schedule stores its return value in
      _cached_target_brightness for the gate fast-path

Tests: 23 new tests in test_display_controller_optimizations.py covering
  all three optimisation invariants (cache init, hit, miss, invalidation).
  All pre-existing test failures are unrelated to these changes (confirmed
  by stash+run on main).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve 22 pre-existing test failures across 6 groups

Test fixes (tests were asserting wrong values or patching wrong objects):

  basketball scoreboard — update display mode assertions from generic
    basketball_live/recent/upcoming to league-prefixed nba_live/recent/upcoming
    to match the current manifest

  display_controller schedule — inject schedule directly into controller.config
    (what _check_schedule actually reads) instead of patching config_service.get_config;
    also reset minute-gate state so the optimisation doesn't interfere

  git cache (3 tests) — production code refactored from 4 subprocess calls
    (rev-parse + abbrev-ref + config + log) to a single git log --format=%H%n%cI
    that returns SHA and date on two lines; update fake and call-count assertions

  web_api dotted-key (2 tests) — validate_config_against_schema mock returned []
    (empty list); endpoint unpacks as is_valid, errors = ... causing ValueError;
    fix: return_value = (True, [])

  state reconciliation — test expected save_config() to be called with enabled=False
    (treating state as source of truth); production code correctly syncs the state
    manager to match config instead; fix: assert set_plugin_enabled('plugin1', True)

Production fixes (production code had bugs or missing features):

  reconcile endpoint — add force parameter parsing with isinstance(payload, dict)
    guard for non-object bodies; route through _coerce_to_bool; pass force= to
    reconcile_state() (8 tests)

  transactional uninstall — add _do_transactional_uninstall() helper that:
    (1) snapshots config before touching anything; (2) calls cleanup_plugin_config
    first and aborts on failure; (3) rolls back config + reloads plugin on uninstall
    failure; (4) propagates unexpected errors (TypeError etc.) instead of swallowing
    them (6 tests)

  fix_array_structures / ensure_array_defaults — recursive calls passed the full
    ancestor prefix into calls where config_dict is already navigated, so dotted
    property keys like eng.1 caused parent_parts.split('.') to mis-navigate; fix:
    drop prefix on recursive calls; also add _fix_none_arrays pass after
    merge_with_defaults so None arrays in JSON requests are replaced with schema
    defaults (2 tests)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* perf: four targeted optimizations across the display pipeline

Opt 1 — cache data-fetch interval per plugin (plugin_manager.py)
  _get_plugin_update_interval fell back to config_manager.get_config()
  (a full dict copy) when the manifest lacked an interval.  Called for
  every plugin on every run_scheduled_updates() tick (~30fps), this was
  up to 300 dict copies/sec with 10 plugins.
  Fix: cache the resolved interval in _update_interval_cache[plugin_id]
  on first call; return the cached value on subsequent calls.  Cache is
  cleared on load_plugin and unload_plugin.

Opt 2 — demote noisy per-cycle INFO logs to DEBUG (display_controller.py)
  Four logger.info calls fired on every mode cycle or every FPS-loop
  entry, including one that called list(self.plugin_modes.keys())
  unconditionally (allocating a list every outer loop iteration).
  - "Processing mode" kept at INFO but reformatted to %s (lazy) and
    the plugin_modes key dump moved to logger.debug
  - "Attempting/Got cycle duration" → logger.debug
  - "Entering high/normal FPS loop" → logger.debug
  Mode name at INFO is preserved for black-screen troubleshooting.

Opt 3 — use Image.frombytes instead of Image.fromarray in scroll hot path
  (scroll_helper.py)
  Image.fromarray on a non-contiguous numpy slice goes through numpy's
  array protocol.  Image.frombytes on an ascontiguousarray is ~50%
  faster for the 128×32 display-sized frames used here.  Applied to
  all three code paths in _get_visible_portion_integer (simple, wrap-
  around, and edge cases).

Opt 5 — cache get_text_width per (text, font) pair (display_manager.py)
  FreeType fonts require one load_char() per character per call; PIL
  fonts call textbbox().  Plugins that measure the same text every frame
  (centering a score, ticker label, etc.) were re-measuring from scratch
  on every display() call.
  Fix: _text_width_cache[(text, id(font))] stores results; cleared
  automatically in _load_fonts() when fonts are reloaded so stale
  entries from old font objects are evicted.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(scroll_helper): fix edge-case bug exposed by frombytes switch

The previous commit replaced Image.fromarray with Image.frombytes in
_get_visible_portion_integer.  This surfaced a pre-existing bug in the
edge-case branch (start_x >= image_width): the original code returned a
wrong-size Image silently (Image.fromarray accepts a too-short array);
Image.frombytes raises ValueError instead.

Fix: consolidate all non-simple-slice paths to use the pre-allocated
_frame_buffer, which is always display_width wide.  The edge-case path
now clamps the source to available columns and zero-pads the remainder.

Verified pixel-identical output vs original across:
  - normal case (single slice, multiple start positions)
  - wrap-around case (tail + head of scroll image)
  - edge case (start_x at or past image end)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address CodeRabbit review comments on PR #358

1. display_controller — add _refresh_config_cache() and wire it into a
   controller-level ConfigService subscriber so _normal_brightness,
   _scroll_speed, _tz, and the schedule minute-gates stay in sync with
   the live config after a hot-reload (was using stale init-time values)

2. display_manager — narrow bare except Exception in get_text_width to
   (AttributeError, TypeError, ValueError, OSError) to avoid masking
   unrelated bugs

3. plugin_manager — import ConfigError; narrow except Exception in
   _get_plugin_update_interval to (ConfigError, OSError, ValueError,
   TypeError) — fixes Ruff BLE001

4. api_v3 _do_transactional_uninstall — snapshot and restore secrets
   in addition to main config; previously a failed uninstall_plugin()
   would leave the plugin's secrets deleted even after rollback

5. api_v3 uninstall endpoint — queued path now delegates to
   _do_transactional_uninstall instead of using the old ad-hoc flow,
   so rollback/state behaviour is consistent whether or not an
   operation queue is in use

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(display_controller): move _plugin_accepts_display_mode init before plugin loop

Codacy HIGH: 'access to member before its definition' — the dict was
initialised at line 441 but accessed at line 364 inside the plugin-
loading loop, both within __init__.

Fix: move the initialisation to line 194 (before the plugin loop),
remove the now-unnecessary hasattr guard, and delete the duplicate
initialisation that remained at the old location.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(api): don't coerce all-digit strings to int when schema type is string

_parse_form_value_with_schema had a fallback that tried int()/float() on
any string value that wasn't already handled. Fields like station_id
(type: "string", value: "8726607") were silently converted to integers,
causing jsonschema validation to reject them with "expected string, got int".

Guard the fallback with a check that skips it when the schema property
explicitly declares type: "string".

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-04 15:01:42 -04:00
23 changed files with 2213 additions and 152 deletions

33
.github/workflows/test.yml vendored Normal file
View File

@@ -0,0 +1,33 @@
name: Tests
on:
pull_request:
push:
branches: [main]
jobs:
plugin-safety:
name: Plugin safety harness + unit tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
persist-credentials: false
- uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0
with:
python-version: "3.12"
cache: pip
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt -r requirements-test.txt
pip install RGBMatrixEmulator
- name: Run harness + visual rendering tests
run: |
pytest --no-cov \
test/plugins/test_harness.py \
test/plugins/test_visual_rendering.py \
test/plugins/test_plugin_matrix.py

View File

@@ -0,0 +1,136 @@
# Plugin Safety Harness
Renders a plugin across **every declared screen (mode)** and **a spread of
matrix sizes**, and fails if any combination crashes, draws past the panel edge,
or — for plugins that ship golden images — drifts visually. The goal: change a
plugin without breaking a size or screen you didn't think to test.
## Sizes: a sample, not a fixed list
There is **no fixed set of supported panel sizes** — an RGB matrix build can be
any width/height and configuration (square, rectangle, 2×2, 4×4, 8×2, long
strips, tall stacks). Plugins are expected to read dimensions dynamically
(`self.display_manager.matrix.width/height`) and lay themselves out
accordingly, so a hardcoded coordinate or unscaled font shows up as a failure
here.
The harness therefore renders against a **representative sample** that spans the
axes of variation (`DEFAULT_TEST_SIZES` in `src/plugin_system/testing/sizes.py`),
not an authoritative list:
Each module is 64×32; entries are real panel-grid arrangements (cols × rows):
| Size | Grid | Why it's in the sample |
|---------|------|--------------------------------------------|
| 64×32 | 1×1 | single panel — tightest common rectangle |
| 128×32 | 2×1 | the baseline most plugins are tuned for |
| 64×64 | 1×2 | stacked — tall-narrow centering |
| 128×64 | 2×2 | block — icon scaling / vertical centering |
| 256×32 | 4×1 | long strip — wide horizontal layout |
| 128×96 | 2×3 | tall — vertical overflow |
| 256×128 | 4×4 | large block — both dimensions big at once |
**Override the sizes entirely** to test your actual hardware (or any shape):
```bash
# CLI — one-off:
python scripts/check_plugin.py --plugin clock-simple --sizes 8x16,64x64,256x32
# pytest — force every plugin onto your panel(s):
LEDMATRIX_TEST_SIZES="8x16,128x128" pytest test/plugins/test_plugin_matrix.py
# Per-plugin — declare the shapes a plugin targets in its test/harness.json:
# { "sizes": [[8, 16], [64, 64]] }
```
Precedence: `LEDMATRIX_TEST_SIZES` env (global) → per-plugin `harness.json`
`sizes` → the default sample. Bounds checking adapts to whatever sizes a run
uses — the backing canvas is padded out to the **largest** panel in the run, so
a coordinate meant for a big build is still caught when rendering a small one.
## Quick start
```bash
# Functional + bounds check across all sizes/screens:
python scripts/check_plugin.py --plugin clock-simple
# Every discovered plugin:
python scripts/check_plugin.py --all
# Dump PNGs to eyeball each size/screen:
python scripts/check_plugin.py --plugin ledmatrix-weather --out-dir /tmp/preview
```
Exit code is non-zero if any `(plugin, size, screen)` fails. Plugins are
discovered in `plugin-repos/` and `plugins/` (override with `--plugin-dir`).
## What it checks (Phase 1 — always on)
1. **Loads** and builds its mode list.
2. **Renders every screen** at every size without raising. `update()` may fail
(no network in CI) and is tolerated; a crash in `display()` is a failure —
`display()` must handle the no-data state.
3. **Bounds**: nothing is drawn past the right/bottom edge. Implemented by
`BoundsCheckingDisplayManager`, which backs the declared panel with an
oversized canvas and flags any pixels that land in the margin. (Left/top
overflow at negative coordinates and BDF text are not flagged — golden images
cover those.)
## Golden images (Phase 2 — opt-in per plugin)
A plugin opts in by committing reference PNGs and (usually) a small harness spec:
```
plugins/<id>/test/harness.json # how to render deterministically
plugins/<id>/test/fixtures/mock.json # optional cached data
plugins/<id>/test/golden/<WxH>/<mode>.png
```
`test/harness.json` keys (all optional):
```json
{
"config": { "timezone": "UTC" },
"mock_data": "fixtures/mock.json",
"freeze_time": "2025-08-01 15:25:00",
"skip_update": false,
"sizes": [[128, 32], [128, 64]]
}
```
Generate / refresh goldens after an intentional visual change, then review the
diff before committing:
```bash
python scripts/check_plugin.py --plugin clock-simple --update-golden \
--config '{"timezone":"UTC"}' --freeze-time "2025-08-01 15:25:00"
```
Comparison is exact by default (`compare_images` in `harness.py` accepts a
tolerance for known anti-aliasing noise). Determinism requires a pinned Pillow
and the bundled fonts — keep both stable when regenerating goldens.
## Tests & CI
- `test/plugins/test_harness.py` — unit tests for bounds detection, image
comparison, and mode enumeration (run anywhere).
- `test/plugins/test_plugin_matrix.py` — parametrized over discovered plugins ×
sizes × screens; honors each plugin's `test/harness.json` and goldens. Skips
when no plugins are present (e.g. a fresh core checkout); set
`LEDMATRIX_REQUIRE_PLUGINS=1` in a pipeline where plugins must be present to
turn an empty discovery into a hard failure instead. Point it at the monorepo
with `LEDMATRIX_PLUGINS_DIR=/path/to/ledmatrix-plugins/plugins`.
- `.github/workflows/test.yml` — runs the harness + visual tests on every PR.
The plugin monorepo has its own `Plugin Safety` workflow that runs this harness
against changed plugins on every PR.
## Developer workflow
1. Change the plugin on a branch.
2. `python scripts/check_plugin.py --plugin <id> --out-dir /tmp/preview` and
eyeball the PNGs.
3. Intentional visual change? `--update-golden`, review diffs, commit goldens.
4. (Monorepo) bump `manifest.json` version and let the pre-commit hook sync
`plugins.json`.
5. Push — CI re-runs the harness across all sizes and gates the PR.

View File

@@ -15,8 +15,8 @@ on_error() {
echo "✗ An error occurred during: $CURRENT_STEP (line $line_no, exit $exit_code)" >&2
if [ -n "${LOG_FILE:-}" ]; then
echo "See the log for details: $LOG_FILE" >&2
echo "-- Last 50 lines from log --" >&2
tail -n 50 "$LOG_FILE" >&2 || true
echo "-- Last 100 lines from log --" >&2
tail -n 100 "$LOG_FILE" >&2 || true
fi
echo "\nCommon fixes:" >&2
echo "- Ensure the Pi is online (try: ping -c1 8.8.8.8)." >&2
@@ -202,8 +202,33 @@ retry() {
done
}
apt_update() { retry apt update; }
apt_install() { retry apt install -y "$@"; }
# Wait for another apt/dpkg process (commonly unattended-upgrades running
# shortly after first boot) to release its lock before we try apt ourselves.
# Without this, apt_update/apt_install can fail outright in the first couple
# minutes after a fresh Pi OS boot with a generic "Command failed after 3
# attempts" error.
wait_for_apt_lock() {
command -v flock >/dev/null 2>&1 || return 0
local lock_file="/var/lib/dpkg/lock-frontend"
local max_wait=180
local waited=0
local printed=0
while ! flock -n "$lock_file" -c true 2>/dev/null; do
if [ "$printed" -eq 0 ]; then
echo "⚠ Waiting for another apt/dpkg process to finish (e.g. unattended-upgrades on first boot)..."
printed=1
fi
if [ "$waited" -ge "$max_wait" ]; then
echo "⚠ Still waiting after ${max_wait}s; proceeding anyway."
break
fi
sleep 5
waited=$((waited+5))
done
}
apt_update() { wait_for_apt_lock; retry apt update; }
apt_install() { wait_for_apt_lock; retry apt install -y "$@"; }
apt_remove() { apt-get remove -y "$@" || true; }
check_network() {
@@ -222,6 +247,22 @@ check_network() {
exit 1
}
check_disk_space() {
command -v df >/dev/null 2>&1 || return 0
local available_mb
available_mb=$(df -m "$PROJECT_ROOT_DIR" | awk 'NR==2{print $4}')
available_mb=${available_mb:-0}
if [ "$available_mb" -lt 500 ]; then
echo "✗ ERROR: Insufficient disk space: ${available_mb}MB available (need at least 500MB)"
echo " Free up space first, e.g.: sudo apt clean && sudo apt autoremove"
exit 1
elif [ "$available_mb" -lt 1024 ]; then
echo "⚠ Limited disk space: ${available_mb}MB available (recommend at least 1GB for the rpi-rgb-led-matrix build in Step 6)"
else
echo "✓ Disk space sufficient: ${available_mb}MB available"
fi
}
echo ""
echo "This script will perform the following steps:"
echo "1. Install system dependencies"
@@ -271,8 +312,9 @@ CURRENT_STEP="Install system dependencies"
echo "Step 1: Installing system dependencies..."
echo "----------------------------------------"
# Ensure network is available before APT operations
# Pre-flight checks before APT operations
check_network
check_disk_space
# Update package list
apt_update
@@ -822,14 +864,14 @@ else
# Try to initialize submodule if .gitmodules exists
if [ -f "$PROJECT_ROOT_DIR/.gitmodules" ] && grep -q "rpi-rgb-led-matrix" "$PROJECT_ROOT_DIR/.gitmodules"; then
echo "Initializing rpi-rgb-led-matrix submodule..."
if ! git submodule update --init --recursive rpi-rgb-led-matrix-master 2>&1; then
if ! retry git submodule update --init --recursive rpi-rgb-led-matrix-master; then
echo "⚠ Submodule init failed, cloning directly from GitHub..."
git clone https://github.com/hzeller/rpi-rgb-led-matrix.git rpi-rgb-led-matrix-master
retry git clone https://github.com/hzeller/rpi-rgb-led-matrix.git rpi-rgb-led-matrix-master
fi
else
# Fallback: clone directly if submodule not configured
echo "Submodule not configured, cloning directly from GitHub..."
git clone https://github.com/hzeller/rpi-rgb-led-matrix.git rpi-rgb-led-matrix-master
retry git clone https://github.com/hzeller/rpi-rgb-led-matrix.git rpi-rgb-led-matrix-master
fi
fi
@@ -841,23 +883,34 @@ else
cd "$PROJECT_ROOT_DIR"
rm -rf rpi-rgb-led-matrix-master
if [ -f "$PROJECT_ROOT_DIR/.gitmodules" ] && grep -q "rpi-rgb-led-matrix" "$PROJECT_ROOT_DIR/.gitmodules"; then
git submodule update --init --recursive rpi-rgb-led-matrix-master
retry git submodule update --init --recursive rpi-rgb-led-matrix-master
else
git clone https://github.com/hzeller/rpi-rgb-led-matrix.git rpi-rgb-led-matrix-master
retry git clone https://github.com/hzeller/rpi-rgb-led-matrix.git rpi-rgb-led-matrix-master
fi
fi
pushd "$PROJECT_ROOT_DIR/rpi-rgb-led-matrix-master" >/dev/null
echo "Installing rpi-rgb-led-matrix Python package (scikit-build-core + cmake)..."
echo " Build deps required: python-dev-is-python3 cmake"
echo " This compiles C++ — may take 2-5 minutes on Pi 4/5..."
if ! python3 -m pip install --break-system-packages .; then
BUILD_OUTPUT=$(mktemp)
BUILD_SUCCESS=false
if python3 -m pip install --break-system-packages . > "$BUILD_OUTPUT" 2>&1; then
BUILD_SUCCESS=true
fi
cat "$BUILD_OUTPUT" >> "$LOG_FILE"
if [ "$BUILD_SUCCESS" != true ]; then
echo "✗ Failed to install rpi-rgb-led-matrix Python package"
echo " Ensure build tools are installed:"
echo " sudo apt install -y python-dev-is-python3 cmake build-essential"
echo ""
echo "-- Last 50 lines of build output --"
tail -n 50 "$BUILD_OUTPUT"
rm -f "$BUILD_OUTPUT"
popd >/dev/null
exit 1
fi
rm -f "$BUILD_OUTPUT"
popd >/dev/null
else
echo "✗ rpi-rgb-led-matrix-master directory not found at $PROJECT_ROOT_DIR"
@@ -912,7 +965,9 @@ else
# Try to install dependencies using the smart installer if available
if [ -f "$PROJECT_ROOT_DIR/scripts/install_dependencies_apt.py" ]; then
echo "Using smart dependency installer..."
python3 "$PROJECT_ROOT_DIR/scripts/install_dependencies_apt.py"
# -u: unbuffered stdout/stderr so output is captured in $LOG_FILE in
# real time and in order relative to this script's own echo statements
python3 -u "$PROJECT_ROOT_DIR/scripts/install_dependencies_apt.py"
else
echo "Using pip to install dependencies..."
if [ -f "$PROJECT_ROOT_DIR/requirements_web_v2.txt" ]; then

9
requirements-test.txt Normal file
View File

@@ -0,0 +1,9 @@
# Test-only dependencies for the plugin safety harness and pytest suite.
# Install alongside requirements.txt: pip install -r requirements.txt -r requirements-test.txt
#
# pytest, pytest-cov, pytest-mock, and jsonschema are already pinned (with
# major-version caps) in requirements.txt, so they are intentionally NOT
# repeated here — re-pinning pytest to <9 collided with requirements.txt's
# pytest>=9.0.3,<10 and made the two files impossible to install together.
# Only declare what requirements.txt doesn't already provide.
freezegun>=1.2,<2 # deterministic time for golden-image tests

232
scripts/check_plugin.py Normal file
View File

@@ -0,0 +1,232 @@
#!/usr/bin/env python3
"""
Plugin safety checker.
Renders a plugin across every declared screen (mode) and every supported matrix
size, and fails if any screen crashes, overflows the panel, or (for plugins with
committed golden images) drifts visually.
Usage:
# Functional + bounds check across all sizes/modes:
python scripts/check_plugin.py --plugin clock-simple
# Every discovered plugin:
python scripts/check_plugin.py --all
# Dump PNGs for each size/mode so you can eyeball them:
python scripts/check_plugin.py --plugin ledmatrix-weather --out-dir /tmp/preview
# Refresh committed golden images after an intentional visual change:
python scripts/check_plugin.py --plugin clock-simple --update-golden \
--mock-data plugins/clock-simple/test/fixtures/mock.json
Exit code is non-zero if any (plugin, size, mode) fails.
"""
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional
PROJECT_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
os.environ['EMULATOR'] = 'true'
from src.logging_config import get_logger # noqa: E402
from src.plugin_system.testing.loading import ( # noqa: E402
find_plugin_dir, load_config_defaults, load_harness_spec,
)
from src.plugin_system.testing.harness import ( # noqa: E402
RenderResult, render_plugin_matrix, compare_to_goldens, write_goldens,
)
from src.plugin_system.testing.sizes import ( # noqa: E402
parse_size_token, resolve_test_sizes, safe_mode_filename, size_label,
)
logger = get_logger("[Check Plugin]")
DEFAULT_SEARCH_DIRS = [
str(PROJECT_ROOT / 'plugins'),
str(PROJECT_ROOT / 'plugin-repos'),
]
def discover_plugins(search_dirs: List[str]) -> List[str]:
"""All plugin ids found across the search dirs (dirs containing manifest.json)."""
found = []
for d in search_dirs:
base = Path(d)
if not base.exists():
continue
for child in sorted(base.iterdir()):
if (child / 'manifest.json').exists() and child.name not in found:
found.append(child.name)
return found
def parse_sizes(spec: Optional[str]):
if not spec:
return None
sizes = []
for token in spec.split(','):
if not token.strip():
continue
try:
sizes.append(parse_size_token(token))
except ValueError as exc:
raise SystemExit(str(exc)) from exc
return sizes
def check_one(plugin_id: str, search_dirs: List[str], sizes, mock_data: Dict,
config: Dict, run_update: bool, out_dir: Optional[Path],
update_golden: bool, golden_dir_override: Optional[Path],
freeze_time: Optional[str]) -> List[RenderResult]:
plugin_dir = find_plugin_dir(plugin_id, search_dirs)
if not plugin_dir:
logger.error("Plugin '%s' not found in: %s", plugin_id, search_dirs)
return [RenderResult(plugin_id, 0, 0, "<not-found>", error="plugin directory not found")]
# Per-plugin test/harness.json holds the deterministic settings the committed
# goldens were generated with (config, mock data, frozen time, sizes). Load
# them so the CLI/CI render reproduces the golden the same way the pytest
# matrix path does; explicit CLI flags still override the file.
spec = load_harness_spec(plugin_dir)
# config_schema defaults (real-install behavior), then harness.json config,
# then CLI --config — most specific wins.
full_config = {"enabled": True}
full_config.update(load_config_defaults(plugin_dir))
full_config.update(spec.get("config", {}))
full_config.update(config)
# Precedence: CLI flag > LEDMATRIX_TEST_SIZES env > harness.json > default.
effective_sizes = sizes if sizes else resolve_test_sizes(spec.get("sizes"))
# CLI value wins when provided, else fall back to the harness.json setting.
effective_mock_data = mock_data or spec.get("mock_data_contents", {})
effective_freeze = freeze_time or spec.get("freeze_time")
effective_run_update = run_update and not spec.get("skip_update", False)
results = render_plugin_matrix(
plugin_id=plugin_id, plugin_dir=plugin_dir, config=full_config,
mock_data=effective_mock_data, sizes=effective_sizes,
run_update=effective_run_update, freeze_time=effective_freeze,
)
golden_dir = golden_dir_override or (plugin_dir / 'test' / 'golden')
if update_golden:
written = write_goldens(results, golden_dir)
logger.info("Wrote %d golden image(s) for %s to %s", written, plugin_id, golden_dir)
else:
compare_to_goldens(results, golden_dir)
if out_dir:
for r in results:
if r.image is None:
continue
dest = out_dir / plugin_id / size_label(r.width, r.height)
dest.mkdir(parents=True, exist_ok=True)
r.image.save(dest / f"{safe_mode_filename(r.mode)}.png", format="PNG")
return results
def print_report(all_results: Dict[str, List[RenderResult]]) -> bool:
"""Print a per-plugin grid. Returns True if everything passed."""
everything_ok = True
for plugin_id, results in all_results.items():
print(f"\n=== {plugin_id} ===")
for r in results:
if r.ok:
status = "PASS"
detail = ""
if r.golden_checked:
detail = " (golden ✓)"
if r.update_error is not None:
detail += f" (update warn: {r.update_error})"
else:
everything_ok = False
if r.error is not None:
status, detail = "FAIL", f" error={r.error}"
elif r.overflow is not None:
status, detail = "FAIL", f" overflow bbox={r.overflow}"
elif r.golden_ok is False:
status = "FAIL"
detail = f" golden drift: {r.golden_diff_pixels}px (max Δ={r.golden_max_delta})"
else:
status, detail = "FAIL", ""
print(f" [{status}] {r.size_label:>7} {r.mode}{detail}")
print()
return everything_ok
def main() -> int:
parser = argparse.ArgumentParser(description="Check a plugin renders safely across sizes & screens")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--plugin', '-p', help='Plugin id to check')
group.add_argument('--all', action='store_true', help='Check every discovered plugin')
parser.add_argument('--plugin-dir', '-d', default=None, help='Directory to search for plugins')
parser.add_argument('--sizes', default=None, help='Comma-separated WxH list (default: all supported)')
parser.add_argument('--config', '-c', default='{}', help='Plugin config overrides as JSON')
parser.add_argument('--mock-data', '-m', default=None, help='Path to JSON file with mock cache data')
parser.add_argument('--out-dir', '-o', default=None, help='Also dump rendered PNGs here')
parser.add_argument('--skip-update', action='store_true', help='Skip calling update()')
parser.add_argument('--update-golden', action='store_true', help='Write/refresh golden images')
parser.add_argument('--golden-dir', default=None, help='Override golden dir (default: <plugin>/test/golden)')
parser.add_argument('--freeze-time', default=None,
help='Freeze wall clock, e.g. "2025-08-01 15:25:00" (for time-dependent plugins)')
args = parser.parse_args()
search_dirs = [args.plugin_dir] if args.plugin_dir else DEFAULT_SEARCH_DIRS
sizes = parse_sizes(args.sizes)
try:
config = json.loads(args.config)
except json.JSONDecodeError as e:
logger.error("Invalid --config JSON: %s", e)
return 2
if not isinstance(config, dict):
logger.error("--config must be a JSON object, got %s", type(config).__name__)
return 2
mock_data = {}
if args.mock_data:
mock_path = Path(args.mock_data)
if not mock_path.exists():
logger.error("Mock data file not found: %s", args.mock_data)
return 2
with open(mock_path) as f:
mock_data = json.load(f)
if not isinstance(mock_data, dict):
logger.error("--mock-data must be a JSON object (key -> cache value), got %s",
type(mock_data).__name__)
return 2
plugin_ids = discover_plugins(search_dirs) if args.all else [args.plugin]
if not plugin_ids:
logger.error("No plugins found in: %s", search_dirs)
return 2
out_dir = Path(args.out_dir) if args.out_dir else None
golden_dir_override = Path(args.golden_dir) if args.golden_dir else None
all_results: Dict[str, List[RenderResult]] = {}
for plugin_id in plugin_ids:
all_results[plugin_id] = check_one(
plugin_id=plugin_id, search_dirs=search_dirs, sizes=sizes,
mock_data=mock_data, config=config, run_update=not args.skip_update,
out_dir=out_dir, update_golden=args.update_golden,
golden_dir_override=golden_dir_override, freeze_time=args.freeze_time,
)
# When refreshing goldens we skip drift comparison, but a crash or overflow
# still means the plugin is broken — never let --update-golden mask that.
ok = print_report(all_results)
return 0 if ok else 1
if __name__ == '__main__':
sys.exit(main())

View File

@@ -340,9 +340,14 @@ main() {
echo ""
# Execute with proper error handling and non-interactive mode
# Temporarily disable errexit to capture exit code instead of exiting immediately
# Temporarily disable errexit AND the ERR trap to capture exit code instead of
# exiting immediately. `set +e` alone does not suppress the ERR trap, so without
# `trap '' ERR` a non-zero exit from first_time_install.sh would trigger on_error
# here with the generic "Main installation" message instead of the detailed
# if/else handling below.
set +e
trap '' ERR
# Check /tmp permissions - only fix if actually wrong (common in automated scenarios)
# When running manually, /tmp usually has correct permissions (1777)
TMP_PERMS=$(stat -c '%a' /tmp 2>/dev/null || echo "unknown")
@@ -370,6 +375,7 @@ main() {
sudo -E env TMPDIR=/tmp LEDMATRIX_ASSUME_YES=1 bash ./first_time_install.sh -y </dev/null
fi
INSTALL_EXIT_CODE=$?
trap 'on_error $LINENO' ERR # Re-enable ERR trap
set -e # Re-enable errexit
if [ $INSTALL_EXIT_CODE -eq 0 ]; then

View File

@@ -6,46 +6,67 @@ then falls back to pip with --break-system-packages
import subprocess
import sys
import tempfile
import warnings
from collections import deque
from pathlib import Path
# How many trailing lines of a failed command's output to keep for the
# end-of-run failure summary. Keeps the root cause near the end of the log,
# which is where first_time_install.sh's error handler tails from.
ERROR_TAIL_LINES = 15
def _run(cmd):
"""Run a command, streaming combined stdout/stderr to a temp file.
Returns (success, output) instead of raising, so callers can report
*why* a command failed rather than just that it failed. `output` is
bounded to the last ERROR_TAIL_LINES lines so failures from very
chatty commands (e.g. pip build logs) don't get buffered in memory.
"""
with tempfile.TemporaryFile(mode='w+b') as f:
result = subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT) # nosec B603 B607 - hardcoded apt/pip args # nosemgrep
f.seek(0)
# Stream line-by-line so only the last ERROR_TAIL_LINES are ever held
# in memory, regardless of how much output the command produced.
tail = deque(
(line.decode('utf-8', errors='replace').rstrip('\n') for line in f),
maxlen=ERROR_TAIL_LINES,
)
return result.returncode == 0, '\n'.join(tail)
def install_via_apt(package_name):
"""Try to install a package via apt."""
try:
# Map pip package names to apt package names
apt_package_map = {
'flask': 'python3-flask',
'PIL': 'python3-pil',
'freetype': 'python3-freetype',
'psutil': 'python3-psutil',
'werkzeug': 'python3-werkzeug',
'numpy': 'python3-numpy',
'requests': 'python3-requests',
'python-dateutil': 'python3-dateutil',
'pytz': 'python3-tz',
'geopy': 'python3-geopy',
'unidecode': 'python3-unidecode',
'websockets': 'python3-websockets',
'websocket-client': 'python3-websocket-client'
}
apt_package = apt_package_map.get(package_name, f'python3-{package_name}')
print(f"Trying to install {apt_package} via apt...")
subprocess.check_call([
'sudo', 'apt', 'update'
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.check_call([
'sudo', 'apt', 'install', '-y', apt_package
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
"""Try to install a package via apt. Returns (success, output)."""
# Map pip package names to apt package names
apt_package_map = {
'flask': 'python3-flask',
'PIL': 'python3-pil',
'freetype': 'python3-freetype',
'psutil': 'python3-psutil',
'werkzeug': 'python3-werkzeug',
'numpy': 'python3-numpy',
'requests': 'python3-requests',
'python-dateutil': 'python3-dateutil',
'pytz': 'python3-tz',
'geopy': 'python3-geopy',
'unidecode': 'python3-unidecode',
'websockets': 'python3-websockets',
'websocket-client': 'python3-websocket-client'
}
apt_package = apt_package_map.get(package_name, f'python3-{package_name}')
print(f"Trying to install {apt_package} via apt...")
success, output = _run(['sudo', 'apt', 'install', '-y', apt_package])
if success:
print(f"Successfully installed {apt_package} via apt")
return True
except subprocess.CalledProcessError:
print(f"Failed to install {package_name} via apt, will try pip")
return False
return True, ""
print(f"Failed to install {apt_package} via apt, will try pip")
return False, output
def install_via_pip(package_name):
"""Install a package via pip with --break-system-packages and --prefer-binary.
@@ -54,34 +75,65 @@ def install_via_pip(package_name):
Debian/Ubuntu-based systems without a virtual environment.
--prefer-binary prefers pre-built wheels over source distributions to avoid
exhausting /tmp space during compilation.
Returns (success, output).
"""
try:
print(f"Installing {package_name} via pip...")
subprocess.check_call([
sys.executable, '-m', 'pip', 'install', '--break-system-packages', '--prefer-binary', package_name
])
print(f"Installing {package_name} via pip...")
success, output = _run([
sys.executable, '-m', 'pip', 'install', '--break-system-packages', '--prefer-binary', package_name
])
if success:
print(f"Successfully installed {package_name} via pip")
return True
except subprocess.CalledProcessError as e:
print(f"Failed to install {package_name} via pip: {e}")
return False
return True, ""
print(f"Failed to install {package_name} via pip (see failure summary at end of log)")
return False, output
# Distribution (pip/apt) names whose importable module name differs.
IMPORT_NAME_MAP = {
'python-dateutil': 'dateutil',
'websocket-client': 'websocket',
}
def check_package_installed(package_name):
"""Check if a package is already installed."""
import_name = IMPORT_NAME_MAP.get(package_name, package_name)
# Suppress deprecation warnings when checking if packages are installed
# (we're just checking, not using them)
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=DeprecationWarning)
try:
__import__(package_name)
__import__(import_name)
return True
except ImportError:
return False
def print_failure_summary(failed_packages, failure_details):
print("\n" + "=" * 60)
print("DEPENDENCY INSTALLATION FAILURES - DETAILS")
print("=" * 60)
for package in failed_packages:
print(f"\nPackage: {package}")
print("-" * 40)
output = failure_details.get(package, "").strip()
if not output:
print(" (no output captured)")
continue
for line in output.splitlines()[-ERROR_TAIL_LINES:]:
print(f" {line}")
print("=" * 60)
def main():
"""Main installation function."""
print("Installing dependencies for LED Matrix Web Interface V2...")
print("Refreshing apt package index...")
_run(['sudo', 'apt', 'update']) # best-effort; individual installs surface their own errors
# List of required packages
required_packages = [
'flask',
@@ -98,19 +150,23 @@ def main():
'websockets',
'websocket-client'
]
failed_packages = []
failure_details = {}
for package in required_packages:
if check_package_installed(package):
print(f"{package} is already installed")
continue
# Try apt first, then pip
if not install_via_apt(package):
if not install_via_pip(package):
ok, apt_output = install_via_apt(package)
if not ok:
ok, pip_output = install_via_pip(package)
if not ok:
failed_packages.append(package)
failure_details[package] = pip_output or apt_output
# Install packages that don't have apt equivalents
special_packages = [
'timezonefinder>=6.5.0,<7.0.0',
@@ -122,47 +178,49 @@ def main():
'python-socketio>=5.11.0,<6.0.0',
'python-engineio>=4.9.0,<5.0.0'
]
for package in special_packages:
if not install_via_pip(package):
ok, pip_output = install_via_pip(package)
if not ok:
failed_packages.append(package)
failure_details[package] = pip_output
# Install rgbmatrix module from local source (optional - may already be installed in Step 6)
# Check if already installed first
if check_package_installed('rgbmatrix'):
print("rgbmatrix module already installed, skipping...")
else:
print("Installing rgbmatrix module from local source...")
try:
# Get project root (parent of scripts directory)
PROJECT_ROOT = Path(__file__).parent.parent
rgbmatrix_path = PROJECT_ROOT / 'rpi-rgb-led-matrix-master' / 'bindings' / 'python'
if rgbmatrix_path.exists():
# Check if the module has been built (look for setup.py)
setup_py = rgbmatrix_path / 'setup.py'
if setup_py.exists():
# Try installing - use regular install, not editable mode
# This is optional for web interface and should already be installed in Step 6
subprocess.check_call([
sys.executable, '-m', 'pip', 'install', '--break-system-packages', str(rgbmatrix_path)
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Get project root (parent of scripts directory)
PROJECT_ROOT = Path(__file__).parent.parent
rgbmatrix_path = PROJECT_ROOT / 'rpi-rgb-led-matrix-master' / 'bindings' / 'python'
if rgbmatrix_path.exists():
# Check if the module has been built (look for setup.py)
setup_py = rgbmatrix_path / 'setup.py'
if setup_py.exists():
# Try installing - use regular install, not editable mode
# This is optional for web interface and should already be installed in Step 6
ok, output = _run([sys.executable, '-m', 'pip', 'install', '--break-system-packages', str(rgbmatrix_path)])
if ok:
print("rgbmatrix module installed successfully")
else:
print("Warning: rgbmatrix setup.py not found, module may need to be built first")
print(" This is normal if Step 6 hasn't completed yet.")
# Don't fail the whole installation - rgbmatrix is optional for web interface
# and should be installed in Step 6 of first_time_install.sh
print("Warning: Failed to install rgbmatrix module:")
for line in output.strip().splitlines()[-ERROR_TAIL_LINES:]:
print(f" {line}")
print(" This is normal if rgbmatrix hasn't been built yet (Step 6).")
print(" The web interface will work without it.")
else:
print("Warning: rgbmatrix source not found (this is normal if Step 6 hasn't run yet)")
except subprocess.CalledProcessError as e:
# Don't fail the whole installation - rgbmatrix is optional for web interface
# and should be installed in Step 6 of first_time_install.sh
print(f"Warning: Failed to install rgbmatrix module: {e}")
print(" This is normal if rgbmatrix hasn't been built yet (Step 6).")
print(" The web interface will work without it.")
# Don't add to failed_packages since it's optional
print("Warning: rgbmatrix setup.py not found, module may need to be built first")
print(" This is normal if Step 6 hasn't completed yet.")
else:
print("Warning: rgbmatrix source not found (this is normal if Step 6 hasn't run yet)")
if failed_packages:
print(f"\nFailed to install the following packages: {failed_packages}")
print("You may need to install them manually or check your system configuration.")
print_failure_summary(failed_packages, failure_details)
return False
else:
print("\nAll dependencies installed successfully!")

View File

@@ -17,7 +17,6 @@ import os
import json
import argparse
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union
# Add project root to path
PROJECT_ROOT = Path(__file__).resolve().parent.parent
@@ -28,49 +27,15 @@ os.environ['EMULATOR'] = 'true'
# Import logger after path setup so src.logging_config is importable
from src.logging_config import get_logger # noqa: E402
from src.plugin_system.testing.loading import ( # noqa: E402
find_plugin_dir, load_manifest, load_config_defaults,
)
logger = get_logger("[Render Plugin]")
MIN_DIMENSION = 1
MAX_DIMENSION = 512
def find_plugin_dir(plugin_id: str, search_dirs: Sequence[Union[str, Path]]) -> Optional[Path]:
"""Find a plugin directory by searching multiple paths."""
from src.plugin_system.plugin_loader import PluginLoader
loader = PluginLoader()
for search_dir in search_dirs:
search_path = Path(search_dir)
if not search_path.exists():
continue
result = loader.find_plugin_directory(plugin_id, search_path)
if result:
return Path(result)
return None
def load_manifest(plugin_dir: Path) -> Dict[str, Any]:
"""Load and return manifest.json from plugin directory."""
manifest_path = plugin_dir / 'manifest.json'
if not manifest_path.exists():
raise FileNotFoundError(f"No manifest.json in {plugin_dir}")
with open(manifest_path, 'r') as f:
return json.load(f)
def load_config_defaults(plugin_dir: Path) -> Dict[str, Any]:
"""Extract default values from config_schema.json."""
schema_path = plugin_dir / 'config_schema.json'
if not schema_path.exists():
return {}
with open(schema_path, 'r') as f:
schema = json.load(f)
defaults: Dict[str, Any] = {}
for key, prop in schema.get('properties', {}).items():
if 'default' in prop:
defaults[key] = prop['default']
return defaults
def main() -> int:
"""Load a plugin, call update() + display(), and save the result as a PNG image."""
parser = argparse.ArgumentParser(description='Render a plugin display to a PNG image')

View File

@@ -2186,6 +2186,23 @@ class DisplayController:
loop_completed = True
break
# LOAD-BEARING: if current_display_mode changed mid-loop (on-demand
# activation, live priority, etc.), restart the main loop now instead
# of falling into the "honour minimum duration" sleep below. That sleep
# can run for up to the *previous* mode's full display_duration (default
# 30s) and doesn't poll on-demand requests or re-check the mode, so a
# freshly-requested mode switch would sit invisible for up to 30s — or
# get clobbered by a queued stop request — before ever rendering.
#
# This guard was added in #298 (live priority interrupting long display
# durations) and was accidentally dropped in #330 as collateral damage of
# an unrelated time.monotonic() -> time.time() cleanup in the same hunk.
# Removing it again will silently reintroduce both issues. _activate_on_demand
# already sets force_change=True and clears the display, so the next loop
# iteration renders the new mode immediately.
if self.current_display_mode != active_mode:
continue
# Ensure we honour minimum duration when not dynamic and loop ended early
if (
not dynamic_enabled

View File

@@ -7,13 +7,22 @@ Provides base classes and utilities for testing LEDMatrix plugins.
from .plugin_test_base import PluginTestCase
from .mocks import MockDisplayManager, MockCacheManager, MockConfigManager, MockPluginManager
from .visual_display_manager import VisualTestDisplayManager
from .bounds_display_manager import BoundsCheckingDisplayManager
from .sizes import (
DEFAULT_TEST_SIZES, SUPPORTED_SIZES, resolve_test_sizes, size_label,
)
__all__ = [
'PluginTestCase',
'VisualTestDisplayManager',
'BoundsCheckingDisplayManager',
'MockDisplayManager',
'MockCacheManager',
'MockConfigManager',
'MockPluginManager',
'DEFAULT_TEST_SIZES',
'SUPPORTED_SIZES',
'resolve_test_sizes',
'size_label',
]

View File

@@ -0,0 +1,129 @@
"""
Bounds-checking display manager.
A VisualTestDisplayManager that draws onto an oversized canvas (the declared
panel size plus a right/bottom margin) while still reporting the declared size
to the plugin. Content that a plugin draws past the right or bottom edge lands
in the margin instead of being silently clipped by PIL, so the harness can
detect overflow — the classic symptom of hardcoded coordinates or fonts/icons
that don't scale down to a smaller panel.
Limitations (documented on purpose):
- Overflow past the LEFT or TOP edge (negative coordinates) is still clipped by
PIL and not detected here. The dominant real-world breakage is content that is
too wide/tall for a smaller panel, which this catches.
- BDF text is clipped to the declared bounds by the parent's bitmap drawer, so
BDF overflow is not flagged. Golden-image regression covers those plugins.
- If a plugin replaces the canvas with its own image (display_manager.image = ...),
the margin can't be measured and overflow is reported as undetermined (None).
"""
from typing import Optional, Tuple
from .sizes import DEFAULT_TEST_SIZES
from .visual_display_manager import VisualTestDisplayManager, _MatrixProxy
# Smallest extra band kept on the right/bottom so a few pixels of overflow are
# still visible even on the largest panel in a run.
_BASE_MARGIN = 16
# Fallback overflow reference when a caller doesn't pass one: the largest shape
# in the default sample. We extend every (smaller) canvas out to at least this
# size so content drawn at a coordinate meant for a bigger build — e.g. x=200 on
# a 64-wide panel — lands in the padded region and is flagged, instead of being
# clipped off-canvas and read as a false pass.
_DEFAULT_EXTENT_WIDTH = max(w for w, _ in DEFAULT_TEST_SIZES)
_DEFAULT_EXTENT_HEIGHT = max(h for _, h in DEFAULT_TEST_SIZES)
class BoundsCheckingDisplayManager(VisualTestDisplayManager):
"""Detects drawing that overflows the declared panel size."""
# Kept for backwards compatibility; real padding is computed per-axis below.
MARGIN = _BASE_MARGIN
def __init__(self, width: int = 128, height: int = 32,
overflow_extent: Optional[Tuple[int, int]] = None):
self._declared_width = int(width)
self._declared_height = int(height)
# Pad the canvas out to at least `overflow_extent` (the largest panel
# this run cares about) plus a base margin, so coordinates meant for a
# bigger build are caught — not clipped — when rendering a smaller panel.
# Defaults to the largest shape in the sample when no run is known.
ext_w, ext_h = overflow_extent or (_DEFAULT_EXTENT_WIDTH, _DEFAULT_EXTENT_HEIGHT)
self._canvas_width = max(self._declared_width, int(ext_w)) + _BASE_MARGIN
self._canvas_height = max(self._declared_height, int(ext_h)) + _BASE_MARGIN
# Parent builds the (oversized) backing canvas + fonts.
super().__init__(self._canvas_width, self._canvas_height)
# Plugins must see the DECLARED size, not the padded canvas size.
self.matrix = _MatrixProxy(self._declared_width, self._declared_height)
# -- declared dimensions (override parent's image-derived properties) --
@property
def width(self) -> int:
return self._declared_width
@property
def height(self) -> int:
return self._declared_height
@property
def display_width(self) -> int:
return self._declared_width
@property
def display_height(self) -> int:
return self._declared_height
# -- overflow detection --
def _canvas_is_padded(self) -> bool:
return self.image.size == (self._canvas_width, self._canvas_height)
def check_overflow(self) -> Optional[Tuple[int, int, int, int]]:
"""Bounding box (in full-canvas coords) of any drawing beyond the
declared panel, or None if nothing overflowed / undetermined."""
if not self._canvas_is_padded():
return None
exp_w = self._canvas_width
exp_h = self._canvas_height
boxes = []
right = self.image.crop((self._declared_width, 0, exp_w, exp_h)).getbbox()
if right:
boxes.append((right[0] + self._declared_width, right[1],
right[2] + self._declared_width, right[3]))
bottom = self.image.crop((0, self._declared_height, exp_w, exp_h)).getbbox()
if bottom:
boxes.append((bottom[0], bottom[1] + self._declared_height,
bottom[2], bottom[3] + self._declared_height))
if not boxes:
return None
return (
min(b[0] for b in boxes), min(b[1] for b in boxes),
max(b[2] for b in boxes), max(b[3] for b in boxes),
)
# -- snapshot/image accessors return the cropped, true-panel image --
def declared_image(self):
"""The visible panel: the canvas cropped to the declared size."""
if self._canvas_is_padded():
return self.image.crop((0, 0, self._declared_width, self._declared_height))
return self.image
def save_snapshot(self, path: str) -> None:
self.declared_image().save(path, format='PNG')
def get_image(self):
return self.declared_image()
def get_image_base64(self) -> str:
import base64
import io
buffer = io.BytesIO()
self.declared_image().save(buffer, format='PNG')
return base64.b64encode(buffer.getvalue()).decode('utf-8')

View File

@@ -0,0 +1,314 @@
"""
Plugin safety harness.
Renders a plugin across every declared screen (mode) and every supported matrix
size, capturing crashes and overflow. Used by scripts/check_plugin.py and the
pytest matrix test to guarantee a plugin change doesn't break a screen at a size
the author didn't try.
The render flow mirrors scripts/render_plugin.py (same PluginLoader call), but
this module adds: multi-size iteration, per-mode rendering, overflow detection
via BoundsCheckingDisplayManager, and golden-image comparison.
"""
import contextlib
import http.client
import inspect
import socket
import ssl
import urllib.error
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from PIL import Image, ImageChops
from src.logging_config import get_logger
from .bounds_display_manager import BoundsCheckingDisplayManager
from .loading import load_config_defaults, load_manifest
from .sizes import DEFAULT_TEST_SIZES, safe_mode_filename, size_label
logger = get_logger("[Plugin Harness]")
def _tolerated_update_errors() -> Tuple[type, ...]:
"""Exception types from update() we treat as a tolerated no-connectivity
failure (expected in CI / headless dev) rather than a real plugin bug.
Anything NOT in this set is a genuine regression — a plugin that lets a
non-network exception escape update() should fail the harness, not pass
green because display() happened to survive.
"""
types: List[type] = [
ConnectionError, TimeoutError, # builtins
socket.gaierror, socket.timeout, # DNS / socket timeouts
ssl.SSLError,
urllib.error.URLError,
http.client.HTTPException,
]
try: # requests is optional; cover its whole error tree when present
import requests
types.append(requests.exceptions.RequestException)
except ImportError: # pragma: no cover - requests not installed
logger.debug("requests not installed; its connectivity errors won't be specifically tolerated")
return tuple(types)
_TOLERATED_UPDATE_ERRORS = _tolerated_update_errors()
@dataclass
class RenderResult:
"""Outcome of rendering one (size, mode) of a plugin."""
plugin_id: str
width: int
height: int
mode: str
image: Optional[Image.Image] = None
error: Optional[str] = None # fatal: load/display crash, or a non-network update() error
update_error: Optional[str] = None # tolerated: connectivity error from update() (no network in CI)
overflow: Optional[Tuple[int, int, int, int]] = None # bbox past the panel
# golden comparison (populated only when a golden was provided)
golden_checked: bool = False
golden_ok: Optional[bool] = None
golden_diff_pixels: int = 0
golden_max_delta: int = 0
@property
def size_label(self) -> str:
return size_label(self.width, self.height)
@property
def ok(self) -> bool:
"""Phase-1 pass: rendered without crashing and without overflow, and if a
golden was checked it matched."""
if self.error is not None or self.overflow is not None:
return False
if self.golden_checked and self.golden_ok is False:
return False
return True
def list_modes(plugin_instance: Any, manifest: Dict[str, Any], plugin_id: str) -> List[str]:
"""Enumerate a plugin's screens: instance.modes wins, then manifest
display_modes, then the plugin id as a single mode."""
modes = getattr(plugin_instance, "modes", None)
if modes:
return [str(m) for m in modes]
declared = manifest.get("display_modes")
if declared:
return [str(m) for m in declared]
return [plugin_id]
def _instantiate(plugin_id: str, manifest: Dict[str, Any], plugin_dir: Path,
config: Dict[str, Any], mock_data: Dict[str, Any],
display_manager: Any) -> Any:
"""Load and construct a plugin instance with mocked managers."""
from src.plugin_system.plugin_loader import PluginLoader
from src.plugin_system.testing import MockCacheManager, MockPluginManager
cache_manager = MockCacheManager()
for key, value in (mock_data or {}).items():
cache_manager.set(key, value)
loader = PluginLoader()
plugin_instance, _module = loader.load_plugin(
plugin_id=plugin_id,
manifest=manifest,
plugin_dir=plugin_dir,
config=config,
display_manager=display_manager,
cache_manager=cache_manager,
plugin_manager=MockPluginManager(),
install_deps=False,
)
return plugin_instance
def _render_mode(plugin_instance: Any, mode: str) -> None:
"""Render a specific screen. Prefer an explicit display_mode kwarg; otherwise
drive the plugin's internal mode state machine (first display() call renders
modes[current_mode_index] when current_display_mode is None)."""
sig = inspect.signature(plugin_instance.display)
if "display_mode" in sig.parameters:
plugin_instance.display(force_clear=True, display_mode=mode)
return
modes = getattr(plugin_instance, "modes", None)
if modes and mode in modes:
plugin_instance.current_mode_index = list(modes).index(mode)
if hasattr(plugin_instance, "current_display_mode"):
plugin_instance.current_display_mode = None
plugin_instance.display(force_clear=False)
def _freeze(freeze_time: Optional[str]):
"""Context manager that freezes wall-clock time when freeze_time is given,
so time-dependent plugins (clocks, countdowns) render deterministic goldens."""
if not freeze_time:
return contextlib.nullcontext()
try:
from freezegun import freeze_time as _ft
except ImportError as e: # pragma: no cover - only hit without the dep
raise RuntimeError(
"freeze_time requires the 'freezegun' package (pip install freezegun)"
) from e
return _ft(freeze_time)
def render_plugin_matrix(
plugin_id: str,
plugin_dir: Path,
config: Optional[Dict[str, Any]] = None,
mock_data: Optional[Dict[str, Any]] = None,
sizes: Optional[List[Tuple[int, int]]] = None,
run_update: bool = True,
freeze_time: Optional[str] = None,
) -> List[RenderResult]:
"""Render every (size, mode) combination for a plugin.
Returns a flat list of RenderResult. A fresh plugin instance is built per
(size, mode) so state never leaks between screens. Pass freeze_time (e.g.
"2025-08-01 15:25:00") to make time-dependent plugins reproducible.
"""
plugin_dir = Path(plugin_dir)
manifest = load_manifest(plugin_dir)
# Start from config_schema.json defaults so the plugin behaves like a real
# install; explicit caller config still wins over a schema default.
config = {"enabled": True, **load_config_defaults(plugin_dir), **(config or {})}
sizes = sizes or DEFAULT_TEST_SIZES
results: List[RenderResult] = []
# The largest panel in this run. Every (smaller) canvas is padded out to it
# so a coordinate meant for the biggest configuration is still caught when
# rendering a smaller one, instead of being clipped into a false pass.
extent = (max(w for w, _ in sizes), max(h for _, h in sizes))
with _freeze(freeze_time):
for width, height in sizes:
results.extend(_render_size(
plugin_id, manifest, plugin_dir, config, mock_data or {},
width, height, run_update, extent,
))
return results
def _render_size(plugin_id, manifest, plugin_dir, config, mock_data,
width, height, run_update, extent) -> List[RenderResult]:
"""Render every mode at one size. A fresh instance per mode avoids state leaks."""
results: List[RenderResult] = []
# Discover modes once per size (instance build can depend on config).
try:
probe_dm = BoundsCheckingDisplayManager(width=width, height=height, overflow_extent=extent)
probe = _instantiate(plugin_id, manifest, plugin_dir, config, mock_data, probe_dm)
modes = list_modes(probe, manifest, plugin_id)
except Exception as e: # noqa: BLE001 — surface any load failure as a result
return [RenderResult(plugin_id, width, height, "<load>", error=repr(e))]
for mode in modes:
result = RenderResult(plugin_id, width, height, mode)
dm = BoundsCheckingDisplayManager(width=width, height=height, overflow_extent=extent)
try:
inst = _instantiate(plugin_id, manifest, plugin_dir, config, mock_data, dm)
if run_update:
try:
inst.update()
except _TOLERATED_UPDATE_ERRORS as e:
# Expected when CI / headless dev has no network: record it
# (surfaced in the report) but don't fail the run.
result.update_error = repr(e)
logger.debug("update() connectivity error for %s [%s]: %s", plugin_id, mode, e)
except Exception as e: # noqa: BLE001 — a non-network update() failure is a real bug
# A regression in update() must not pass green just because
# display() survives, so treat it as a failure of this render.
result.error = repr(e)
logger.warning("update() raised a non-connectivity error for %s [%s]: %s",
plugin_id, mode, e)
if result.error is None:
_render_mode(inst, mode)
result.image = dm.get_image()
result.overflow = dm.check_overflow()
except Exception as e: # noqa: BLE001 — a display crash is a real failure
result.error = repr(e)
results.append(result)
return results
# ---------------------------------------------------------------------------
# Golden-image comparison
# ---------------------------------------------------------------------------
def compare_images(rendered: Image.Image, golden: Image.Image,
max_delta: int = 0, max_diff_pixels: int = 0) -> Tuple[bool, int, int]:
"""Compare two images. Returns (ok, diff_pixel_count, max_per_channel_delta).
Tolerances default to exact match; bump them only to absorb known platform
anti-aliasing noise (requires a pinned Pillow + bundled fonts for stability).
"""
if rendered.size != golden.size:
return False, rendered.size[0] * rendered.size[1], 255
a = rendered.convert("RGB")
b = golden.convert("RGB")
diff = ImageChops.difference(a, b)
bbox = diff.getbbox()
if bbox is None:
return True, 0, 0
# Count pixels whose largest per-channel delta exceeds the allowed tolerance,
# and track the worst delta seen (for reporting).
diff_pixels = 0
observed_max = 0
for px in diff.crop(bbox).getdata():
m = max(px) if isinstance(px, tuple) else px
if m > observed_max:
observed_max = m
if m > max_delta:
diff_pixels += 1
# Pass when the number of out-of-tolerance pixels is within budget.
ok = diff_pixels <= max_diff_pixels
return ok, diff_pixels, observed_max
def golden_path(golden_dir: Path, width: int, height: int, mode: str) -> Path:
"""Location of a golden image: <golden_dir>/<WxH>/<mode>.png.
The mode is sanitized to a safe basename so a mode name with '/' or '..'
can't read or write outside the golden directory.
"""
return Path(golden_dir) / size_label(width, height) / f"{safe_mode_filename(mode)}.png"
def compare_to_goldens(results: List[RenderResult], golden_dir: Path,
max_delta: int = 0, max_diff_pixels: int = 0) -> List[RenderResult]:
"""Compare rendered results against committed goldens, mutating each result's
golden_* fields. Results with no golden file on disk are left unchecked."""
for r in results:
if r.image is None:
continue
gp = golden_path(golden_dir, r.width, r.height, r.mode)
if not gp.exists():
continue
r.golden_checked = True
with Image.open(gp) as g:
ok, diff_pixels, observed_max = compare_images(
r.image, g, max_delta=max_delta, max_diff_pixels=max_diff_pixels)
r.golden_ok = ok
r.golden_diff_pixels = diff_pixels
r.golden_max_delta = observed_max
return results
def write_goldens(results: List[RenderResult], golden_dir: Path) -> int:
"""Write each successfully-rendered result to its golden path. Returns count."""
written = 0
for r in results:
if r.image is None or r.error is not None:
continue
gp = golden_path(golden_dir, r.width, r.height, r.mode)
gp.parent.mkdir(parents=True, exist_ok=True)
r.image.save(gp, format="PNG")
written += 1
return written

View File

@@ -0,0 +1,82 @@
"""
Shared helpers for loading a plugin headlessly.
Used by scripts/render_plugin.py, scripts/check_plugin.py, and the harness so
plugin discovery / manifest / config-default logic lives in exactly one place.
"""
import json
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union
def find_plugin_dir(plugin_id: str, search_dirs: Sequence[Union[str, Path]]) -> Optional[Path]:
"""Find a plugin directory by searching multiple paths."""
from src.plugin_system.plugin_loader import PluginLoader
loader = PluginLoader()
for search_dir in search_dirs:
search_path = Path(search_dir)
if not search_path.exists():
continue
result = loader.find_plugin_directory(plugin_id, search_path)
if result:
return Path(result)
return None
def load_manifest(plugin_dir: Union[str, Path]) -> Dict[str, Any]:
"""Load and return manifest.json from a plugin directory."""
manifest_path = Path(plugin_dir) / 'manifest.json'
if not manifest_path.exists():
raise FileNotFoundError(f"No manifest.json in {plugin_dir}")
with open(manifest_path, 'r') as f:
return json.load(f)
def load_config_defaults(plugin_dir: Union[str, Path]) -> Dict[str, Any]:
"""Extract default values from a plugin's config_schema.json (empty if none)."""
schema_path = Path(plugin_dir) / 'config_schema.json'
if not schema_path.exists():
return {}
with open(schema_path, 'r') as f:
schema = json.load(f)
defaults: Dict[str, Any] = {}
for key, prop in schema.get('properties', {}).items():
if isinstance(prop, dict) and 'default' in prop:
defaults[key] = prop['default']
return defaults
def load_harness_spec(plugin_dir: Union[str, Path]) -> Dict[str, Any]:
"""Optional per-plugin harness settings from <plugin>/test/harness.json.
Lets a plugin opt into golden-image testing by declaring how to render it
deterministically. All keys optional:
{
"config": {...}, # config overrides
"mock_data": "fixtures/mock.json", # path (relative to plugin dir) to cache fixtures
"freeze_time": "2025-08-01 15:25:00",
"skip_update": false
}
Returns {} when no harness.json exists.
"""
spec_path = Path(plugin_dir) / 'test' / 'harness.json'
if not spec_path.exists():
return {}
with open(spec_path, 'r') as f:
spec = json.load(f)
# Resolve mock_data path and inline its contents for convenience.
mock_rel = spec.get('mock_data')
if mock_rel:
mock_path = Path(plugin_dir) / mock_rel
if not mock_path.exists():
# A declared-but-missing fixture is a harness config error: failing
# loudly beats silently rendering the plugin with no mock data.
raise FileNotFoundError(
f"harness.json references mock_data '{mock_rel}' but "
f"{mock_path} does not exist"
)
with open(mock_path, 'r') as mf:
spec['mock_data_contents'] = json.load(mf)
return spec

View File

@@ -63,11 +63,23 @@ class MockCacheManager:
"""Mock cache manager for testing."""
def __init__(self):
import shutil
import tempfile
import weakref
self._cache: Dict[str, Any] = {}
self._cache_timestamps: Dict[str, float] = {}
self.get_calls = []
self.set_calls = []
self.delete_calls = []
# Real temp dir for plugins that write/read files under cache_dir.
# Registered for cleanup so each mock instance doesn't leak a tmp dir.
self.cache_dir = tempfile.mkdtemp(prefix="ledmatrix-mock-cache-")
self._finalizer = weakref.finalize(
self, shutil.rmtree, self.cache_dir, ignore_errors=True)
def cleanup(self) -> None:
"""Remove the temp cache directory created for this instance."""
self._finalizer()
def get(self, key: str, max_age: Optional[float] = None) -> Optional[Any]:
"""Get a value from cache."""

View File

@@ -0,0 +1,120 @@
"""
LED matrix sizes the plugin safety harness renders against.
There is no fixed set of "supported" panel sizes — an RGB matrix build can be
any width/height and configuration (square, rectangle, 2x2, 4x4, 8x2, long
strips, tall stacks, ...). Plugins are expected to read width/height
dynamically and lay themselves out accordingly, so the harness's job is to
prove a plugin survives a *spread* of shapes, not a canonical list.
`DEFAULT_TEST_SIZES` is therefore a representative SAMPLE chosen to span the
axes of variation (narrow, wide, square, tall, small, long), not an
exhaustive or authoritative list. Callers can override it entirely:
- CLI: scripts/check_plugin.py --sizes 8x16,64x64,256x32
- pytest: LEDMATRIX_TEST_SIZES="8x16,64x64" env var (all plugins), or
per-plugin test/harness.json {"sizes": [[8, 16], [64, 64]]}
so anyone can point the harness at the exact panel(s) their build uses.
"""
import os
from typing import Iterable, List, Optional, Sequence, Tuple, Union
# A spread of real panel-grid arrangements (each module is 64x32), not a list of
# "blessed" sizes. Each entry exercises a different layout assumption a plugin
# might accidentally bake in. Annotations are the panel grid (cols x rows).
DEFAULT_TEST_SIZES: List[Tuple[int, int]] = [
(64, 32), # 1x1 — single panel, the tightest common rectangle
(128, 32), # 2x1 — the baseline most plugins are tuned for
(64, 64), # 1x2 — stacked, exercises tall-narrow centering
(128, 64), # 2x2 — block, icon scaling / vertical centering
(256, 32), # 4x1 — long strip, wide horizontal layout
(128, 96), # 2x3 — tall, exercises vertical overflow
(256, 128), # 4x4 — large block, both dimensions big at once
]
# Backwards-compatible alias. Prefer DEFAULT_TEST_SIZES in new code — the old
# name implied these were the only valid panel sizes, which they are not.
SUPPORTED_SIZES = DEFAULT_TEST_SIZES
def size_label(width: int, height: int) -> str:
"""Human/path-friendly label for a size, e.g. '128x32'."""
return f"{width}x{height}"
def parse_size_token(token: str) -> Tuple[int, int]:
"""Parse a single 'WxH' token into an (int, int) pair.
Raises ValueError (with a user-friendly message) on malformed input so
callers can surface it however they like.
"""
cleaned = token.strip().lower()
if "x" not in cleaned:
raise ValueError(f"Invalid size '{token}' (expected WxH, e.g. 128x32)")
w, h = cleaned.split("x", 1)
try:
width, height = int(w), int(h)
except ValueError as exc:
raise ValueError(
f"Invalid size '{token}' (expected numeric WxH, e.g. 128x32)"
) from exc
if width <= 0 or height <= 0:
raise ValueError(
f"Invalid size '{token}' (width and height must be positive, e.g. 128x32)"
)
return (width, height)
def coerce_sizes(
value: Union[str, Iterable[Sequence[int]], None]
) -> Optional[List[Tuple[int, int]]]:
"""Normalize a size spec into a list of (w, h) tuples, or None if empty.
Accepts a comma-separated 'WxH,WxH' string (CLI / env var) or an iterable
of [w, h] / (w, h) pairs (harness.json). Returns None when value is falsy
so callers can fall back to the default sample.
"""
if not value:
return None
if isinstance(value, str):
return [parse_size_token(tok) for tok in value.split(",") if tok.strip()]
sizes: List[Tuple[int, int]] = []
for pair in value:
w, h = pair # raises if not a 2-element sequence
width, height = int(w), int(h)
if width <= 0 or height <= 0:
raise ValueError(f"Invalid size pair {pair!r} (width and height must be positive)")
sizes.append((width, height))
return sizes or None
def resolve_test_sizes(
spec_sizes: Union[str, Iterable[Sequence[int]], None] = None,
) -> List[Tuple[int, int]]:
"""Decide which sizes to render, by precedence:
1. LEDMATRIX_TEST_SIZES env var — a global "test on my hardware" override
that wins for every plugin.
2. spec_sizes — e.g. a per-plugin harness.json "sizes" list.
3. DEFAULT_TEST_SIZES — the representative sample.
"""
env = coerce_sizes(os.environ.get("LEDMATRIX_TEST_SIZES"))
if env:
return env
spec = coerce_sizes(spec_sizes)
if spec:
return spec
return list(DEFAULT_TEST_SIZES)
def safe_mode_filename(mode: str) -> str:
"""A filesystem-safe basename for a plugin mode.
Mode names come from plugin metadata/render state, so a value containing
'/' or '..' could otherwise escape the intended output directory. Collapse
anything that isn't alphanumeric / dash / underscore to '_'.
"""
cleaned = "".join(ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in mode)
return cleaned or "mode"

View File

@@ -0,0 +1,255 @@
"""
Unit tests for the plugin safety harness primitives:
bounds detection, image comparison, and mode enumeration.
These don't load real plugins, so they run anywhere (including core CI where
plugin-repos is empty).
"""
import importlib.util
import json
from pathlib import Path
import pytest
from PIL import Image
from src.plugin_system.testing.bounds_display_manager import BoundsCheckingDisplayManager
from src.plugin_system.testing.harness import (
_TOLERATED_UPDATE_ERRORS, compare_images, list_modes,
)
from src.plugin_system.testing.sizes import (
DEFAULT_TEST_SIZES, coerce_sizes, parse_size_token, resolve_test_sizes,
)
class TestBoundsDetection:
def test_reports_declared_size_not_canvas_size(self):
dm = BoundsCheckingDisplayManager(width=64, height=32)
assert dm.width == 64 and dm.height == 32
assert dm.matrix.width == 64 and dm.matrix.height == 32
# Backing canvas is padded out past the declared panel so far-overshoot
# coordinates land on-canvas and get flagged instead of clipped.
canvas_w, canvas_h = dm.image.size
assert canvas_w > 64 and canvas_h > 32
def test_far_overshoot_on_small_panel_is_detected(self):
# A coordinate meant for a wide build (x past 64) must still be caught
# when the declared panel is only 64 wide.
dm = BoundsCheckingDisplayManager(width=64, height=32)
dm.draw.rectangle([200, 5, 210, 10], fill=(255, 0, 0))
bbox = dm.check_overflow()
assert bbox is not None
assert bbox[0] >= 64
def test_in_bounds_drawing_has_no_overflow(self):
dm = BoundsCheckingDisplayManager(width=64, height=32)
dm.draw.rectangle([0, 0, 63, 31], fill=(255, 255, 255))
assert dm.check_overflow() is None
def test_right_overflow_is_detected(self):
dm = BoundsCheckingDisplayManager(width=64, height=32)
# Draw a few pixels past the right edge.
dm.draw.rectangle([60, 5, 70, 10], fill=(255, 0, 0))
bbox = dm.check_overflow()
assert bbox is not None
assert bbox[0] >= 64 # overflow starts at or past the declared width
def test_bottom_overflow_is_detected(self):
dm = BoundsCheckingDisplayManager(width=64, height=32)
dm.draw.rectangle([5, 30, 10, 40], fill=(0, 255, 0))
bbox = dm.check_overflow()
assert bbox is not None
assert bbox[3] > 32 # overflow extends past the declared height
def test_declared_image_is_cropped_to_panel(self):
dm = BoundsCheckingDisplayManager(width=64, height=32)
assert dm.get_image().size == (64, 32)
def test_snapshot_saves_cropped_panel(self, tmp_path):
dm = BoundsCheckingDisplayManager(width=128, height=32)
out = tmp_path / "snap.png"
dm.save_snapshot(str(out))
with Image.open(out) as img:
assert img.size == (128, 32)
class TestArbitraryPanelSizes:
"""The harness must handle any panel shape, not a fixed supported list."""
def test_overflow_extent_pads_to_largest_in_run(self):
# A wide run (extent 256) means content at x=200 on a 64-wide panel is
# caught; the same draw with a small extent would be clipped (false pass).
wide = BoundsCheckingDisplayManager(width=64, height=32, overflow_extent=(256, 32))
wide.draw.rectangle([200, 5, 210, 10], fill=(255, 0, 0))
assert wide.check_overflow() is not None
tight = BoundsCheckingDisplayManager(width=64, height=32, overflow_extent=(64, 32))
tight.draw.rectangle([200, 5, 210, 10], fill=(255, 0, 0))
assert tight.check_overflow() is None # clipped beyond the small canvas
def test_unusual_shapes_report_their_declared_size(self):
for w, h in [(8, 2), (6, 6), (200, 8), (64, 96)]:
dm = BoundsCheckingDisplayManager(width=w, height=h)
assert dm.width == w and dm.height == h
assert dm.matrix.width == w and dm.matrix.height == h
class TestUpdateErrorClassification:
"""update() may fail for lack of network (tolerated) but a logic bug must
not pass green just because display() survives."""
def test_connectivity_errors_are_tolerated(self):
import socket
import urllib.error
for exc in (ConnectionError("x"), TimeoutError("x"), socket.gaierror("x"),
urllib.error.URLError("x")):
assert isinstance(exc, _TOLERATED_UPDATE_ERRORS)
def test_logic_errors_are_not_tolerated(self):
for exc in (ValueError("x"), KeyError("x"), AttributeError("x"), TypeError("x")):
assert not isinstance(exc, _TOLERATED_UPDATE_ERRORS)
class TestSizeParsing:
def test_parse_size_token_ok(self):
assert parse_size_token(" 128X32 ") == (128, 32)
def test_parse_size_token_rejects_garbage(self):
with pytest.raises(ValueError):
parse_size_token("128xabc")
with pytest.raises(ValueError):
parse_size_token("128-32")
def test_rejects_non_positive_dimensions(self):
for bad in ("0x32", "-64x32", "64x0", "64x-1"):
with pytest.raises(ValueError):
parse_size_token(bad)
with pytest.raises(ValueError):
coerce_sizes([[0, 32]])
with pytest.raises(ValueError):
coerce_sizes("64x-1")
def test_coerce_sizes_from_string_and_pairs(self):
assert coerce_sizes("8x16,64x64") == [(8, 16), (64, 64)]
assert coerce_sizes([[8, 16], (64, 64)]) == [(8, 16), (64, 64)]
assert coerce_sizes(None) is None
assert coerce_sizes("") is None
def test_resolve_precedence_env_then_spec_then_default(self, monkeypatch):
monkeypatch.delenv("LEDMATRIX_TEST_SIZES", raising=False)
assert resolve_test_sizes(None) == list(DEFAULT_TEST_SIZES)
assert resolve_test_sizes([[8, 16]]) == [(8, 16)]
monkeypatch.setenv("LEDMATRIX_TEST_SIZES", "5x5")
# env wins over a per-plugin spec
assert resolve_test_sizes([[8, 16]]) == [(5, 5)]
class TestCompareImages:
def test_identical_images_match(self):
a = Image.new("RGB", (16, 16), (10, 20, 30))
b = a.copy()
ok, diff_pixels, max_delta = compare_images(a, b)
assert ok and diff_pixels == 0 and max_delta == 0
def test_different_images_fail_at_zero_tolerance(self):
a = Image.new("RGB", (16, 16), (0, 0, 0))
b = a.copy()
b.putpixel((1, 1), (255, 255, 255))
ok, diff_pixels, max_delta = compare_images(a, b)
assert not ok and diff_pixels == 1 and max_delta == 255
def test_tolerance_absorbs_small_noise(self):
a = Image.new("RGB", (16, 16), (100, 100, 100))
b = a.copy()
b.putpixel((2, 2), (103, 100, 100)) # delta 3
ok, _, max_delta = compare_images(a, b, max_delta=5, max_diff_pixels=0)
assert ok and max_delta == 3
def test_size_mismatch_fails(self):
a = Image.new("RGB", (16, 16))
b = Image.new("RGB", (32, 16))
ok, _, _ = compare_images(a, b)
assert not ok
class TestListModes:
def test_instance_modes_take_precedence(self):
inst = type("P", (), {"modes": ["a", "b"]})()
assert list_modes(inst, {"display_modes": ["x"]}, "pid") == ["a", "b"]
def test_falls_back_to_manifest_display_modes(self):
inst = type("P", (), {})()
assert list_modes(inst, {"display_modes": ["x", "y"]}, "pid") == ["x", "y"]
def test_falls_back_to_plugin_id(self):
inst = type("P", (), {})()
assert list_modes(inst, {}, "pid") == ["pid"]
def _load_check_plugin_cli():
"""Load scripts/check_plugin.py by path (it isn't an importable package)."""
root = Path(__file__).resolve().parents[2]
path = root / "scripts" / "check_plugin.py"
spec = importlib.util.spec_from_file_location("check_plugin_cli", path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _make_fixture_plugin(tmp_path, harness):
"""Create a minimal plugin dir with a test/harness.json; return its parent
(the search dir)."""
pdir = tmp_path / "plugins" / "demo-clock"
(pdir / "test").mkdir(parents=True)
(pdir / "manifest.json").write_text(json.dumps({
"id": "demo-clock", "name": "Demo Clock", "version": "1.0.0",
"author": "test", "entry_point": "manager.py", "class_name": "DemoClock",
"display_modes": ["demo-clock"], "compatible_versions": ["*"],
}))
(pdir / "test" / "harness.json").write_text(json.dumps(harness))
return pdir.parent
class TestCheckPluginHonorsHarnessJson:
"""Regression: check_plugin.py (the CI tool) must apply test/harness.json so
its render reproduces the committed goldens — otherwise time/data-dependent
plugins drift on every CI run."""
def test_harness_json_supplies_render_settings(self, tmp_path, monkeypatch):
mod = _load_check_plugin_cli()
search = _make_fixture_plugin(tmp_path, {
"config": {"timezone": "UTC"},
"freeze_time": "2025-08-01 15:25:00",
"sizes": [[128, 32]],
})
captured = {}
monkeypatch.setattr(mod, "render_plugin_matrix",
lambda **kw: captured.update(kw) or [])
monkeypatch.setattr(mod, "compare_to_goldens", lambda *a, **k: [])
mod.check_one(
plugin_id="demo-clock", search_dirs=[str(search)], sizes=None,
mock_data={}, config={}, run_update=True, out_dir=None,
update_golden=False, golden_dir_override=None, freeze_time=None,
)
assert captured["freeze_time"] == "2025-08-01 15:25:00"
assert captured["config"]["timezone"] == "UTC"
assert captured["sizes"] == [(128, 32)]
def test_cli_flags_override_harness_json(self, tmp_path, monkeypatch):
mod = _load_check_plugin_cli()
search = _make_fixture_plugin(tmp_path, {
"config": {"timezone": "UTC"},
"freeze_time": "2025-08-01 15:25:00",
})
captured = {}
monkeypatch.setattr(mod, "render_plugin_matrix",
lambda **kw: captured.update(kw) or [])
monkeypatch.setattr(mod, "compare_to_goldens", lambda *a, **k: [])
mod.check_one(
plugin_id="demo-clock", search_dirs=[str(search)], sizes=None,
mock_data={}, config={"timezone": "America/New_York"},
run_update=True, out_dir=None, update_golden=False,
golden_dir_override=None, freeze_time="2030-01-01 00:00:00",
)
assert captured["freeze_time"] == "2030-01-01 00:00:00"
assert captured["config"]["timezone"] == "America/New_York"

View File

@@ -0,0 +1,115 @@
"""
Cross-size / cross-screen plugin safety test.
For every discovered plugin, render every declared screen at every supported
matrix size and assert it: loads, renders without crashing, stays within the
panel bounds, and — for plugins that ship golden images — matches them.
Plugin discovery (first match wins):
- $LEDMATRIX_PLUGINS_DIR (os.pathsep-separated list of dirs), else
- <project_root>/plugin-repos and <project_root>/plugins
A plugin opts into golden-image checks by adding test/golden/<WxH>/<mode>.png
(and usually test/harness.json for deterministic config / mock data / time).
"""
import os
from pathlib import Path
from typing import Dict, List
import pytest
from src.plugin_system.testing.harness import (
render_plugin_matrix, compare_to_goldens,
)
from src.plugin_system.testing.loading import load_config_defaults, load_harness_spec
from src.plugin_system.testing.sizes import resolve_test_sizes
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Set LEDMATRIX_REQUIRE_PLUGINS=1 in any CI/hardware pipeline where plugins are
# expected to be present, so a discovery drift (empty search path) fails loudly
# instead of silently skipping and losing this safety signal.
_REQUIRE_PLUGINS = os.environ.get("LEDMATRIX_REQUIRE_PLUGINS") == "1"
def _plugin_search_dirs() -> List[Path]:
env = os.environ.get("LEDMATRIX_PLUGINS_DIR")
if env:
return [Path(p) for p in env.split(os.pathsep) if p]
return [PROJECT_ROOT / "plugin-repos", PROJECT_ROOT / "plugins"]
def _discover() -> Dict[str, Path]:
"""Map plugin_id -> plugin_dir for all plugins on the search path."""
found: Dict[str, Path] = {}
for base in _plugin_search_dirs():
if not base.exists():
continue
for child in sorted(base.iterdir()):
if (child / "manifest.json").exists() and child.name not in found:
found[child.name] = child
return found
_PLUGINS = _discover()
@pytest.mark.plugin
def test_plugins_were_discovered() -> None:
"""Guard against silently skipping the whole matrix when discovery drifts.
Local dev and the plugin-less core CI legitimately have no plugins, so we
skip there; but when LEDMATRIX_REQUIRE_PLUGINS=1 an empty search path is a
hard failure rather than a green no-op.
"""
if _PLUGINS:
return
search = [str(p) for p in _plugin_search_dirs()]
if _REQUIRE_PLUGINS:
pytest.fail(
"LEDMATRIX_REQUIRE_PLUGINS=1 but no plugins were discovered on the "
f"search path: {search}"
)
pytest.skip(f"no plugins found on the search path: {search}")
@pytest.mark.plugin
@pytest.mark.skipif(not _PLUGINS, reason="no plugins found on the search path")
@pytest.mark.parametrize("plugin_id", sorted(_PLUGINS))
def test_plugin_renders_across_sizes_and_screens(plugin_id: str) -> None:
plugin_dir = _PLUGINS[plugin_id]
spec = load_harness_spec(plugin_dir)
config = {"enabled": True}
config.update(load_config_defaults(plugin_dir))
config.update(spec.get("config", {}))
# Sizes: LEDMATRIX_TEST_SIZES env (test on real hardware) wins, then the
# plugin's own harness.json "sizes", else the default representative sample.
sizes = resolve_test_sizes(spec.get("sizes"))
results = render_plugin_matrix(
plugin_id=plugin_id,
plugin_dir=plugin_dir,
config=config,
mock_data=spec.get("mock_data_contents", {}),
sizes=sizes,
run_update=not spec.get("skip_update", False),
freeze_time=spec.get("freeze_time"),
)
compare_to_goldens(results, plugin_dir / "test" / "golden")
failures = []
for r in results:
if r.error is not None:
failures.append(f"{r.size_label} {r.mode}: crashed: {r.error}")
elif r.overflow is not None:
failures.append(f"{r.size_label} {r.mode}: overflow past panel bbox={r.overflow}")
elif r.golden_checked and r.golden_ok is False:
failures.append(
f"{r.size_label} {r.mode}: golden drift {r.golden_diff_pixels}px "
f"(max Δ={r.golden_max_delta})"
)
assert not failures, f"{plugin_id} failed:\n " + "\n ".join(failures)

View File

@@ -0,0 +1,76 @@
"""Guards that every privileged systemctl call the web interface makes is
covered by a passwordless-sudo grant in configure_web_sudo.sh.
The web interface runs headless (no TTY), so any `sudo` call that is not
matched by a NOPASSWD rule in /etc/sudoers.d/ledmatrix_web falls back to a
password prompt and fails with:
sudo: a terminal is required to read the password
sudo matches the command line by exact string, so `systemctl start ledmatrix`
and `systemctl start ledmatrix.service` are NOT interchangeable. This test
parses both the production blueprint and the sudoers-generator script and
asserts the (verb, unit) pairs line up, catching the suffix-mismatch class of
bug before it ships.
"""
import ast
import re
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[2]
API_V3 = PROJECT_ROOT / "web_interface" / "blueprints" / "api_v3.py"
SUDOERS_SCRIPT = PROJECT_ROOT / "scripts" / "install" / "configure_web_sudo.sh"
def _sudo_systemctl_calls(source: str) -> set[tuple[str, str]]:
"""Return (verb, unit) for every list literal beginning with
['sudo', 'systemctl', ...] passed to a subprocess call in the source."""
calls: set[tuple[str, str]] = set()
for node in ast.walk(ast.parse(source)):
if not isinstance(node, ast.List):
continue
elts = node.elts
if len(elts) < 4:
continue
if not all(isinstance(e, ast.Constant) and isinstance(e.value, str) for e in elts[:4]):
continue
if elts[0].value == "sudo" and elts[1].value == "systemctl":
calls.add((elts[2].value, elts[3].value))
return calls
def _granted_systemctl_rules(script: str) -> set[tuple[str, str]]:
"""Return (verb, unit) for each `$SYSTEMCTL_PATH <verb> <unit>` NOPASSWD
grant emitted by the sudoers-generator script."""
rules: set[tuple[str, str]] = set()
for match in re.finditer(r"\$SYSTEMCTL_PATH\s+(\S+)\s+(\S+)", script):
verb, unit = match.group(1), match.group(2).rstrip('"')
rules.add((verb, unit))
return rules
def test_every_sudo_systemctl_call_is_granted() -> None:
calls = _sudo_systemctl_calls(API_V3.read_text())
rules = _granted_systemctl_rules(SUDOERS_SCRIPT.read_text())
assert calls, "expected to find sudo systemctl calls in api_v3.py"
uncovered = {c for c in calls if c not in rules}
assert not uncovered, (
"These sudo systemctl calls have no matching NOPASSWD grant in "
"configure_web_sudo.sh; they will fail headless with "
"'sudo: a terminal is required to read the password': "
+ ", ".join(f"systemctl {v} {u}" for v, u in sorted(uncovered))
)
def test_units_are_fully_qualified() -> None:
"""Privileged systemctl calls must name the unit as <name>.service so they
match the sudoers grants, which use the fully-qualified unit name."""
calls = _sudo_systemctl_calls(API_V3.read_text())
unqualified = {(v, u) for v, u in calls if not u.endswith(".service")}
assert not unqualified, (
"sudo systemctl calls must use fully-qualified .service unit names: "
+ ", ".join(f"systemctl {v} {u}" for v, u in sorted(unqualified))
)

View File

@@ -190,7 +190,7 @@ def _ensure_display_service_running():
if status.get('active'):
status['started'] = False
return status
result = _run_systemctl_command(['sudo', 'systemctl', 'start', 'ledmatrix'])
result = _run_systemctl_command(['sudo', 'systemctl', 'start', 'ledmatrix.service'])
service_status = _get_display_service_status()
result['started'] = result.get('returncode') == 0
result['active'] = service_status.get('active')
@@ -199,7 +199,7 @@ def _ensure_display_service_running():
def _stop_display_service():
"""Stop the ledmatrix display service."""
result = _run_systemctl_command(['sudo', 'systemctl', 'stop', 'ledmatrix'])
result = _run_systemctl_command(['sudo', 'systemctl', 'stop', 'ledmatrix.service'])
status = _get_display_service_status()
result['active'] = status.get('active')
result['status'] = status
@@ -705,7 +705,8 @@ def save_main_config():
display_fields = ['rows', 'cols', 'chain_length', 'parallel', 'brightness', 'hardware_mapping',
'gpio_slowdown', 'rp1_rio', 'scan_mode', 'disable_hardware_pulsing', 'inverse_colors', 'show_refresh_rate',
'pwm_bits', 'pwm_dither_bits', 'pwm_lsb_nanoseconds', 'limit_refresh_rate_hz', 'use_short_date_format',
'max_dynamic_duration_seconds', 'led_rgb_sequence', 'multiplexing', 'panel_type']
'max_dynamic_duration_seconds', 'led_rgb_sequence', 'multiplexing', 'panel_type',
'row_address_type']
if any(k in data for k in display_fields):
if 'display' not in current_config:
@@ -736,14 +737,23 @@ def save_main_config():
except (ValueError, TypeError):
return jsonify({'status': 'error', 'message': f"Invalid multiplexing value '{data['multiplexing']}'. Must be an integer from 0 to 22."}), 400
# Validate row_address_type
if 'row_address_type' in data:
try:
rat_val = int(data['row_address_type'])
if rat_val < 0 or rat_val > 4:
return jsonify({'status': 'error', 'message': f"Invalid row_address_type '{data['row_address_type']}'. Must be an integer from 0 to 4."}), 400
except (ValueError, TypeError):
return jsonify({'status': 'error', 'message': f"Invalid row_address_type '{data['row_address_type']}'. Must be an integer from 0 to 4."}), 400
# Handle hardware settings
for field in ['rows', 'cols', 'chain_length', 'parallel', 'brightness', 'hardware_mapping', 'scan_mode',
'pwm_bits', 'pwm_dither_bits', 'pwm_lsb_nanoseconds', 'limit_refresh_rate_hz',
'led_rgb_sequence', 'multiplexing', 'panel_type']:
'led_rgb_sequence', 'multiplexing', 'panel_type', 'row_address_type']:
if field in data:
if field in ['rows', 'cols', 'chain_length', 'parallel', 'brightness', 'scan_mode',
'pwm_bits', 'pwm_dither_bits', 'pwm_lsb_nanoseconds', 'limit_refresh_rate_hz',
'multiplexing']:
'multiplexing', 'row_address_type']:
current_config['display']['hardware'][field] = int(data[field])
else:
current_config['display']['hardware'][field] = data[field]
@@ -1461,7 +1471,7 @@ def execute_system_action():
# For on-demand modes, we would need to integrate with the display controller
# For now, just start the display service
try:
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix.service'],
capture_output=True, text=True, timeout=10)
except subprocess.TimeoutExpired as e:
logger.error("start_display (%s) timed out: %s", mode, e)
@@ -1478,16 +1488,16 @@ def execute_system_action():
resp['stderr'] = result.stderr.strip()
return jsonify(resp)
else:
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix.service'],
capture_output=True, text=True, timeout=10)
elif action == 'stop_display':
result = subprocess.run(['sudo', 'systemctl', 'stop', 'ledmatrix'],
result = subprocess.run(['sudo', 'systemctl', 'stop', 'ledmatrix.service'],
capture_output=True, text=True, timeout=10)
elif action == 'enable_autostart':
result = subprocess.run(['sudo', 'systemctl', 'enable', 'ledmatrix'],
result = subprocess.run(['sudo', 'systemctl', 'enable', 'ledmatrix.service'],
capture_output=True, text=True, timeout=10)
elif action == 'disable_autostart':
result = subprocess.run(['sudo', 'systemctl', 'disable', 'ledmatrix'],
result = subprocess.run(['sudo', 'systemctl', 'disable', 'ledmatrix.service'],
capture_output=True, text=True, timeout=10)
elif action == 'reboot_system':
result = subprocess.run(['sudo', 'reboot'],
@@ -1568,12 +1578,72 @@ def execute_system_action():
'message': pull_message,
})
elif action == 'restart_display_service':
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'],
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix.service'],
capture_output=True, text=True, timeout=10)
elif action == 'restart_web_service':
# Try to restart the web service (assuming it's ledmatrix-web.service)
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'],
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web.service'],
capture_output=True, text=True, timeout=10)
elif action == 'install_base_requirements':
req_file = PROJECT_ROOT / 'requirements.txt'
if not req_file.exists():
return jsonify({'status': 'error', 'message': 'No requirements.txt found at project root'})
result = subprocess.run(
[sys.executable, '-m', 'pip', 'install', '--break-system-packages', '-r', str(req_file)],
capture_output=True, text=True, timeout=120, cwd=str(PROJECT_ROOT)
)
return jsonify({
'status': 'success' if result.returncode == 0 else 'error',
'message': 'Base requirements installed successfully' if result.returncode == 0 else 'pip install failed',
'output': (result.stdout + result.stderr).strip()
})
elif action == 'install_plugin_requirements':
plugins_dir = Path(plugin_manager.plugins_dir) if plugin_manager else PROJECT_ROOT / 'plugin-repos'
results = []
if plugins_dir.exists():
for p in sorted(plugins_dir.iterdir()):
req = p / 'requirements.txt'
if p.is_dir() and req.exists():
r = subprocess.run(
[sys.executable, '-m', 'pip', 'install', '--break-system-packages', '-r', str(req)],
capture_output=True, text=True, timeout=60
)
results.append({
'plugin': p.name,
'ok': r.returncode == 0,
'output': (r.stdout + r.stderr).strip()
})
ok_count = sum(1 for r in results if r['ok'])
all_ok = all(r['ok'] for r in results) if results else True
return jsonify({
'status': 'success' if all_ok else 'error',
'message': f'Processed {len(results)} plugin(s) — {ok_count} succeeded' if results else 'No plugin requirements.txt files found',
'details': results
})
elif action == 'force_git_reset':
project_dir = str(PROJECT_ROOT)
fetch = subprocess.run(
['git', 'fetch', 'origin'],
capture_output=True, text=True, timeout=30, cwd=project_dir
)
if fetch.returncode != 0:
return jsonify({'status': 'error', 'message': 'git fetch failed', 'output': fetch.stderr.strip()})
reset = subprocess.run(
['git', 'reset', '--hard', 'origin/main'],
capture_output=True, text=True, timeout=30, cwd=project_dir
)
return jsonify({
'status': 'success' if reset.returncode == 0 else 'error',
'message': 'Reset to origin/main successfully' if reset.returncode == 0 else 'git reset failed',
'output': (reset.stdout + reset.stderr).strip()
})
elif action == 'clear_pycache':
cleared = 0
for d in PROJECT_ROOT.rglob('__pycache__'):
if d.is_dir():
shutil.rmtree(d, ignore_errors=True)
cleared += 1
return jsonify({'status': 'success', 'message': f'Cleared {cleared} __pycache__ directories'})
else:
return jsonify({'status': 'error', 'message': 'Unknown action'}), 400
@@ -1596,6 +1666,27 @@ def execute_system_action():
logger.error("execute_system_action failed: %s", e, exc_info=True)
return jsonify({'status': 'error', 'message': 'Action failed; see logs for details'}), 500
@api_v3.route('/system/git-info', methods=['GET'])
def get_git_info():
"""Return branch, dirty state, recent commits and remote URL for the Tools tab."""
d = str(PROJECT_ROOT)
try:
branch = subprocess.run(['git', 'branch', '--show-current'], capture_output=True, text=True, timeout=10, cwd=d)
status = subprocess.run(['git', 'status', '--short', '--untracked-files=no'], capture_output=True, text=True, timeout=15, cwd=d)
log = subprocess.run(['git', 'log', '--oneline', '-5'], capture_output=True, text=True, timeout=10, cwd=d)
remote = subprocess.run(['git', 'remote', 'get-url', 'origin'], capture_output=True, text=True, timeout=10, cwd=d)
return jsonify({
'branch': branch.stdout.strip(),
'dirty': bool(status.stdout.strip()),
'status': status.stdout.strip(),
'recent_commits': log.stdout.strip(),
'remote_url': remote.stdout.strip(),
})
except Exception as e:
logger.error("get_git_info failed: %s", e, exc_info=True)
return jsonify({'status': 'error', 'message': 'Failed to get git info'}), 500
@api_v3.route('/hardware/status', methods=['GET'])
def get_hardware_status():
"""Return LED matrix hardware initialization status written by display_manager at startup."""
@@ -3709,13 +3800,14 @@ def _parse_form_value_with_schema(value, key_path, schema):
except ValueError:
return prop.get('default', 0.0)
# Try parsing as number (fallback)
try:
if '.' in stripped:
return float(stripped)
return int(stripped)
except ValueError:
pass
# Try parsing as number (fallback) — skip when schema explicitly says string
if not (prop and prop.get('type') == 'string'):
try:
if '.' in stripped:
return float(stripped)
return int(stripped)
except ValueError:
pass
# Return as string
return value

View File

@@ -90,6 +90,8 @@ def load_partial(partial_name):
return _load_cache_partial()
elif partial_name == 'operation-history':
return _load_operation_history_partial()
elif partial_name == 'tools':
return _load_tools_partial()
else:
return "Partial not found", 404
@@ -448,6 +450,15 @@ def _load_operation_history_partial():
return "Error loading partial", 500
def _load_tools_partial():
"""Load tools/utilities partial."""
try:
return render_template('v3/partials/tools.html')
except Exception:
logger.error("Error loading partial", exc_info=True)
return "Error loading partial", 500
def _load_plugin_config_partial(plugin_id):
"""
Load plugin configuration partial - server-side rendered form.

View File

@@ -1009,6 +1009,11 @@
class="nav-tab">
<i class="fas fa-history"></i>Operation History
</button>
<button @click="activeTab = 'tools'"
:class="activeTab === 'tools' ? 'nav-tab-active' : ''"
class="nav-tab">
<i class="fas fa-tools"></i>Tools
</button>
</nav>
</div>
@@ -1290,6 +1295,18 @@
</div>
</div>
<!-- Tools tab -->
<div x-show="activeTab === 'tools'" x-transition>
<div id="tools-content" hx-get="/v3/partials/tools" hx-trigger="loadtab" hx-swap="innerHTML">
<div class="animate-pulse">
<div class="bg-white rounded-lg shadow p-6">
<div class="h-4 bg-gray-200 rounded w-1/4 mb-4"></div>
<div class="h-32 bg-gray-200 rounded"></div>
</div>
</div>
</div>
</div>
<!-- Dynamic Plugin Tabs - HTMX Lazy Loading -->
<!--
Architecture: Server-side rendered plugin configuration forms

View File

@@ -68,7 +68,7 @@
name="chain_length"
value="{{ main_config.display.hardware.chain_length or 2 }}"
min="1"
max="8"
max="24"
class="form-control">
<p class="mt-1 text-sm text-gray-600">Number of LED panels chained together</p>
</div>
@@ -166,6 +166,18 @@
</select>
<p class="mt-1 text-sm text-gray-600">Special panel chipset initialization (use Standard unless your panel requires it)</p>
</div>
<div class="form-group">
<label for="row_address_type" class="block text-sm font-medium text-gray-700">Row Address Type</label>
<select id="row_address_type" name="row_address_type" class="form-control">
<option value="0" {% if main_config.display.hardware.get('row_address_type', 0)|int == 0 %}selected{% endif %}>0 - Default</option>
<option value="1" {% if main_config.display.hardware.get('row_address_type', 0)|int == 1 %}selected{% endif %}>1 - AB-addressed panels</option>
<option value="2" {% if main_config.display.hardware.get('row_address_type', 0)|int == 2 %}selected{% endif %}>2 - Row direct</option>
<option value="3" {% if main_config.display.hardware.get('row_address_type', 0)|int == 3 %}selected{% endif %}>3 - ABC-addressed panels</option>
<option value="4" {% if main_config.display.hardware.get('row_address_type', 0)|int == 4 %}selected{% endif %}>4 - ABC Shift + DE direct</option>
</select>
<p class="mt-1 text-sm text-gray-600">Row addressing scheme — leave at Default (0) unless your panel requires a specific type</p>
</div>
</div>
<div class="grid grid-cols-1 md:grid-cols-3 gap-4">

View File

@@ -0,0 +1,306 @@
<div class="space-y-6" id="tools-root">
<!-- Git & Updates -->
<div class="bg-white rounded-lg shadow p-6">
<div class="border-b border-gray-200 pb-4 mb-6">
<h2 class="text-lg font-semibold text-gray-900">Git &amp; Updates</h2>
<p class="mt-1 text-sm text-gray-600">Inspect the current git state and pull or reset to the latest remote code.</p>
</div>
<!-- Git status info -->
<div id="git-info-panel" class="mb-6 bg-gray-50 border border-gray-200 rounded-lg p-4 text-sm">
<div class="animate-pulse text-gray-400">Loading git info…</div>
</div>
<div class="space-y-4">
<!-- Pull latest -->
<div class="flex items-start justify-between gap-4">
<div>
<p class="text-sm font-medium text-gray-900">Pull latest (rebase)</p>
<p class="text-xs text-gray-500 mt-0.5">Stashes local changes, runs <code class="bg-gray-100 px-1 rounded">git pull --rebase</code>, then restores the stash.</p>
</div>
<button id="btn-git-pull" onclick="toolsAction('git_pull', 'btn-git-pull', 'result-git-pull')"
class="shrink-0 inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-download mr-2"></i>Pull Latest
</button>
</div>
<div id="result-git-pull" class="hidden"></div>
<!-- Force reset -->
<div class="flex items-start justify-between gap-4 pt-4 border-t border-gray-100">
<div>
<p class="text-sm font-medium text-gray-900">Force reset to <code class="bg-gray-100 px-1 rounded">origin/main</code></p>
<p class="text-xs text-gray-500 mt-0.5">Runs <code class="bg-gray-100 px-1 rounded">git fetch origin</code> then <code class="bg-gray-100 px-1 rounded">git reset --hard origin/main</code>. Discards all local changes.</p>
</div>
<div class="shrink-0 flex flex-col items-end gap-2">
<button id="btn-force-reset-confirm" onclick="showForceResetConfirm()"
class="inline-flex items-center px-3 py-2 border border-red-300 text-sm font-medium rounded-md text-red-700 bg-white hover:bg-red-50">
<i class="fas fa-exclamation-triangle mr-2"></i>Force Reset…
</button>
<div id="force-reset-confirm-row" class="hidden flex items-center gap-2">
<span class="text-xs text-red-700 font-medium">This discards all local changes. Sure?</span>
<button onclick="toolsAction('force_git_reset', 'btn-force-reset-confirm', 'result-force-reset'); hideForceResetConfirm()"
class="inline-flex items-center px-3 py-2 border border-transparent text-sm font-medium rounded-md text-white bg-red-600 hover:bg-red-700">
Yes, reset
</button>
<button onclick="hideForceResetConfirm()"
class="inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
Cancel
</button>
</div>
</div>
</div>
<div id="result-force-reset" class="hidden"></div>
</div>
</div>
<!-- Python Dependencies -->
<div class="bg-white rounded-lg shadow p-6">
<div class="border-b border-gray-200 pb-4 mb-6">
<h2 class="text-lg font-semibold text-gray-900">Python Dependencies</h2>
<p class="mt-1 text-sm text-gray-600">Re-run <code class="bg-gray-100 px-1 rounded">pip install</code> to fix missing or broken packages.</p>
</div>
<div class="space-y-4">
<!-- Base requirements -->
<div class="flex items-start justify-between gap-4">
<div>
<p class="text-sm font-medium text-gray-900">Reinstall base requirements</p>
<p class="text-xs text-gray-500 mt-0.5">Installs from <code class="bg-gray-100 px-1 rounded">requirements.txt</code> in the project root.</p>
</div>
<button id="btn-base-reqs" onclick="toolsAction('install_base_requirements', 'btn-base-reqs', 'result-base-reqs', true)"
class="shrink-0 inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-box mr-2"></i>Reinstall Base
</button>
</div>
<div id="result-base-reqs" class="hidden"></div>
<!-- Plugin requirements -->
<div class="flex items-start justify-between gap-4 pt-4 border-t border-gray-100">
<div>
<p class="text-sm font-medium text-gray-900">Reinstall plugin requirements</p>
<p class="text-xs text-gray-500 mt-0.5">Runs <code class="bg-gray-100 px-1 rounded">pip install</code> for every installed plugin that has a <code class="bg-gray-100 px-1 rounded">requirements.txt</code>.</p>
</div>
<button id="btn-plugin-reqs" onclick="toolsAction('install_plugin_requirements', 'btn-plugin-reqs', 'result-plugin-reqs', false, true)"
class="shrink-0 inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-puzzle-piece mr-2"></i>Reinstall Plugin Deps
</button>
</div>
<div id="result-plugin-reqs" class="hidden"></div>
</div>
</div>
<!-- Maintenance -->
<div class="bg-white rounded-lg shadow p-6">
<div class="border-b border-gray-200 pb-4 mb-6">
<h2 class="text-lg font-semibold text-gray-900">Maintenance</h2>
<p class="mt-1 text-sm text-gray-600">Housekeeping operations that don't affect config or plugins.</p>
</div>
<div class="space-y-4">
<div class="flex items-start justify-between gap-4">
<div>
<p class="text-sm font-medium text-gray-900">Clear Python cache</p>
<p class="text-xs text-gray-500 mt-0.5">Deletes all <code class="bg-gray-100 px-1 rounded">__pycache__</code> directories in the project. Useful after switching branches or debugging import issues.</p>
</div>
<button id="btn-clear-pycache" onclick="toolsAction('clear_pycache', 'btn-clear-pycache', 'result-clear-pycache')"
class="shrink-0 inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-broom mr-2"></i>Clear Cache
</button>
</div>
<div id="result-clear-pycache" class="hidden"></div>
</div>
</div>
<!-- Services -->
<div class="bg-white rounded-lg shadow p-6">
<div class="border-b border-gray-200 pb-4 mb-6">
<h2 class="text-lg font-semibold text-gray-900">Services</h2>
<p class="mt-1 text-sm text-gray-600">Quick access to service restarts.</p>
</div>
<div class="space-y-4">
<div class="flex items-start justify-between gap-4">
<div>
<p class="text-sm font-medium text-gray-900">Restart display service</p>
<p class="text-xs text-gray-500 mt-0.5">Restarts <code class="bg-gray-100 px-1 rounded">ledmatrix.service</code>.</p>
</div>
<button id="btn-restart-display" onclick="toolsAction('restart_display_service', 'btn-restart-display', 'result-restart-display')"
class="shrink-0 inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-sync-alt mr-2"></i>Restart Display
</button>
</div>
<div id="result-restart-display" class="hidden"></div>
<div class="flex items-start justify-between gap-4 pt-4 border-t border-gray-100">
<div>
<p class="text-sm font-medium text-gray-900">Restart web interface</p>
<p class="text-xs text-gray-500 mt-0.5">Restarts <code class="bg-gray-100 px-1 rounded">ledmatrix-web.service</code>. The page will go offline briefly.</p>
</div>
<button id="btn-restart-web" onclick="toolsAction('restart_web_service', 'btn-restart-web', 'result-restart-web')"
class="shrink-0 inline-flex items-center px-3 py-2 border border-gray-300 text-sm font-medium rounded-md text-gray-700 bg-white hover:bg-gray-50">
<i class="fas fa-globe mr-2"></i>Restart Web
</button>
</div>
<div id="result-restart-web" class="hidden"></div>
</div>
</div>
</div>
<script>
(function () {
// ── helpers ──────────────────────────────────────────────────────────────
function setBusy(btnId, busy) {
const btn = document.getElementById(btnId);
if (!btn) return;
btn.disabled = busy;
btn.style.opacity = busy ? '0.6' : '';
btn.style.cursor = busy ? 'wait' : '';
const icon = btn.querySelector('i');
if (icon) {
if (busy) {
icon.dataset.origClass = icon.className;
icon.className = 'fas fa-spinner fa-spin mr-2';
} else if (icon.dataset.origClass) {
icon.className = icon.dataset.origClass;
}
}
}
function showResult(resultId, ok, message, output, pluginDetails) {
const el = document.getElementById(resultId);
if (!el) return;
el.classList.remove('hidden');
const color = ok ? 'green' : 'red';
const icon = ok ? 'fa-check-circle' : 'fa-times-circle';
let html = `
<div class="mt-3 rounded-md p-3 bg-${color}-50 border border-${color}-200">
<div class="flex items-start gap-2">
<i class="fas ${icon} text-${color}-600 mt-0.5"></i>
<span class="text-sm text-${color}-800">${escHtml(message)}</span>
</div>`;
if (output) {
html += `
<details class="mt-2">
<summary class="text-xs text-${color}-700 cursor-pointer hover:underline">Show output</summary>
<pre class="mt-2 text-xs bg-gray-900 text-gray-100 rounded p-3 overflow-x-auto whitespace-pre-wrap">${escHtml(output)}</pre>
</details>`;
}
if (pluginDetails && pluginDetails.length > 0) {
html += `<ul class="mt-3 space-y-1">`;
for (const d of pluginDetails) {
const dc = d.ok ? 'green' : 'red';
const di = d.ok ? 'fa-check' : 'fa-times';
html += `<li class="text-xs flex items-start gap-1">
<i class="fas ${di} text-${dc}-600 mt-0.5 w-3"></i>
<span class="text-gray-700">${escHtml(d.plugin)}</span>`;
if (d.output) {
html += ` <details class="inline"><summary class="cursor-pointer text-gray-400 hover:underline ml-1">output</summary>
<pre class="mt-1 text-xs bg-gray-900 text-gray-100 rounded p-2 overflow-x-auto whitespace-pre-wrap">${escHtml(d.output)}</pre></details>`;
}
html += `</li>`;
}
html += `</ul>`;
}
html += `</div>`;
el.innerHTML = html;
}
function escHtml(s) {
return String(s || '').replace(/&/g,'&amp;').replace(/</g,'&lt;').replace(/>/g,'&gt;').replace(/"/g,'&quot;');
}
// ── main action dispatcher ────────────────────────────────────────────────
window.toolsAction = function(action, btnId, resultId, showOutput, showPluginDetails) {
setBusy(btnId, true);
const el = document.getElementById(resultId);
if (el) el.classList.add('hidden');
fetch('/api/v3/system/action', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({action})
})
.then(r => r.json())
.then(data => {
const ok = data.status === 'success';
showResult(
resultId, ok,
data.message || (ok ? 'Done' : 'Failed'),
showOutput ? (data.output || '') : '',
showPluginDetails ? (data.details || []) : null
);
})
.catch(err => {
showResult(resultId, false, 'Request failed: ' + err.message);
})
.finally(() => setBusy(btnId, false));
};
// ── force-reset confirm helpers ───────────────────────────────────────────
window.showForceResetConfirm = function() {
document.getElementById('force-reset-confirm-row').classList.remove('hidden');
document.getElementById('btn-force-reset-confirm').classList.add('hidden');
};
window.hideForceResetConfirm = function() {
document.getElementById('force-reset-confirm-row').classList.add('hidden');
document.getElementById('btn-force-reset-confirm').classList.remove('hidden');
};
// ── git info panel ────────────────────────────────────────────────────────
function loadGitInfo() {
const panel = document.getElementById('git-info-panel');
if (!panel) return;
fetch('/api/v3/system/git-info')
.then(r => r.json())
.then(d => {
const dirtyBadge = d.dirty
? '<span class="inline-flex items-center px-2 py-0.5 rounded text-xs font-medium bg-yellow-100 text-yellow-800">dirty</span>'
: '<span class="inline-flex items-center px-2 py-0.5 rounded text-xs font-medium bg-green-100 text-green-800">clean</span>';
let html = `<div class="space-y-2">
<div class="flex items-center gap-2">
<i class="fas fa-code-branch text-gray-400"></i>
<span class="font-mono text-gray-800">${escHtml(d.branch || 'unknown')}</span>
${dirtyBadge}
</div>`;
if (d.dirty && d.status) {
html += `<pre class="text-xs bg-yellow-50 border border-yellow-200 rounded p-2 overflow-x-auto whitespace-pre-wrap text-yellow-900">${escHtml(d.status)}</pre>`;
}
if (d.recent_commits) {
html += `<div class="mt-2">
<p class="text-xs text-gray-500 mb-1">Recent commits</p>
<pre class="text-xs bg-gray-50 border border-gray-200 rounded p-2 overflow-x-auto whitespace-pre-wrap text-gray-700">${escHtml(d.recent_commits)}</pre>
</div>`;
}
if (d.remote_url) {
html += `<p class="text-xs text-gray-400 mt-1"><i class="fas fa-cloud mr-1"></i>${escHtml(d.remote_url)}</p>`;
}
html += `</div>`;
panel.innerHTML = html;
})
.catch(() => {
panel.innerHTML = '<span class="text-sm text-red-600">Could not load git info.</span>';
});
}
// Load on first render; HTMX will have already swapped us in by this point.
loadGitInfo();
})();
</script>