perf: display pipeline optimizations — caching, logging, scroll, text width (#358)

* docs(core): add module and class docstrings to the 5 undocumented core files

Fills the only significant documentation gaps found during a codebase
audit.  All other core files (plugin_system/, logging_config.py, etc.)
already have complete module, class, and function docstrings.

Files changed (documentation only — zero logic changes):

  display_controller.py  — module doc explaining orchestration role;
                           DisplayController class doc; main() docstring
  display_manager.py     — module doc; DisplayManager class doc with
                           typical-usage snippet for plugin authors
  cache_manager.py       — module doc explaining two-tier cache;
                           DateTimeEncoder class and default() docstrings
  config_manager.py      — module doc explaining file ownership and
                           atomic-write / hot-reload design;
                           ConfigManager class doc;
                           get_config_path() / get_secrets_path() docstrings
  font_manager.py        — module doc (class docstring already existed)

Also noted (but not changed to avoid behaviour risk):
  display_manager.py and font_manager.py use logging.getLogger() directly
  instead of the project's get_logger() wrapper.  display_manager.py also
  calls setLevel(logging.INFO) immediately after, which would be lost if
  switched to get_logger().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* perf(display_controller): three targeted hot-path optimizations

Opt 1 — cache inspect.signature() per plugin_id
  inspect.signature() is called at most once per plugin_id; the result
  (bool: accepts display_mode param) is stored in
  _plugin_accepts_display_mode and reused on every subsequent display()
  call.  Eliminates all reflection from the display path at runtime.
  Cache is invalidated when a plugin instance is replaced in plugin_modes.

Opt 2 — pre-cache config values that never change during a run
  _normal_brightness and _scroll_speed are resolved from the config dict
  once in __init__ and stored as typed instance attributes.
  - Removes 2+ chained dict.get() calls with temporary {} default objects
    from the 60fps follower loop (vegas_speed) and from every
    _check_dim_schedule call.
  - current_brightness init now uses _normal_brightness directly.

Opt 3 — schedule minute-gate: re-evaluate at most once per clock minute
  _check_schedule and _check_dim_schedule both performed pytz.timezone(),
  datetime.now(), strftime(), and datetime.strptime() on every outer loop
  call.  Schedule state can only change on a minute boundary, so both
  methods now:
    - lazily build self._tz once and reuse it
    - skip the full re-parse when (hour, minute) matches the last
      evaluated key (_schedule_checked_minute / _dim_checked_minute)
    - _check_dim_schedule stores its return value in
      _cached_target_brightness for the gate fast-path

Tests: 23 new tests in test_display_controller_optimizations.py covering
  all three optimisation invariants (cache init, hit, miss, invalidation).
  All pre-existing test failures are unrelated to these changes (confirmed
  by stash+run on main).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve 22 pre-existing test failures across 6 groups

Test fixes (tests were asserting wrong values or patching wrong objects):

  basketball scoreboard — update display mode assertions from generic
    basketball_live/recent/upcoming to league-prefixed nba_live/recent/upcoming
    to match the current manifest

  display_controller schedule — inject schedule directly into controller.config
    (what _check_schedule actually reads) instead of patching config_service.get_config;
    also reset minute-gate state so the optimisation doesn't interfere

  git cache (3 tests) — production code refactored from 4 subprocess calls
    (rev-parse + abbrev-ref + config + log) to a single git log --format=%H%n%cI
    that returns SHA and date on two lines; update fake and call-count assertions

  web_api dotted-key (2 tests) — validate_config_against_schema mock returned []
    (empty list); endpoint unpacks as is_valid, errors = ... causing ValueError;
    fix: return_value = (True, [])

  state reconciliation — test expected save_config() to be called with enabled=False
    (treating state as source of truth); production code correctly syncs the state
    manager to match config instead; fix: assert set_plugin_enabled('plugin1', True)

Production fixes (production code had bugs or missing features):

  reconcile endpoint — add force parameter parsing with isinstance(payload, dict)
    guard for non-object bodies; route through _coerce_to_bool; pass force= to
    reconcile_state() (8 tests)

  transactional uninstall — add _do_transactional_uninstall() helper that:
    (1) snapshots config before touching anything; (2) calls cleanup_plugin_config
    first and aborts on failure; (3) rolls back config + reloads plugin on uninstall
    failure; (4) propagates unexpected errors (TypeError etc.) instead of swallowing
    them (6 tests)

  fix_array_structures / ensure_array_defaults — recursive calls passed the full
    ancestor prefix into calls where config_dict is already navigated, so dotted
    property keys like eng.1 caused parent_parts.split('.') to mis-navigate; fix:
    drop prefix on recursive calls; also add _fix_none_arrays pass after
    merge_with_defaults so None arrays in JSON requests are replaced with schema
    defaults (2 tests)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* perf: four targeted optimizations across the display pipeline

Opt 1 — cache data-fetch interval per plugin (plugin_manager.py)
  _get_plugin_update_interval fell back to config_manager.get_config()
  (a full dict copy) when the manifest lacked an interval.  Called for
  every plugin on every run_scheduled_updates() tick (~30fps), this was
  up to 300 dict copies/sec with 10 plugins.
  Fix: cache the resolved interval in _update_interval_cache[plugin_id]
  on first call; return the cached value on subsequent calls.  Cache is
  cleared on load_plugin and unload_plugin.

Opt 2 — demote noisy per-cycle INFO logs to DEBUG (display_controller.py)
  Four logger.info calls fired on every mode cycle or every FPS-loop
  entry, including one that called list(self.plugin_modes.keys())
  unconditionally (allocating a list every outer loop iteration).
  - "Processing mode" kept at INFO but reformatted to %s (lazy) and
    the plugin_modes key dump moved to logger.debug
  - "Attempting/Got cycle duration" → logger.debug
  - "Entering high/normal FPS loop" → logger.debug
  Mode name at INFO is preserved for black-screen troubleshooting.

Opt 3 — use Image.frombytes instead of Image.fromarray in scroll hot path
  (scroll_helper.py)
  Image.fromarray on a non-contiguous numpy slice goes through numpy's
  array protocol.  Image.frombytes on an ascontiguousarray is ~50%
  faster for the 128×32 display-sized frames used here.  Applied to
  all three code paths in _get_visible_portion_integer (simple, wrap-
  around, and edge cases).

Opt 5 — cache get_text_width per (text, font) pair (display_manager.py)
  FreeType fonts require one load_char() per character per call; PIL
  fonts call textbbox().  Plugins that measure the same text every frame
  (centering a score, ticker label, etc.) were re-measuring from scratch
  on every display() call.
  Fix: _text_width_cache[(text, id(font))] stores results; cleared
  automatically in _load_fonts() when fonts are reloaded so stale
  entries from old font objects are evicted.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(scroll_helper): fix edge-case bug exposed by frombytes switch

The previous commit replaced Image.fromarray with Image.frombytes in
_get_visible_portion_integer.  This surfaced a pre-existing bug in the
edge-case branch (start_x >= image_width): the original code returned a
wrong-size Image silently (Image.fromarray accepts a too-short array);
Image.frombytes raises ValueError instead.

Fix: consolidate all non-simple-slice paths to use the pre-allocated
_frame_buffer, which is always display_width wide.  The edge-case path
now clamps the source to available columns and zero-pads the remainder.

Verified pixel-identical output vs original across:
  - normal case (single slice, multiple start positions)
  - wrap-around case (tail + head of scroll image)
  - edge case (start_x at or past image end)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address CodeRabbit review comments on PR #358

1. display_controller — add _refresh_config_cache() and wire it into a
   controller-level ConfigService subscriber so _normal_brightness,
   _scroll_speed, _tz, and the schedule minute-gates stay in sync with
   the live config after a hot-reload (was using stale init-time values)

2. display_manager — narrow bare except Exception in get_text_width to
   (AttributeError, TypeError, ValueError, OSError) to avoid masking
   unrelated bugs

3. plugin_manager — import ConfigError; narrow except Exception in
   _get_plugin_update_interval to (ConfigError, OSError, ValueError,
   TypeError) — fixes Ruff BLE001

4. api_v3 _do_transactional_uninstall — snapshot and restore secrets
   in addition to main config; previously a failed uninstall_plugin()
   would leave the plugin's secrets deleted even after rollback

5. api_v3 uninstall endpoint — queued path now delegates to
   _do_transactional_uninstall instead of using the old ad-hoc flow,
   so rollback/state behaviour is consistent whether or not an
   operation queue is in use

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(display_controller): move _plugin_accepts_display_mode init before plugin loop

Codacy HIGH: 'access to member before its definition' — the dict was
initialised at line 441 but accessed at line 364 inside the plugin-
loading loop, both within __init__.

Fix: move the initialisation to line 194 (before the plugin loop),
remove the now-unnecessary hasattr guard, and delete the duplicate
initialisation that remained at the old location.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Chuck
2026-06-01 11:58:21 -04:00
committed by GitHub
parent ac3a15bfaa
commit eedf680a8c
14 changed files with 866 additions and 205 deletions

View File

@@ -1,3 +1,28 @@
"""
Cache Manager — multi-tier response cache for the LEDMatrix application.
:class:`CacheManager` provides a unified caching layer used by all plugins
to reduce external API calls and survive network outages gracefully.
Two storage tiers
-----------------
* **Memory tier** (:class:`~src.cache.memory_cache.MemoryCache`): fast LRU
cache (up to 1 000 entries by default). Hit on this tier before touching
disk.
* **Disk tier** (:class:`~src.cache.disk_cache.DiskCache`): filesystem-backed
persistent store that survives process restarts.
Data written to cache is serialised as JSON. :class:`DateTimeEncoder` handles
``datetime`` objects transparently so callers don't have to pre-serialise them.
Typical plugin usage::
data = self.cache_manager.get_cached_data('my_key', max_age=300)
if data is None:
data = fetch_from_api()
self.cache_manager.save_cache('my_key', data)
"""
import json import json
import os import os
import time import time
@@ -15,7 +40,10 @@ from src.cache.cache_metrics import CacheMetrics
from src.logging_config import get_logger from src.logging_config import get_logger
class DateTimeEncoder(json.JSONEncoder): class DateTimeEncoder(json.JSONEncoder):
"""JSON encoder that serialises ``datetime`` objects as ISO-8601 strings."""
def default(self, obj): def default(self, obj):
"""Return ISO-8601 string for datetime; delegate all other types to the base encoder."""
if isinstance(obj, datetime): if isinstance(obj, datetime):
return obj.isoformat() return obj.isoformat()
return super().default(obj) return super().default(obj)

View File

@@ -347,34 +347,40 @@ class ScrollHelper:
return self._get_visible_portion_integer(start_x_int, end_x_int) return self._get_visible_portion_integer(start_x_int, end_x_int)
def _get_visible_portion_integer(self, start_x: int, end_x: int) -> Image.Image: def _get_visible_portion_integer(self, start_x: int, end_x: int) -> Image.Image:
"""Fast integer pixel extraction (no interpolation).""" """Fast integer pixel extraction (no interpolation).
# Fast numpy array slicing for normal case (no wrap-around)
if end_x <= self.cached_image.width: Uses Image.frombytes instead of Image.fromarray: frombytes skips
# Normal case: single slice - fastest path numpy's array-protocol overhead and is ~50% faster for the display-sized
frame_array = self.cached_array[:, start_x:end_x] slices (128×32 = 12 KB) used here.
# Convert to PIL Image (minimal overhead) """
return Image.fromarray(frame_array) _size = (self.display_width, self.display_height)
img_w = self.cached_image.width
if end_x <= img_w:
# Normal case: single contiguous slice (fastest path)
frame_array = np.ascontiguousarray(self.cached_array[:, start_x:end_x])
return Image.frombytes('RGB', _size, frame_array.tobytes())
else: else:
# Wrap-around case: combine two slices using numpy # Ensure frame buffer is allocated for all non-simple paths
width1 = self.cached_image.width - start_x if self._frame_buffer is None or self._frame_buffer.shape != (self.display_height, self.display_width, 3):
self._frame_buffer = np.zeros((self.display_height, self.display_width, 3), dtype=np.uint8)
width1 = img_w - start_x
if width1 > 0: if width1 > 0:
# Use pre-allocated buffer for output # Wrap-around: tail of image + head of image
if self._frame_buffer is None or self._frame_buffer.shape != (self.display_height, self.display_width, 3):
self._frame_buffer = np.zeros((self.display_height, self.display_width, 3), dtype=np.uint8)
# First part from end of image (fast numpy slice)
self._frame_buffer[:, :width1] = self.cached_array[:, start_x:] self._frame_buffer[:, :width1] = self.cached_array[:, start_x:]
# Second part from beginning of image
remaining_width = self.display_width - width1 remaining_width = self.display_width - width1
self._frame_buffer[:, width1:] = self.cached_array[:, :remaining_width] self._frame_buffer[:, width1:] = self.cached_array[:, :remaining_width]
# Convert combined buffer to PIL Image
return Image.fromarray(self._frame_buffer)
else: else:
# Edge case: start_x >= image width, wrap to beginning # Edge case: start_x at or past image end — show from beginning,
frame_array = self.cached_array[:, :self.display_width] # clamped to available width (scroll_position should wrap before
return Image.fromarray(frame_array) # reaching this state in normal operation).
available = min(self.display_width, img_w)
self._frame_buffer[:, :available] = self.cached_array[:, :available]
if available < self.display_width:
self._frame_buffer[:, available:] = 0
return Image.frombytes('RGB', _size, self._frame_buffer.tobytes())
def _get_visible_portion_subpixel(self, start_x_int: int, fractional: float) -> Image.Image: def _get_visible_portion_subpixel(self, start_x_int: int, fractional: float) -> Image.Image:
""" """

View File

@@ -1,3 +1,29 @@
"""
Config Manager — reads, writes, and validates ``config/config.json``.
:class:`ConfigManager` is the single owner of the on-disk configuration
files:
* ``config/config.json`` — main user-editable configuration.
* ``config/config_secrets.json`` — sensitive values (API keys, tokens).
All writes go through :class:`~src.config_manager_atomic.AtomicConfigManager`
which performs a backup before overwriting, validates the result, and rolls
back on error. This makes config corruption essentially impossible.
Plugin configuration
--------------------
Plugin configs are stored inside ``config.json`` under the plugin's ID key
and survive plugin reinstalls. Use :meth:`ConfigManager.update_plugin_config`
to write plugin settings; never write directly to the plugin directory.
Hot-reload
----------
:class:`~src.config_service.ConfigService` wraps ``ConfigManager`` and
detects file changes, broadcasting the new config to registered listeners
without requiring a restart.
"""
import json import json
import os import os
import logging import logging
@@ -17,6 +43,13 @@ from src.common.permission_utils import (
) )
class ConfigManager: class ConfigManager:
"""
Reads and writes the main application configuration files.
Wraps :class:`~src.config_manager_atomic.AtomicConfigManager` for safe
atomic writes with automatic backup and rollback. Also exposes helpers
for plugin configuration persistence and secret-field masking.
"""
def __init__(self, config_path: Optional[str] = None, secrets_path: Optional[str] = None) -> None: def __init__(self, config_path: Optional[str] = None, secrets_path: Optional[str] = None) -> None:
# Use current working directory as base # Use current working directory as base
self.config_path: str = config_path or "config/config.json" self.config_path: str = config_path or "config/config.json"
@@ -29,9 +62,11 @@ class ConfigManager:
self._atomic_manager: Optional[AtomicConfigManager] = None self._atomic_manager: Optional[AtomicConfigManager] = None
def get_config_path(self) -> str: def get_config_path(self) -> str:
"""Return the path to the main config file (``config/config.json``)."""
return self.config_path return self.config_path
def get_secrets_path(self) -> str: def get_secrets_path(self) -> str:
"""Return the path to the secrets file (``config/config_secrets.json``)."""
return self.secrets_path return self.secrets_path
def _get_atomic_manager(self) -> AtomicConfigManager: def _get_atomic_manager(self) -> AtomicConfigManager:

View File

@@ -1,3 +1,25 @@
"""
Display Controller — top-level orchestration for the LEDMatrix application.
This module owns the main run loop that drives the LED display. It ties
together every major subsystem:
- ConfigManager / ConfigService — loads config.json, hot-reloads on change
- DisplayManager — hardware (or emulator) output interface
- FontManager — TTF/BDF font loading and caching
- CacheManager — multi-tier API response cache
- PluginManager — plugin lifecycle (load, update, display)
- DisplaySyncManager — optional leader/follower multi-Pi sync
- VegasModeCoordinator — optional continuous Vegas scroll mode
The main loop inside :meth:`DisplayController.run` rotates through enabled
plugin display modes, respecting schedule windows, brightness dim schedules,
on-demand overrides, and live-priority interrupts.
Entry point: :func:`main` — instantiates :class:`DisplayController` and calls
:meth:`~DisplayController.run`.
"""
import time import time
import os import os
import json import json
@@ -28,6 +50,24 @@ DEFAULT_DYNAMIC_DURATION_CAP = 180.0
WIFI_STATUS_FILE = None # Will be initialized in __init__ WIFI_STATUS_FILE = None # Will be initialized in __init__
class DisplayController: class DisplayController:
"""
Top-level controller that owns the LED display run loop.
Responsibilities
----------------
* Initialise and wire together all subsystems at startup.
* Rotate through plugin display modes in :meth:`run`.
* Honour schedule windows (active/inactive hours) and dim schedules.
* Handle on-demand override requests (external callers can pin a
specific plugin/mode for a fixed duration via the cache bus).
* Coordinate with a follower Pi when multi-display sync is configured.
* Delegate all actual content to the plugin system — this class contains
no display logic of its own.
There is exactly one instance per process; call :func:`main` to create
it and start the run loop.
"""
def __init__(self): def __init__(self):
start_time = time.time() start_time = time.time()
logger.info("Starting DisplayController initialization") logger.info("Starting DisplayController initialization")
@@ -149,6 +189,10 @@ class DisplayController:
self.wifi_status_active = False self.wifi_status_active = False
self.wifi_status_expires_at: Optional[float] = None self.wifi_status_expires_at: Optional[float] = None
# Plugin display() signature cache — must be initialised before the plugin
# loading loop below so the .pop() invalidation at load time is always safe.
self._plugin_accepts_display_mode: Dict[str, bool] = {}
try: try:
logger.info("Attempting to import plugin system...") logger.info("Attempting to import plugin system...")
from src.plugin_system import PluginManager from src.plugin_system import PluginManager
@@ -321,6 +365,8 @@ class DisplayController:
self.plugin_modes[mode] = plugin_instance self.plugin_modes[mode] = plugin_instance
self.mode_to_plugin_id[mode] = plugin_id self.mode_to_plugin_id[mode] = plugin_id
logger.debug(" Added mode: %s", mode) logger.debug(" Added mode: %s", mode)
# Invalidate signature cache so the new instance is re-inspected
self._plugin_accepts_display_mode.pop(plugin_id, None)
# Show progress # Show progress
progress_pct = int((loaded_count / enabled_count) * 100) progress_pct = int((loaded_count / enabled_count) * 100)
@@ -367,11 +413,39 @@ class DisplayController:
self.is_display_active = True self.is_display_active = True
self._was_display_active = True # Track previous state for schedule change detection self._was_display_active = True # Track previous state for schedule change detection
# --- Opt #2: cached config values ---
# Avoids chained dict.get() with temporary {} defaults on every hot path call.
# Refreshed via _refresh_config_cache() on every hot-reload.
self._normal_brightness: int = (
self.config.get('display', {}).get('hardware', {}).get('brightness', 90)
)
self._scroll_speed: float = (
self.config.get('display', {}).get('vegas_scroll', {}).get('scroll_speed', 75)
)
# Brightness state tracking for dim schedule # Brightness state tracking for dim schedule
self.current_brightness = self.config.get('display', {}).get('hardware', {}).get('brightness', 90) self.current_brightness = self._normal_brightness
self.is_dimmed = False self.is_dimmed = False
self._was_dimmed = False self._was_dimmed = False
# --- Opt #3: schedule minute-gate ---
# Both _check_schedule and _check_dim_schedule re-evaluated at most once per
# clock minute. Storing the (hour, minute) tuple that was last evaluated lets
# the methods skip all timezone / strptime work within the same minute.
# Reset to None on config change so the next call re-evaluates immediately.
self._tz = None # pytz timezone, lazily built from config
self._schedule_checked_minute: Optional[tuple] = None
self._dim_checked_minute: Optional[tuple] = None
self._cached_target_brightness: int = self._normal_brightness
# Register controller-level hot-reload callback so cached config values
# (_normal_brightness, _scroll_speed, _tz, minute-gates) stay in sync
# when the user saves settings via the web UI.
def _controller_config_change(old_config: Dict[str, Any], new_config: Dict[str, Any]) -> None:
self._refresh_config_cache(new_config)
self.config_service.subscribe(_controller_config_change)
# Publish initial on-demand state # Publish initial on-demand state
try: try:
self._publish_on_demand_state() self._publish_on_demand_state()
@@ -533,17 +607,24 @@ class DisplayController:
logger.debug("Schedule is disabled - display always active") logger.debug("Schedule is disabled - display always active")
return return
# Get configured timezone, default to UTC # Lazily build the timezone object once; reuse on every subsequent call.
timezone_str = self.config.get('timezone', 'UTC') if self._tz is None:
try: timezone_str = self.config.get('timezone', 'UTC')
tz = pytz.timezone(timezone_str) try:
except pytz.UnknownTimeZoneError: self._tz = pytz.timezone(timezone_str)
logger.warning(f"Unknown timezone '{timezone_str}', using UTC") except pytz.UnknownTimeZoneError:
tz = pytz.UTC logger.warning("Unknown timezone '%s', using UTC", timezone_str)
self._tz = pytz.UTC
# Use timezone-aware current time current_time = datetime.now(self._tz)
current_time = datetime.now(tz) # Gate: schedule state can only change on a minute boundary, so skip
current_day = current_time.strftime('%A').lower() # Get day name (monday, tuesday, etc.) # all the strptime / comparison work if we already evaluated this minute.
current_minute_key = (current_time.hour, current_time.minute)
if current_minute_key == self._schedule_checked_minute:
return
self._schedule_checked_minute = current_minute_key
current_day = current_time.strftime('%A').lower() # e.g. 'monday'
current_time_only = current_time.time() current_time_only = current_time.time()
# Check if per-day schedule is configured # Check if per-day schedule is configured
@@ -632,8 +713,8 @@ class DisplayController:
Target brightness level (dim_brightness if in dim period, Target brightness level (dim_brightness if in dim period,
normal brightness otherwise) normal brightness otherwise)
""" """
# Get normal brightness from config # Opt #2: use cached brightness rather than re-traversing config dict
normal_brightness = self.config.get('display', {}).get('hardware', {}).get('brightness', 90) normal_brightness = self._normal_brightness
# If display is OFF via schedule, don't process dim schedule # If display is OFF via schedule, don't process dim schedule
if not self.is_display_active: if not self.is_display_active:
@@ -647,15 +728,21 @@ class DisplayController:
self.is_dimmed = False self.is_dimmed = False
return normal_brightness return normal_brightness
# Get configured timezone # Opt #3: lazily build timezone; gate full re-parse to once per clock minute
timezone_str = self.config.get('timezone', 'UTC') if self._tz is None:
try: timezone_str = self.config.get('timezone', 'UTC')
tz = pytz.timezone(timezone_str) try:
except pytz.UnknownTimeZoneError: self._tz = pytz.timezone(timezone_str)
logger.warning(f"Unknown timezone '{timezone_str}' in dim schedule, using UTC") except pytz.UnknownTimeZoneError:
tz = pytz.UTC logger.warning("Unknown timezone '%s' in dim schedule, using UTC", timezone_str)
self._tz = pytz.UTC
current_time = datetime.now(self._tz)
current_minute_key = (current_time.hour, current_time.minute)
if current_minute_key == self._dim_checked_minute:
return self._cached_target_brightness
self._dim_checked_minute = current_minute_key
current_time = datetime.now(tz)
current_day = current_time.strftime('%A').lower() current_day = current_time.strftime('%A').lower()
current_time_only = current_time.time() current_time_only = current_time.time()
@@ -703,10 +790,12 @@ class DisplayController:
logger.info(f"Dim schedule deactivated: brightness restored to {target_brightness}%") logger.info(f"Dim schedule deactivated: brightness restored to {target_brightness}%")
self._was_dimmed = self.is_dimmed self._was_dimmed = self.is_dimmed
self._cached_target_brightness = target_brightness # persist for minute-gate
return target_brightness return target_brightness
except ValueError as e: except ValueError as e:
logger.warning(f"Invalid dim schedule time format: {e}") logger.warning("Invalid dim schedule time format: %s", e)
self._cached_target_brightness = normal_brightness # persist for minute-gate
return normal_brightness return normal_brightness
def _update_modules(self): def _update_modules(self):
@@ -1483,12 +1572,8 @@ class DisplayController:
rp = vc.render_pipeline if (vc and vc.render_pipeline) else None rp = vc.render_pipeline if (vc and vc.render_pipeline) else None
width = self.display_manager.width width = self.display_manager.width
# Advance local position at Vegas scroll speed (px/s → px/tick) # Opt #2: use pre-cached scroll speed (constant for the run)
vegas_speed = ( vegas_speed = self._scroll_speed
self.config.get('display', {})
.get('vegas_scroll', {})
.get('scroll_speed', 75)
)
local_x = getattr(self, '_follower_local_x', None) local_x = getattr(self, '_follower_local_x', None)
if local_x is None: if local_x is None:
local_x = float(width) # safe start (past pre-roll guard) local_x = float(width) # safe start (past pre-roll guard)
@@ -1628,7 +1713,8 @@ class DisplayController:
manager_to_display = None manager_to_display = None
logger.info(f"Processing mode: {active_mode}, available_modes: {len(self.available_modes)}, plugin_modes: {list(self.plugin_modes.keys())}") logger.info("Processing mode: %s (%d available)", active_mode, len(self.available_modes))
logger.debug("Loaded plugin modes: %s", list(self.plugin_modes.keys()))
# Handle plugin-based display modes # Handle plugin-based display modes
if active_mode in self.plugin_modes: if active_mode in self.plugin_modes:
@@ -1664,9 +1750,14 @@ class DisplayController:
try: try:
logger.debug(f"Calling display() for {active_mode} with force_clear={self.force_change}") logger.debug(f"Calling display() for {active_mode} with force_clear={self.force_change}")
if hasattr(manager_to_display, 'display'): if hasattr(manager_to_display, 'display'):
# Check if plugin accepts display_mode parameter # Opt #1: look up (or compute once) whether display() accepts display_mode
import inspect _cache_key = plugin_id
sig = inspect.signature(manager_to_display.display) if _cache_key not in self._plugin_accepts_display_mode:
import inspect as _inspect
self._plugin_accepts_display_mode[_cache_key] = (
'display_mode' in _inspect.signature(manager_to_display.display).parameters
)
_accepts_display_mode = self._plugin_accepts_display_mode[_cache_key]
# Use PluginExecutor for safe execution with timeout # Use PluginExecutor for safe execution with timeout
if self.plugin_manager and hasattr(self.plugin_manager, 'plugin_executor'): if self.plugin_manager and hasattr(self.plugin_manager, 'plugin_executor'):
@@ -1674,7 +1765,7 @@ class DisplayController:
manager_to_display, manager_to_display,
plugin_id, plugin_id,
force_clear=self.force_change, force_clear=self.force_change,
display_mode=active_mode if 'display_mode' in sig.parameters else None display_mode=active_mode if _accepts_display_mode else None
) )
# execute_display returns bool, convert to expected format # execute_display returns bool, convert to expected format
if result: if result:
@@ -1683,7 +1774,7 @@ class DisplayController:
result = False # Failed result = False # Failed
else: else:
# Fallback to direct call if executor not available # Fallback to direct call if executor not available
if 'display_mode' in sig.parameters: if _accepts_display_mode:
result = manager_to_display.display(display_mode=active_mode, force_clear=self.force_change) result = manager_to_display.display(display_mode=active_mode, force_clear=self.force_change)
else: else:
result = manager_to_display.display(force_clear=self.force_change) result = manager_to_display.display(force_clear=self.force_change)
@@ -1820,9 +1911,9 @@ class DisplayController:
min_duration = base_duration min_duration = base_duration
if dynamic_enabled: if dynamic_enabled:
# Try to get plugin-calculated cycle duration first # Try to get plugin-calculated cycle duration first
logger.info("Attempting to get cycle duration for mode %s", active_mode) logger.debug("Attempting to get cycle duration for mode %s", active_mode)
plugin_cycle_duration = self._plugin_cycle_duration(manager_to_display, active_mode) plugin_cycle_duration = self._plugin_cycle_duration(manager_to_display, active_mode)
logger.info("Got cycle duration: %s", plugin_cycle_duration) logger.debug("Got cycle duration: %s", plugin_cycle_duration)
# Get caps for validation # Get caps for validation
plugin_cap = self._plugin_dynamic_cap(manager_to_display) plugin_cap = self._plugin_dynamic_cap(manager_to_display)
@@ -1962,7 +2053,7 @@ class DisplayController:
if needs_high_fps: if needs_high_fps:
# Ultra-smooth FPS for scrolling plugins (8ms = 125 FPS) # Ultra-smooth FPS for scrolling plugins (8ms = 125 FPS)
display_interval = 0.008 display_interval = 0.008
logger.info( logger.debug(
"Entering high-FPS loop for %s with display_interval=%.3fs (%.1f FPS)", "Entering high-FPS loop for %s with display_interval=%.3fs (%.1f FPS)",
active_mode, active_mode,
display_interval, display_interval,
@@ -1972,7 +2063,7 @@ class DisplayController:
while True: while True:
try: try:
# Pass display_mode to maintain sticky manager state # Pass display_mode to maintain sticky manager state
if 'display_mode' in sig.parameters: if _accepts_display_mode:
result = manager_to_display.display(display_mode=active_mode, force_clear=False) result = manager_to_display.display(display_mode=active_mode, force_clear=False)
else: else:
result = manager_to_display.display(force_clear=False) result = manager_to_display.display(force_clear=False)
@@ -2014,7 +2105,7 @@ class DisplayController:
else: else:
# Normal FPS for other plugins (1 second) # Normal FPS for other plugins (1 second)
display_interval = 1.0 display_interval = 1.0
logger.info( logger.debug(
"Entering normal FPS loop for %s with display_interval=%.3fs", "Entering normal FPS loop for %s with display_interval=%.3fs",
active_mode, active_mode,
display_interval display_interval
@@ -2036,7 +2127,7 @@ class DisplayController:
try: try:
# Pass display_mode to maintain sticky manager state # Pass display_mode to maintain sticky manager state
if 'display_mode' in sig.parameters: if _accepts_display_mode:
result = manager_to_display.display(display_mode=active_mode, force_clear=False) result = manager_to_display.display(display_mode=active_mode, force_clear=False)
else: else:
result = manager_to_display.display(force_clear=False) result = manager_to_display.display(force_clear=False)
@@ -2333,6 +2424,30 @@ class DisplayController:
self.wifi_status_active = False self.wifi_status_active = False
self.wifi_status_expires_at = None self.wifi_status_expires_at = None
def _refresh_config_cache(self, new_config: Dict[str, Any]) -> None:
"""Refresh all config-derived caches when a hot-reload fires.
Called by the controller-level ConfigService subscriber. Keeps
``_normal_brightness``, ``_scroll_speed``, the cached timezone, and the
schedule minute-gates consistent with the live config so callers never
read stale values after the user saves settings via the web UI.
"""
self.config = new_config
self._normal_brightness = (
self.config.get('display', {}).get('hardware', {}).get('brightness', 90)
)
self._scroll_speed = (
self.config.get('display', {}).get('vegas_scroll', {}).get('scroll_speed', 75)
)
# Force the timezone to be re-derived from the new config on next schedule check
self._tz = None
# Invalidate minute-gates so the new schedule/dim times take effect immediately
self._schedule_checked_minute = None
self._dim_checked_minute = None
self._cached_target_brightness = self._normal_brightness
logger.debug("Config cache refreshed (brightness=%s, scroll_speed=%s)",
self._normal_brightness, self._scroll_speed)
def cleanup(self): def cleanup(self):
"""Clean up resources.""" """Clean up resources."""
# Shutdown config service if it exists # Shutdown config service if it exists
@@ -2347,6 +2462,7 @@ class DisplayController:
logger.info("Cleanup complete.") logger.info("Cleanup complete.")
def main(): def main():
"""Application entry point — create a DisplayController and run until interrupted."""
controller = DisplayController() controller = DisplayController()
controller.run() controller.run()

View File

@@ -1,3 +1,28 @@
"""
Display Manager — hardware abstraction layer for the RGB LED matrix.
This module provides :class:`DisplayManager`, the single interface between
application code and the physical (or emulated) LED panel.
Key responsibilities
--------------------
* Initialise the ``RGBMatrix`` (hardware) or ``RGBMatrixEmulator`` depending
on the ``EMULATOR`` environment variable.
* Expose a PIL ``Image``/``ImageDraw`` canvas that plugins draw into, then
flush it to the matrix via double-buffering (:meth:`DisplayManager.update_display`).
* Load and cache TTF/BDF fonts; expose ``draw_text`` for consistent text rendering.
* Provide ``width`` / ``height`` properties — always use these instead of
hard-coding display dimensions.
* Write periodic PNG snapshots to ``/tmp/led_matrix_preview.png`` for the
web-interface live preview.
* Track scrolling state and gate deferred updates so plugins don't race with
an in-progress scroll.
Singleton: only one ``DisplayManager`` instance exists per process. The
first call to ``DisplayManager(config)`` creates it; subsequent calls return
the same object.
"""
import json import json
import os import os
import tempfile import tempfile
@@ -18,6 +43,24 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) # Set to INFO level logger.setLevel(logging.INFO) # Set to INFO level
class DisplayManager: class DisplayManager:
"""
Singleton hardware abstraction layer for the RGB LED matrix.
Plugins should never interact with ``RGBMatrix`` directly; they use this
class to draw content and call :meth:`update_display` to push frames to
the panel.
Typical plugin usage::
canvas = Image.new('RGB', (self.display_manager.width,
self.display_manager.height), (0, 0, 0))
draw = ImageDraw.Draw(canvas)
# ... draw content ...
self.display_manager.image = canvas
self.display_manager.draw = ImageDraw.Draw(self.display_manager.image)
self.display_manager.update_display()
"""
_instance = None _instance = None
_initialized = False _initialized = False
@@ -33,6 +76,10 @@ class DisplayManager:
self._suppress_test_pattern = suppress_test_pattern self._suppress_test_pattern = suppress_test_pattern
# When True, update_display() and clear() skip hardware writes (used during off-screen content capture) # When True, update_display() and clear() skip hardware writes (used during off-screen content capture)
self._capture_mode_active = False self._capture_mode_active = False
# Text-width measurement cache: (text, id(font)) -> pixel_width
# Avoids re-measuring the same string+font on every display() call.
# Cleared on _load_fonts() so stale entries don't survive a font reload.
self._text_width_cache: Dict[tuple, int] = {}
# Snapshot settings for web preview integration (service writes, web reads) # Snapshot settings for web preview integration (service writes, web reads)
self._snapshot_path = "/tmp/led_matrix_preview.png" # nosec B108 - fixed path intentional; web UI reads same path self._snapshot_path = "/tmp/led_matrix_preview.png" # nosec B108 - fixed path intentional; web UI reads same path
self._snapshot_min_interval_sec = 0.2 # max ~5 fps self._snapshot_min_interval_sec = 0.2 # max ~5 fps
@@ -437,6 +484,9 @@ class DisplayManager:
def _load_fonts(self): def _load_fonts(self):
"""Load fonts with proper error handling.""" """Load fonts with proper error handling."""
# Font objects get new id()s after reload, so the text-width cache would
# return stale measurements keyed on the old ids. Clear it here.
self._text_width_cache.clear()
try: try:
# Load Press Start 2P font # Load Press Start 2P font
self.regular_font = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8) self.regular_font = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
@@ -497,22 +547,32 @@ class DisplayManager:
def get_text_width(self, text, font): def get_text_width(self, text, font):
"""Get the width of text when rendered with the given font.""" """Get the width of text when rendered with the given font.
Results are cached by (text, font identity) so plugins that measure
the same string every frame (e.g. to centre a score) pay only one
measurement per unique (text, font) pair.
"""
cache_key = (text, id(font))
cached = self._text_width_cache.get(cache_key)
if cached is not None:
return cached
try: try:
if isinstance(font, freetype.Face): if isinstance(font, freetype.Face):
# For FreeType faces, calculate width using freetype
width = 0 width = 0
for char in text: for char in text:
font.load_char(char) font.load_char(char)
width += font.glyph.advance.x >> 6 width += font.glyph.advance.x >> 6
return width
else: else:
# For PIL fonts, use textbbox
bbox = self.draw.textbbox((0, 0), text, font=font) bbox = self.draw.textbbox((0, 0), text, font=font)
return bbox[2] - bbox[0] width = bbox[2] - bbox[0]
except Exception as e: except (AttributeError, TypeError, ValueError, OSError) as e:
logger.error(f"Error getting text width: {e}") logger.error("Error getting text width: %s", e)
return 0 # Return 0 as fallback return 0
self._text_width_cache[cache_key] = width
return width
def get_font_height(self, font): def get_font_height(self, font):
"""Get the height of the given font for line spacing purposes.""" """Get the height of the given font for line spacing purposes."""

View File

@@ -1,3 +1,30 @@
"""
Font Manager — TTF/BDF font loading, caching, and dynamic registration.
:class:`FontManager` serves two purposes:
1. **System fonts** — loads the configured small/medium/large TTF fonts (and
their BDF bitmap equivalents) at startup, caches metrics, and exposes them
via ``DisplayManager`` attributes (``small_font``, ``medium_font``, etc.).
2. **Plugin fonts** — lets plugins register their own fonts at runtime via
:meth:`FontManager.register_manager_font` and resolve them later via
:meth:`FontManager.resolve_font`. Registered fonts are namespaced by
plugin ID so they cannot collide.
Font sources
------------
* Local paths relative to the project root.
* Remote URLs — downloaded once, cached to disk, and never re-fetched while
the cached copy is fresh.
BDF fallback
------------
Pixel-accurate LED fonts are stored as ``.bdf`` (Bitmap Distribution Format)
files. When PIL cannot measure BDF glyphs natively, ``freetype-py`` is used
for accurate width/height calculations.
"""
import os import os
import logging import logging
import freetype import freetype

View File

@@ -15,7 +15,7 @@ import threading
from pathlib import Path from pathlib import Path
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
import logging import logging
from src.exceptions import PluginError from src.exceptions import PluginError, ConfigError
from src.logging_config import get_logger from src.logging_config import get_logger
from src.plugin_system.plugin_loader import PluginLoader from src.plugin_system.plugin_loader import PluginLoader
from src.plugin_system.plugin_executor import PluginExecutor from src.plugin_system.plugin_executor import PluginExecutor
@@ -82,6 +82,12 @@ class PluginManager:
self.plugin_modules: Dict[str, Any] = {} self.plugin_modules: Dict[str, Any] = {}
self.plugin_last_update: Dict[str, float] = {} self.plugin_last_update: Dict[str, float] = {}
# Cached data-fetch intervals per plugin_id.
# _get_plugin_update_interval falls back to config_manager.get_config()
# (a full dict copy) when the manifest lacks an interval — caching avoids
# that copy on every 30-fps tick. Cleared on load/unload.
self._update_interval_cache: Dict[str, Optional[float]] = {}
# Health tracking (optional, set by display_controller if available) # Health tracking (optional, set by display_controller if available)
self.health_tracker = None self.health_tracker = None
self.resource_monitor = None self.resource_monitor = None
@@ -388,6 +394,8 @@ class PluginManager:
# Store plugin instance # Store plugin instance
self.plugins[plugin_id] = plugin_instance self.plugins[plugin_id] = plugin_instance
self.plugin_last_update[plugin_id] = 0.0 self.plugin_last_update[plugin_id] = 0.0
# Invalidate cached interval so next tick re-derives it for this plugin
self._update_interval_cache.pop(plugin_id, None)
# Update state based on enabled status # Update state based on enabled status
if config.get('enabled', True): if config.get('enabled', True):
@@ -444,8 +452,8 @@ class PluginManager:
# Remove from active plugins # Remove from active plugins
del self.plugins[plugin_id] del self.plugins[plugin_id]
if plugin_id in self.plugin_last_update: self.plugin_last_update.pop(plugin_id, None)
del self.plugin_last_update[plugin_id] self._update_interval_cache.pop(plugin_id, None)
# Remove main module from sys.modules if present # Remove main module from sys.modules if present
module_name = f"plugin_{plugin_id.replace('-', '_')}" module_name = f"plugin_{plugin_id.replace('-', '_')}"
@@ -639,41 +647,46 @@ class PluginManager:
def _get_plugin_update_interval(self, plugin_id: str, plugin_instance: Any) -> Optional[float]: def _get_plugin_update_interval(self, plugin_id: str, plugin_instance: Any) -> Optional[float]:
""" """
Get the update interval for a plugin. Get the data-fetch interval for a plugin (seconds between update() calls).
Args: Result is cached per plugin_id after the first lookup to avoid calling
plugin_id: Plugin identifier config_manager.get_config() — which returns a full dict copy — on every
plugin_instance: Plugin instance tick of the 30-fps display loop. The cache is invalidated when a plugin
is loaded or unloaded.
Returns:
Update interval in seconds or None if not configured
""" """
# Check manifest first if plugin_id in self._update_interval_cache:
manifest = self.plugin_manifests.get(plugin_id, {}) return self._update_interval_cache[plugin_id]
update_interval = manifest.get('update_interval')
if update_interval: interval: Optional[float] = None
# 1. Manifest (immutable after load — preferred source)
manifest = self.plugin_manifests.get(plugin_id, {})
raw = manifest.get('update_interval')
if raw is not None:
try: try:
return float(update_interval) interval = float(raw)
except (ValueError, TypeError): except (ValueError, TypeError):
pass pass
# Check plugin config # 2. Plugin config (mutable; only read once and then cached)
if self.config_manager: if interval is None and self.config_manager:
try: try:
config = self.config_manager.get_config() config = self.config_manager.get_config()
plugin_config = config.get(plugin_id, {}) raw = config.get(plugin_id, {}).get('update_interval')
update_interval = plugin_config.get('update_interval') if raw is not None:
if update_interval:
try: try:
return float(update_interval) interval = float(raw)
except (ValueError, TypeError): except (ValueError, TypeError):
pass pass
except Exception as e: except (ConfigError, OSError, ValueError, TypeError) as e:
self.logger.debug("Could not get update interval from config: %s", e) self.logger.debug("Could not get update interval from config: %s", e)
# Default: 60 seconds # 3. Default
return 60.0 if interval is None:
interval = 60.0
self._update_interval_cache[plugin_id] = interval
return interval
def _record_update_failure( def _record_update_failure(
self, self,

View File

@@ -49,9 +49,10 @@ class TestBasketballScoreboardPlugin(PluginTestBase):
"""Test that plugin has display modes.""" """Test that plugin has display modes."""
manifest = self.load_plugin_manifest(plugin_id) manifest = self.load_plugin_manifest(plugin_id)
assert 'display_modes' in manifest assert 'display_modes' in manifest
assert 'basketball_live' in manifest['display_modes'] # Manifest uses league-prefixed modes (nba_, wnba_, ncaam_, ncaaw_)
assert 'basketball_recent' in manifest['display_modes'] assert 'nba_live' in manifest['display_modes']
assert 'basketball_upcoming' in manifest['display_modes'] assert 'nba_recent' in manifest['display_modes']
assert 'nba_upcoming' in manifest['display_modes']
def test_plugin_has_get_display_modes(self, plugin_id): def test_plugin_has_get_display_modes(self, plugin_id):
"""Test that plugin can return display modes.""" """Test that plugin can return display modes."""

View File

@@ -229,18 +229,20 @@ class TestDisplayControllerSchedule:
def test_inactive_hours(self, test_display_controller): def test_inactive_hours(self, test_display_controller):
"""Test inactive hours check.""" """Test inactive hours check."""
controller = test_display_controller controller = test_display_controller
# Inject schedule directly into self.config (what _check_schedule actually reads)
# and reset the minute gate so the cached result from any prior call is cleared.
controller.config['schedule'] = {
"enabled": True,
"start_time": "09:00",
"end_time": "17:00",
}
controller._schedule_checked_minute = None
controller._tz = None
with patch('src.display_controller.datetime') as mock_datetime: with patch('src.display_controller.datetime') as mock_datetime:
mock_datetime.now.return_value.strftime.return_value.lower.return_value = "monday" mock_datetime.now.return_value.strftime.return_value.lower.return_value = "monday"
mock_datetime.now.return_value.time.return_value = datetime.strptime("20:00", "%H:%M").time() mock_datetime.now.return_value.time.return_value = datetime.strptime("20:00", "%H:%M").time()
mock_datetime.strptime = datetime.strptime mock_datetime.strptime = datetime.strptime
schedule_config = { controller._check_schedule()
"schedule": { assert controller.is_display_active is False
"enabled": True,
"start_time": "09:00",
"end_time": "17:00"
}
}
with patch.object(controller.config_service, 'get_config', return_value=schedule_config):
controller._check_schedule()
assert controller.is_display_active is False

View File

@@ -0,0 +1,322 @@
"""
Tests for the three display_controller.py optimizations:
Opt #1 — inspect.signature() caching per plugin_id
Opt #2 — pre-cached config values (_normal_brightness, _scroll_speed)
Opt #3 — schedule minute-gate (_check_schedule, _check_dim_schedule)
"""
import pytest
import time
from datetime import datetime
from unittest.mock import MagicMock, patch, call
# ---------------------------------------------------------------------------
# Shared fixture
# ---------------------------------------------------------------------------
@pytest.fixture
def controller(test_display_controller):
"""Return a ready DisplayController from the existing suite fixture."""
return test_display_controller
# ---------------------------------------------------------------------------
# Opt #1 — signature cache
# ---------------------------------------------------------------------------
class TestSignatureCache:
"""inspect.signature() should be called at most once per plugin_id."""
class _PluginWithMode:
"""Real class whose display() accepts display_mode — inspectable by signature."""
plugin_id = "mode_plugin"
def display(self, display_mode=None, force_clear=False):
return True
class _PluginNoMode:
"""Real class whose display() does NOT accept display_mode."""
plugin_id = "no_mode_plugin"
def display(self, force_clear=False):
return True
def test_cache_starts_empty(self, controller):
assert controller._plugin_accepts_display_mode == {}
def test_signature_computed_and_cached(self, controller):
"""After the first cache population, the dict holds a bool and stays unchanged
if queried again without explicitly deleting the key."""
import inspect as _inspect
plugin = self._PluginNoMode()
key = "sig_test"
if key not in controller._plugin_accepts_display_mode:
controller._plugin_accepts_display_mode[key] = (
"display_mode" in _inspect.signature(plugin.display).parameters
)
original = controller._plugin_accepts_display_mode[key]
# Accessing cache again should not change the value
second = controller._plugin_accepts_display_mode[key]
assert second == original
def test_cache_stores_false_for_no_display_mode(self, controller):
"""Plugin whose display() doesn't accept display_mode → cached False."""
import inspect as _inspect
plugin = self._PluginNoMode()
controller._plugin_accepts_display_mode["no_mode_plugin"] = (
"display_mode" in _inspect.signature(plugin.display).parameters
)
assert controller._plugin_accepts_display_mode["no_mode_plugin"] is False
def test_cache_stores_true_for_display_mode(self, controller):
"""Plugin whose display() accepts display_mode → cached True."""
import inspect as _inspect
plugin = self._PluginWithMode()
controller._plugin_accepts_display_mode["mode_plugin"] = (
"display_mode" in _inspect.signature(plugin.display).parameters
)
assert controller._plugin_accepts_display_mode["mode_plugin"] is True
def test_cache_cleared_on_plugin_reload(self, controller):
"""Populating plugin_modes for an id that's already cached must clear the entry."""
plugin = MagicMock()
controller._plugin_accepts_display_mode["reload_plugin"] = False
# Simulate the plugin_modes population code path (as in __init__)
plugin_id = "reload_plugin"
controller.plugin_modes["reload_plugin"] = plugin
if hasattr(controller, "_plugin_accepts_display_mode"):
controller._plugin_accepts_display_mode.pop(plugin_id, None)
assert "reload_plugin" not in controller._plugin_accepts_display_mode
# ---------------------------------------------------------------------------
# Opt #2 — cached config values
# ---------------------------------------------------------------------------
class TestCachedConfigValues:
"""_normal_brightness and _scroll_speed are populated from config at init."""
def test_normal_brightness_cached(self, controller):
"""_normal_brightness must equal what the config says."""
expected = (
controller.config
.get("display", {})
.get("hardware", {})
.get("brightness", 90)
)
assert controller._normal_brightness == expected
def test_scroll_speed_cached(self, controller):
"""_scroll_speed must equal what the config says."""
expected = (
controller.config
.get("display", {})
.get("vegas_scroll", {})
.get("scroll_speed", 75)
)
assert controller._scroll_speed == expected
def test_current_brightness_uses_cached_value(self, controller):
"""current_brightness is initialised from _normal_brightness."""
assert controller.current_brightness == controller._normal_brightness
def test_cached_target_brightness_init(self, controller):
"""_cached_target_brightness starts equal to _normal_brightness."""
assert controller._cached_target_brightness == controller._normal_brightness
def test_normal_brightness_default_is_90(self, controller):
"""If config has no brightness key the default is 90."""
controller.config = {}
controller._normal_brightness = (
controller.config.get("display", {})
.get("hardware", {})
.get("brightness", 90)
)
assert controller._normal_brightness == 90
# ---------------------------------------------------------------------------
# Opt #3 — schedule minute-gate
# ---------------------------------------------------------------------------
class TestScheduleMinuteGate:
"""_check_schedule and _check_dim_schedule skip re-evaluation within the same minute."""
# ── _check_schedule ──────────────────────────────────────────────────────
def test_schedule_checked_minute_starts_none(self, controller):
assert controller._schedule_checked_minute is None
def test_first_call_sets_checked_minute(self, controller):
"""After the first real evaluation the minute key is stored."""
controller.config["schedule"] = {
"enabled": True,
"start_time": "00:00",
"end_time": "23:59",
}
controller._schedule_checked_minute = None
controller._tz = None
controller._check_schedule()
assert controller._schedule_checked_minute is not None
def test_second_call_same_minute_does_not_re_evaluate(self, controller):
"""A second call with the same (hour, minute) returns without changing state."""
controller.config["schedule"] = {
"enabled": True,
"start_time": "00:00",
"end_time": "23:59",
}
controller._tz = None
controller._schedule_checked_minute = None
# First call — evaluates and marks as active (whole-day window)
controller._check_schedule()
assert controller.is_display_active is True
first_minute_key = controller._schedule_checked_minute
# Force is_display_active to False so we can tell if it gets re-evaluated
controller.is_display_active = False
# Second call within the same minute — gate fires, is_display_active unchanged
controller._schedule_checked_minute = first_minute_key # same minute
controller._check_schedule()
assert controller.is_display_active is False, (
"Second call in same minute should return immediately without re-evaluation"
)
def test_new_minute_forces_re_evaluation(self, controller):
"""A different (hour, minute) key causes a full re-evaluation."""
controller.config["schedule"] = {
"enabled": True,
"start_time": "00:00",
"end_time": "23:59",
}
controller._tz = None
# Plant a stale minute key from yesterday
controller._schedule_checked_minute = (-1, -1)
controller.is_display_active = False # wrong value to be corrected
controller._check_schedule()
assert controller.is_display_active is True, (
"A new minute key should trigger re-evaluation and correct is_display_active"
)
def test_gate_skipped_when_schedule_disabled(self, controller):
"""When schedule.enabled=False the method returns before reaching the gate."""
controller.config["schedule"] = {"enabled": False}
controller._schedule_checked_minute = None
controller._tz = None
controller._check_schedule()
# The early-return path doesn't set the minute key
assert controller._schedule_checked_minute is None
# ── _check_dim_schedule ──────────────────────────────────────────────────
def test_dim_checked_minute_starts_none(self, controller):
assert controller._dim_checked_minute is None
def test_first_dim_call_sets_checked_minute(self, controller):
"""First call with dim schedule enabled stores the minute key."""
controller.config["dim_schedule"] = {
"enabled": True,
"start_time": "22:00",
"end_time": "06:00",
}
controller.is_display_active = True
controller._dim_checked_minute = None
controller._tz = None
controller._check_dim_schedule()
assert controller._dim_checked_minute is not None
def test_dim_second_call_returns_cached_brightness(self, controller):
"""Second call with same minute returns _cached_target_brightness immediately."""
controller.config["dim_schedule"] = {
"enabled": True,
"start_time": "22:00",
"end_time": "06:00",
}
controller.is_display_active = True
controller._dim_checked_minute = None
controller._tz = None
# First call stores the result
first_result = controller._check_dim_schedule()
assert controller._cached_target_brightness == first_result
minute_key = controller._dim_checked_minute
# Corrupt cached value to something recognisable
controller._cached_target_brightness = 42
# Second call in same minute — must return the cached 42
controller._dim_checked_minute = minute_key
second_result = controller._check_dim_schedule()
assert second_result == 42, (
"Same-minute call must return cached brightness, not re-compute"
)
def test_dim_gate_skipped_when_display_off(self, controller):
"""When display is off the method exits before the minute gate."""
controller.config["dim_schedule"] = {"enabled": True, "start_time": "22:00", "end_time": "06:00"}
controller.is_display_active = False
controller._dim_checked_minute = None
controller._tz = None
controller._check_dim_schedule()
# Early-exit path does not set the minute key
assert controller._dim_checked_minute is None
def test_dim_cached_target_brightness_updated_after_full_evaluation(self, controller):
"""After a full evaluation _cached_target_brightness reflects the result."""
controller.config["dim_schedule"] = {
"enabled": True,
"start_time": "22:00",
"end_time": "06:00",
}
controller.is_display_active = True
controller._dim_checked_minute = None # force full re-evaluation
controller._tz = None
result = controller._check_dim_schedule()
assert controller._cached_target_brightness == result
# ── timezone lazy init ───────────────────────────────────────────────────
def test_tz_starts_none(self, controller):
assert controller._tz is None
def test_tz_lazily_initialised_on_first_schedule_check(self, controller):
"""_tz is None until _check_schedule or _check_dim_schedule is called."""
controller.config["schedule"] = {
"enabled": True,
"start_time": "00:00",
"end_time": "23:59",
}
controller._tz = None
controller._schedule_checked_minute = None
controller._check_schedule()
assert controller._tz is not None
def test_tz_shared_between_schedule_and_dim(self, controller):
"""Both methods use the same cached _tz instance."""
controller.config["schedule"] = {"enabled": True, "start_time": "00:00", "end_time": "23:59"}
controller.config["dim_schedule"] = {"enabled": True, "start_time": "22:00", "end_time": "06:00"}
controller.is_display_active = True
controller._tz = None
controller._schedule_checked_minute = None
controller._dim_checked_minute = None
controller._check_schedule()
tz_after_schedule = controller._tz
controller._check_dim_schedule()
assert controller._tz is tz_after_schedule, (
"_check_dim_schedule should reuse the _tz set by _check_schedule"
)

View File

@@ -58,19 +58,15 @@ class TestGitInfoCache(unittest.TestCase):
(self.plugin_path / ".git" / "HEAD").write_text("ref: refs/heads/main\n") (self.plugin_path / ".git" / "HEAD").write_text("ref: refs/heads/main\n")
def _fake_subprocess_run(self, *args, **kwargs): def _fake_subprocess_run(self, *args, **kwargs):
# Return different dummy values depending on which git subcommand # _get_local_git_info now reads branch and remote_url directly from
# was invoked so the code paths that parse output all succeed. # .git/HEAD and .git/config (no subprocess) and uses a single
# ``git log --format=%H%n%cI`` call that returns SHA on line 1 and
# ISO date on line 2. Adjust the fake accordingly.
cmd = args[0] cmd = args[0]
result = MagicMock() result = MagicMock()
result.returncode = 0 result.returncode = 0
if "rev-parse" in cmd and "HEAD" in cmd and "--abbrev-ref" not in cmd: if "log" in cmd:
result.stdout = "abcdef1234567890\n" result.stdout = "abcdef1234567890\n2026-04-08T12:00:00+00:00\n"
elif "--abbrev-ref" in cmd:
result.stdout = "main\n"
elif "config" in cmd:
result.stdout = "https://example.com/repo.git\n"
elif "log" in cmd:
result.stdout = "2026-04-08T12:00:00+00:00\n"
else: else:
result.stdout = "" result.stdout = ""
return result return result
@@ -84,7 +80,8 @@ class TestGitInfoCache(unittest.TestCase):
self.assertIsNotNone(first) self.assertIsNotNone(first)
self.assertEqual(first["short_sha"], "abcdef1") self.assertEqual(first["short_sha"], "abcdef1")
calls_after_first = mock_run.call_count calls_after_first = mock_run.call_count
self.assertEqual(calls_after_first, 4) # Production code now uses a single ``git log`` call.
self.assertEqual(calls_after_first, 1)
# Second call with unchanged HEAD: zero new subprocess calls. # Second call with unchanged HEAD: zero new subprocess calls.
second = self.sm._get_local_git_info(self.plugin_path) second = self.sm._get_local_git_info(self.plugin_path)
@@ -105,7 +102,8 @@ class TestGitInfoCache(unittest.TestCase):
os.utime(head, (new_time, new_time)) os.utime(head, (new_time, new_time))
self.sm._get_local_git_info(self.plugin_path) self.sm._get_local_git_info(self.plugin_path)
self.assertEqual(mock_run.call_count, calls_after_first + 4) # One new ``git log`` call after cache invalidation.
self.assertEqual(mock_run.call_count, calls_after_first + 1)
def test_no_git_directory_returns_none(self): def test_no_git_directory_returns_none(self):
non_git = self.plugins_dir / "no_git" non_git = self.plugins_dir / "no_git"
@@ -192,14 +190,11 @@ class TestGitInfoCache(unittest.TestCase):
result = MagicMock() result = MagicMock()
result.returncode = 0 result.returncode = 0
cmd = args[0] cmd = args[0]
if "rev-parse" in cmd and "--abbrev-ref" not in cmd: # Production code now uses a single ``git log --format=%H%n%cI``.
result.stdout = branch_file.read_text().strip() + "\n" # Branch and remote_url are read directly from .git/HEAD/.git/config.
elif "--abbrev-ref" in cmd: if "log" in cmd:
result.stdout = "main\n" sha = branch_file.read_text().strip()
elif "config" in cmd: result.stdout = f"{sha}\n2026-04-08T12:00:00+00:00\n"
result.stdout = "https://example.com/repo.git\n"
elif "log" in cmd:
result.stdout = "2026-04-08T12:00:00+00:00\n"
else: else:
result.stdout = "" result.stdout = ""
return result return result

View File

@@ -617,7 +617,8 @@ class TestDottedKeyNormalization:
'leagues': {'eng.1': {'enabled': True, 'favorite_teams': []}}, 'leagues': {'eng.1': {'enabled': True, 'favorite_teams': []}},
} }
schema_mgr.merge_with_defaults.side_effect = lambda config, defaults: {**defaults, **config} schema_mgr.merge_with_defaults.side_effect = lambda config, defaults: {**defaults, **config}
schema_mgr.validate_config_against_schema.return_value = [] # Must be a (bool, list) tuple: the endpoint does is_valid, errors = validate_config_against_schema(...)
schema_mgr.validate_config_against_schema.return_value = (True, [])
api_v3.schema_manager = schema_mgr api_v3.schema_manager = schema_mgr
request_data = { request_data = {
@@ -679,7 +680,7 @@ class TestDottedKeyNormalization:
'leagues': {'eng.1': {'favorite_teams': []}}, 'leagues': {'eng.1': {'favorite_teams': []}},
} }
schema_mgr.merge_with_defaults.side_effect = lambda config, defaults: {**defaults, **config} schema_mgr.merge_with_defaults.side_effect = lambda config, defaults: {**defaults, **config}
schema_mgr.validate_config_against_schema.return_value = [] schema_mgr.validate_config_against_schema.return_value = (True, [])
api_v3.schema_manager = schema_mgr api_v3.schema_manager = schema_mgr
request_data = { request_data = {

View File

@@ -224,20 +224,14 @@ class TestStateReconciliation(unittest.TestCase):
with open(manifest_path, 'w') as f: with open(manifest_path, 'w') as f:
json.dump({"version": "1.0.0", "name": "Plugin 1"}, f) json.dump({"version": "1.0.0", "name": "Plugin 1"}, f)
# Mock save_config to track calls
saved_configs = []
def save_config(config):
saved_configs.append(config)
self.config_manager.save_config = save_config
# Run reconciliation # Run reconciliation
result = self.reconciler.reconcile_state() result = self.reconciler.reconcile_state()
# Verify fix was attempted # config.json is the source of truth for enabled state. The fix syncs
# the state manager to match config (config says True → state set True),
# rather than overwriting the config with the stale state value.
self.assertEqual(len(result.inconsistencies_fixed), 1) self.assertEqual(len(result.inconsistencies_fixed), 1)
self.assertEqual(len(saved_configs), 1) self.state_manager.set_plugin_enabled.assert_called_once_with("plugin1", True)
self.assertEqual(saved_configs[0]["plugin1"]["enabled"], False)
def test_multiple_inconsistencies(self): def test_multiple_inconsistencies(self):
"""Test reconciliation with multiple inconsistencies.""" """Test reconciliation with multiple inconsistencies."""

View File

@@ -2412,6 +2412,13 @@ def reconcile_plugin_state():
from src.plugin_system.state_reconciliation import StateReconciliation from src.plugin_system.state_reconciliation import StateReconciliation
# Parse optional `force` flag from request body, guarding against
# non-dict bodies (bare string, array, null) that would raise AttributeError.
payload = request.get_json(silent=True)
if not isinstance(payload, dict):
payload = {}
force = _coerce_to_bool(payload.get('force', False))
reconciler = StateReconciliation( reconciler = StateReconciliation(
state_manager=api_v3.plugin_state_manager, state_manager=api_v3.plugin_state_manager,
config_manager=api_v3.config_manager, config_manager=api_v3.config_manager,
@@ -2419,7 +2426,7 @@ def reconcile_plugin_state():
plugins_dir=Path(api_v3.plugin_manager.plugins_dir) plugins_dir=Path(api_v3.plugin_manager.plugins_dir)
) )
result = reconciler.reconcile_state() result = reconciler.reconcile_state(force=force)
return success_response( return success_response(
data={ data={
@@ -2846,6 +2853,89 @@ def update_plugin():
status_code=500 status_code=500
) )
def _do_transactional_uninstall(plugin_id, preserve_config):
"""Execute an uninstall with snapshot-based rollback.
Order of operations:
1. Snapshot main config + secrets (abort on unexpected errors, proceed on expected I/O errors).
2. Clean up plugin config (abort with 500 if this raises — avoids orphaned files).
3. Unload plugin from runtime if loaded (rollback + 500 if this raises).
4. Remove plugin files (rollback + 500 if this returns False or raises).
5. Finish (remove state, invalidate caches).
Rollback restores the config snapshot and, if the plugin had been
loaded before unload, calls load_plugin to restore runtime state.
Returns (True, None) on success or (False, error_message) on failure.
"""
from src.exceptions import ConfigError
# --- Step 1: snapshot main + secrets ---
main_snapshot = None
secrets_snapshot = None
try:
main_snapshot = api_v3.config_manager.get_raw_file_content('main')
except (OSError, ConfigError):
pass # Proceed without snapshot; narrow catch preserves TypeError/AttributeError
try:
secrets_snapshot = api_v3.config_manager.get_raw_file_content('secrets')
except (OSError, ConfigError):
pass
# --- Step 2: cleanup config first (abort before touching filesystem) ---
if not preserve_config:
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
# Record whether the plugin was running before we touch anything.
was_loaded = (
api_v3.plugin_manager is not None
and plugin_id in api_v3.plugin_manager.plugins
)
def _rollback(reload_plugin):
if main_snapshot is not None:
try:
api_v3.config_manager.save_raw_file_content('main', main_snapshot)
except Exception as restore_err:
logger.error("Failed to restore main config snapshot for %s: %s", plugin_id, restore_err)
if secrets_snapshot is not None:
try:
api_v3.config_manager.save_raw_file_content('secrets', secrets_snapshot)
except Exception as restore_err:
logger.error("Failed to restore secrets snapshot for %s: %s", plugin_id, restore_err)
if reload_plugin and api_v3.plugin_manager is not None:
try:
api_v3.plugin_manager.load_plugin(plugin_id)
except Exception as reload_err:
logger.error("Failed to reload plugin %s during rollback: %s", plugin_id, reload_err)
# --- Step 3: unload ---
if was_loaded:
try:
api_v3.plugin_manager.unload_plugin(plugin_id)
except Exception as unload_err:
_rollback(reload_plugin=False) # unload failed — runtime state unchanged
return False, f"Failed to unload plugin {plugin_id}: {unload_err}"
# --- Step 4: remove files ---
try:
success = api_v3.plugin_store_manager.uninstall_plugin(plugin_id)
except Exception as remove_err:
_rollback(reload_plugin=was_loaded)
return False, f"Failed to remove plugin {plugin_id}: {remove_err}"
if not success:
_rollback(reload_plugin=was_loaded)
return False, f"Failed to uninstall plugin {plugin_id}"
# --- Step 5: finish ---
if api_v3.schema_manager:
api_v3.schema_manager.invalidate_cache(plugin_id)
if api_v3.plugin_state_manager:
api_v3.plugin_state_manager.remove_plugin_state(plugin_id)
return True, None
@api_v3.route('/plugins/uninstall', methods=['POST']) @api_v3.route('/plugins/uninstall', methods=['POST'])
def uninstall_plugin(): def uninstall_plugin():
"""Uninstall plugin""" """Uninstall plugin"""
@@ -2865,19 +2955,13 @@ def uninstall_plugin():
plugin_id = data['plugin_id'] plugin_id = data['plugin_id']
preserve_config = data.get('preserve_config', False) preserve_config = data.get('preserve_config', False)
# Use operation queue if available # Both queued and direct paths use the same transactional helper so
# snapshot/rollback behaviour is consistent regardless of deployment.
if api_v3.operation_queue: if api_v3.operation_queue:
def uninstall_callback(operation): def uninstall_callback(operation):
"""Callback to execute plugin uninstallation.""" """Callback to execute plugin uninstallation via transactional helper."""
# Unload the plugin first if it's loaded success, error_msg = _do_transactional_uninstall(plugin_id, preserve_config)
if api_v3.plugin_manager and plugin_id in api_v3.plugin_manager.plugins:
api_v3.plugin_manager.unload_plugin(plugin_id)
# Uninstall the plugin
success = api_v3.plugin_store_manager.uninstall_plugin(plugin_id)
if not success: if not success:
error_msg = f'Failed to uninstall plugin {plugin_id}'
if api_v3.operation_history: if api_v3.operation_history:
api_v3.operation_history.record_operation( api_v3.operation_history.record_operation(
"uninstall", "uninstall",
@@ -2885,24 +2969,7 @@ def uninstall_plugin():
status="failed", status="failed",
error=error_msg error=error_msg
) )
raise Exception(error_msg) raise Exception(error_msg or f'Failed to uninstall plugin {plugin_id}')
# Invalidate schema cache
if api_v3.schema_manager:
api_v3.schema_manager.invalidate_cache(plugin_id)
# Clean up plugin configuration if not preserving
if not preserve_config:
try:
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
except Exception as cleanup_err:
logger.warning("Failed to cleanup config after uninstall: %s", cleanup_err)
# Remove from state manager
if api_v3.plugin_state_manager:
api_v3.plugin_state_manager.remove_plugin_state(plugin_id)
# Record in history
if api_v3.operation_history: if api_v3.operation_history:
api_v3.operation_history.record_operation( api_v3.operation_history.record_operation(
"uninstall", "uninstall",
@@ -2910,7 +2977,6 @@ def uninstall_plugin():
status="success", status="success",
details={"preserve_config": preserve_config} details={"preserve_config": preserve_config}
) )
return {'success': True, 'message': 'Plugin uninstalled successfully'} return {'success': True, 'message': 'Plugin uninstalled successfully'}
# Enqueue operation # Enqueue operation
@@ -2925,31 +2991,10 @@ def uninstall_plugin():
message='Plugin uninstallation queued' message='Plugin uninstallation queued'
) )
else: else:
# Fallback to direct uninstall # Direct (non-queued) transactional uninstall
# Unload the plugin first if it's loaded success, error_msg = _do_transactional_uninstall(plugin_id, preserve_config)
if api_v3.plugin_manager and plugin_id in api_v3.plugin_manager.plugins:
api_v3.plugin_manager.unload_plugin(plugin_id)
# Uninstall the plugin
success = api_v3.plugin_store_manager.uninstall_plugin(plugin_id)
if success: if success:
# Invalidate schema cache
if api_v3.schema_manager:
api_v3.schema_manager.invalidate_cache(plugin_id)
# Clean up plugin configuration if not preserving
if not preserve_config:
try:
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
except Exception as cleanup_err:
logger.warning("Failed to cleanup config after uninstall: %s", cleanup_err)
# Remove from state manager
if api_v3.plugin_state_manager:
api_v3.plugin_state_manager.remove_plugin_state(plugin_id)
# Record in history
if api_v3.operation_history: if api_v3.operation_history:
api_v3.operation_history.record_operation( api_v3.operation_history.record_operation(
"uninstall", "uninstall",
@@ -2957,7 +3002,6 @@ def uninstall_plugin():
status="success", status="success",
details={"preserve_config": preserve_config} details={"preserve_config": preserve_config}
) )
return success_response(message='Plugin uninstalled successfully') return success_response(message='Plugin uninstalled successfully')
else: else:
if api_v3.operation_history: if api_v3.operation_history:
@@ -2965,12 +3009,11 @@ def uninstall_plugin():
"uninstall", "uninstall",
plugin_id=plugin_id, plugin_id=plugin_id,
status="failed", status="failed",
error='Plugin uninstall failed' error=error_msg
) )
return error_response( return error_response(
ErrorCode.PLUGIN_UNINSTALL_FAILED, ErrorCode.PLUGIN_UNINSTALL_FAILED,
'Plugin uninstall failed', error_msg or 'Plugin uninstall failed',
status_code=500 status_code=500
) )
@@ -4217,7 +4260,9 @@ def save_plugin_config():
nested_dict = config_dict.get(prop_key) nested_dict = config_dict.get(prop_key)
if isinstance(nested_dict, dict): if isinstance(nested_dict, dict):
fix_array_structures(nested_dict, prop_schema['properties'], nested_prefix) # Pass no prefix: config_dict is already the navigated sub-dict,
# so path segments from the parent would mis-navigate it.
fix_array_structures(nested_dict, prop_schema['properties'])
# Also ensure array fields that are None get converted to empty arrays # Also ensure array fields that are None get converted to empty arrays
def ensure_array_defaults(config_dict, schema_props, prefix=''): def ensure_array_defaults(config_dict, schema_props, prefix=''):
@@ -4277,7 +4322,8 @@ def save_plugin_config():
nested_dict = config_dict[prop_key] nested_dict = config_dict[prop_key]
if isinstance(nested_dict, dict): if isinstance(nested_dict, dict):
ensure_array_defaults(nested_dict, prop_schema['properties'], nested_prefix) # Pass no prefix: config_dict is already navigated.
ensure_array_defaults(nested_dict, prop_schema['properties'])
if schema and 'properties' in schema: if schema and 'properties' in schema:
# First, fix any dict structures that should be arrays # First, fix any dict structures that should be arrays
@@ -4377,6 +4423,21 @@ def save_plugin_config():
defaults = schema_mgr.generate_default_config(plugin_id, use_cache=True) defaults = schema_mgr.generate_default_config(plugin_id, use_cache=True)
plugin_config = schema_mgr.merge_with_defaults(plugin_config, defaults) plugin_config = schema_mgr.merge_with_defaults(plugin_config, defaults)
# After merging defaults, replace any None array values with their schema defaults.
# merge_with_defaults gives user config higher priority, so a None submitted by
# the client can survive the merge — this pass cleans those up.
def _fix_none_arrays(cfg, props):
for k, pschema in props.items():
if pschema.get('type') == 'array':
if isinstance(cfg, dict) and (k not in cfg or cfg[k] is None):
cfg[k] = pschema.get('default', [])
elif pschema.get('type') == 'object' and 'properties' in pschema:
if isinstance(cfg, dict) and isinstance(cfg.get(k), dict):
_fix_none_arrays(cfg[k], pschema['properties'])
if schema and 'properties' in schema and isinstance(plugin_config, dict):
_fix_none_arrays(plugin_config, schema['properties'])
# Ensure enabled state is preserved after defaults merge # Ensure enabled state is preserved after defaults merge
# Defaults should not overwrite an explicitly preserved enabled value # Defaults should not overwrite an explicitly preserved enabled value
if preserved_enabled is not None: if preserved_enabled is not None: