Feature/vegas scroll mode (#215)

* feat(display): add Vegas-style continuous scroll mode

Implement an opt-in Vegas ticker mode that composes all enabled plugin
content into a single continuous horizontal scroll. Includes a modular
package (src/vegas_mode/) with double-buffered streaming, 125 FPS
render pipeline using the existing ScrollHelper, live priority
interruption support, and a web UI for configuration with drag-drop
plugin ordering.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(vegas): add three-mode display system (SCROLL, FIXED_SEGMENT, STATIC)

Adds a flexible display mode system for Vegas scroll mode that allows
plugins to control how their content appears in the continuous scroll:

- SCROLL: Content scrolls continuously (multi-item plugins like sports)
- FIXED_SEGMENT: Fixed block that scrolls by (clock, weather)
- STATIC: Scroll pauses, plugin displays, then resumes (alerts)

Changes:
- Add VegasDisplayMode enum to base_plugin.py with backward-compatible
  mapping from legacy get_vegas_content_type()
- Add static pause handling to coordinator with scroll position save/restore
- Add mode-aware content composition to stream_manager
- Add vegas_mode info to /api/v3/plugins/installed endpoint
- Add mode indicators to Vegas settings UI
- Add comprehensive plugin developer documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas,widgets): address validation, thread safety, and XSS issues

Vegas mode fixes:
- config.py: align validation limits with UI (scroll_speed max 200, separator_width max 128)
- coordinator.py: fix race condition by properly initializing _pending_config
- plugin_adapter.py: remove unused import
- render_pipeline.py: preserve deque type in reset() method
- stream_manager.py: fix lock handling and swap_buffers to truly swap

API fixes:
- api_v3.py: normalize boolean checkbox values, validate numeric fields, ensure JSON arrays

Widget fixes:
- day-selector.js: remove escapeHtml from JSON.stringify to prevent corruption
- password-input.js: use deterministic color class mapping for Tailwind JIT
- radio-group.js: replace inline onchange with addEventListener to prevent XSS
- select-dropdown.js: guard global registry access
- slider.js: add escapeAttr for attributes, fix null dereference in setValue

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): improve exception handling and static pause state management

coordinator.py:
- _check_live_priority: use logger.exception for full traceback
- _end_static_pause: guard scroll resume on interruption (stop/live priority)
- _update_static_mode_plugins: log errors instead of silently swallowing

render_pipeline.py:
- compose_scroll_content: use specific exceptions and logger.exception
- render_frame: use specific exceptions and logger.exception
- hot_swap_content: use specific exceptions and logger.exception

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): add interrupt mechanism and improve config/exception handling

- Add interrupt checker callback to Vegas coordinator for responsive
  handling of on-demand requests and wifi status during Vegas mode
- Fix config.py update() to include dynamic duration fields
- Fix is_plugin_included() consistency with get_ordered_plugins()
- Update _apply_pending_config to propagate config to StreamManager
- Change _fetch_plugin_content to use logger.exception for traceback
- Replace bare except in _refresh_plugin_list with specific exceptions
- Add aria-label accessibility to Vegas toggle checkbox
- Fix XSS vulnerability in plugin metadata rendering with escapeHtml

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): improve logging, validation, lock handling, and config updates

- display_controller.py: use logger.exception for Vegas errors with traceback
- base_plugin.py: validate vegas_panel_count as positive integer with warning
- coordinator.py: fix _apply_pending_config to avoid losing concurrent updates
  by clearing _pending_config while holding lock
- plugin_adapter.py: remove broad catch-all, use narrower exception types
  (AttributeError, TypeError, ValueError, OSError, RuntimeError) and
  logger.exception for traceback preservation
- api_v3.py: only update vegas_config['enabled'] when key is present in data
  to prevent incorrect disabling when checkbox is omitted

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): improve cycle advancement, logging, and accessibility

- Add advance_cycle() method to StreamManager for clearing buffer between cycles
- Call advance_cycle() in RenderPipeline.start_new_cycle() for fresh content
- Use logger.exception() for interrupt check and static pause errors (full tracebacks)
- Add id="vegas_scroll_label" to h3 for aria-labelledby reference
- Call updatePluginConfig() after rendering plugin list for proper initialization

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): add thread-safety, preserve updates, and improve logging

- display_controller.py: Use logger.exception() for Vegas import errors
- plugin_adapter.py: Add thread-safe cache lock, remove unused exception binding
- stream_manager.py: In-place merge in process_updates() preserves non-updated plugins
- api_v3.py: Change vegas_scroll_enabled default from False to True

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): add debug logging and narrow exception types

- stream_manager.py: Log when get_vegas_display_mode() is unavailable
- stream_manager.py: Narrow exception type from Exception to (AttributeError, TypeError)
- api_v3.py: Log exceptions when reading Vegas display metadata with plugin context

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): fix method call and improve exception logging

- Fix _check_vegas_interrupt() calling nonexistent _check_wifi_status(),
  now correctly calls _check_wifi_status_message()
- Update _refresh_plugin_list() exception handler to use logger.exception()
  with plugin_id and class name for remote debugging

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(web): replace complex toggle with standard checkbox for Vegas mode

The Tailwind pseudo-element toggle (after:content-[''], etc.) wasn't
rendering because these classes weren't in the CSS bundle. Replaced
with a simple checkbox that matches other form controls in the template.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* debug(vegas): add detailed logging to _refresh_plugin_list

Track why plugins aren't being found for Vegas scroll:
- Log count of loaded plugins
- Log enabled status for each plugin
- Log content_type and display_mode checks
- Log when plugin_manager lacks loaded_plugins

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): use correct attribute name for plugin manager

StreamManager and VegasModeCoordinator were checking for
plugin_manager.loaded_plugins but PluginManager stores active
plugins in plugin_manager.plugins. This caused Vegas scroll
to find zero plugins despite plugins being available.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): convert scroll_speed from px/sec to px/frame correctly

The config scroll_speed is in pixels per second, but ScrollHelper
in frame_based_scrolling mode interprets it as pixels per frame.
Previously this caused the speed to be clamped to max 5.0 regardless
of the configured value.

Now properly converts: pixels_per_frame = scroll_speed * scroll_delay

With defaults (50 px/s, 0.02s delay), this gives 1 px/frame = 50 px/s.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(vegas): add FPS logging every 5 seconds

Logs actual FPS vs target FPS to help diagnose performance issues.
Shows frame count in each 5-second interval.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): improve plugin content capture reliability

- Call update_data() before capture to ensure fresh plugin data
- Try display() without force_clear first, fallback if TypeError
- Retry capture with force_clear=True if first attempt is blank
- Use histogram-based blank detection instead of point sampling
  (more reliable for content positioned anywhere in frame)

This should help capture content from plugins that don't implement
get_vegas_content() natively.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): handle callable width/height on display_manager

DisplayManager.width and .height may be methods or properties depending
on the implementation. Use callable() check to call them if needed,
ensuring display_width and display_height are always integers.

Fixes potential TypeError when width/height are methods.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): use logger.exception for display mode errors

Replace logger.error with logger.exception to capture full stack trace
when get_vegas_display_mode() fails on a plugin.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): protect plugin list updates with buffer lock

Move assignment of _ordered_plugins and index resets under _buffer_lock
to prevent race conditions with _prefetch_content() which reads these
variables under the same lock.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): catch all exceptions in get_vegas_display_mode

Broaden exception handling from AttributeError/TypeError to Exception
so any plugin error in get_vegas_display_mode() doesn't abort the
entire plugin list refresh. The loop continues with the default
FIXED_SEGMENT mode.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(vegas): refresh stream manager when config updates

After updating stream_manager.config, force a refresh to pick up changes
to plugin_order, excluded_plugins, and buffer_ahead settings. Also use
logger.exception to capture full stack traces on config update errors.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* debug(vegas): add detailed logging for blank image detection

* feat(vegas): extract full scroll content from plugins using ScrollHelper

Plugins like ledmatrix-stocks and odds-ticker use ScrollHelper with a
cached_image that contains their full scrolling content. Instead of
falling back to single-frame capture, now check for scroll_helper.cached_image
first to get the complete scrolling content for Vegas mode.

* debug(vegas): add comprehensive INFO-level logging for plugin content flow

- Log each plugin being processed with class name
- Log which content methods are tried (native, scroll_helper, fallback)
- Log success/failure of each method with image dimensions
- Log brightness check results for blank image detection
- Add visual separators in logs for easier debugging
- Log plugin list refresh with enabled/excluded status

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(vegas): trigger scroll content generation when cache is empty

When a plugin has a scroll_helper but its cached_image is not yet
populated, try to trigger content generation by:
1. Calling _create_scrolling_display() if available (stocks pattern)
2. Calling display(force_clear=True) as a fallback

This allows plugins like stocks to provide their full scroll content
even when Vegas mode starts before the plugin has run its normal
display cycle.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: improve exception handling in plugin_adapter scroll content retrieval

Replace broad except Exception handlers with narrow exception types
(AttributeError, TypeError, ValueError, OSError) and use logger.exception
instead of logger.warning/info to capture full stack traces for better
diagnosability.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: narrow exception handling in coordinator and plugin_adapter

- coordinator.py: Replace broad Exception catch around get_vegas_display_mode()
  with (AttributeError, TypeError) and use logger.exception for stack traces
- plugin_adapter.py: Narrow update_data() exception handler to
  (AttributeError, RuntimeError, OSError) and use logger.exception

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: improve Vegas mode robustness and API validation

- display_controller: Guard against None plugin_manager in Vegas init
- coordinator: Restore scrolling state in resume() to match pause()
- api_v3: Validate Vegas numeric fields with range checks and 400 errors

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Chuck
2026-01-29 10:23:56 -05:00
committed by GitHub
parent 10d70d911a
commit 7524747e44
17 changed files with 3576 additions and 21 deletions

View File

@@ -18,6 +18,10 @@ from src.logging_config import get_logger
# Get logger with consistent configuration
logger = get_logger(__name__)
# Vegas mode import (lazy loaded to avoid circular imports)
_vegas_mode_imported = False
VegasModeCoordinator = None
DEFAULT_DYNAMIC_DURATION_CAP = 180.0
# WiFi status message file path (same as used in wifi_manager.py)
@@ -343,8 +347,87 @@ class DisplayController:
self._update_modules()
logger.info("Initial plugin update completed in %.3f seconds", time.time() - update_start)
# Initialize Vegas mode coordinator
self.vegas_coordinator = None
self._initialize_vegas_mode()
logger.info("DisplayController initialization completed in %.3f seconds", time.time() - start_time)
def _initialize_vegas_mode(self):
"""Initialize Vegas mode coordinator if enabled."""
global _vegas_mode_imported, VegasModeCoordinator
vegas_config = self.config.get('display', {}).get('vegas_scroll', {})
if not vegas_config.get('enabled', False):
logger.debug("Vegas mode disabled in config")
return
if self.plugin_manager is None:
logger.warning("Vegas mode skipped: plugin_manager is None")
return
try:
# Lazy import to avoid circular imports
if not _vegas_mode_imported:
try:
from src.vegas_mode import VegasModeCoordinator as VMC
VegasModeCoordinator = VMC
_vegas_mode_imported = True
except ImportError:
logger.exception("Failed to import Vegas mode module")
return
self.vegas_coordinator = VegasModeCoordinator(
config=self.config,
display_manager=self.display_manager,
plugin_manager=self.plugin_manager
)
# Set up live priority checker
self.vegas_coordinator.set_live_priority_checker(self._check_live_priority)
# Set up interrupt checker for on-demand/wifi status
self.vegas_coordinator.set_interrupt_checker(
self._check_vegas_interrupt,
check_interval=10 # Check every 10 frames (~80ms at 125 FPS)
)
logger.info("Vegas mode coordinator initialized")
except Exception as e:
logger.error("Failed to initialize Vegas mode: %s", e, exc_info=True)
self.vegas_coordinator = None
def _is_vegas_mode_active(self) -> bool:
"""Check if Vegas mode should be running."""
if not self.vegas_coordinator:
return False
if not self.vegas_coordinator.is_enabled:
return False
if self.on_demand_active:
return False # On-demand takes priority
return True
def _check_vegas_interrupt(self) -> bool:
"""
Check if Vegas should yield control for higher priority events.
Called periodically by Vegas coordinator to allow responsive
handling of on-demand requests, wifi status, etc.
Returns:
True if Vegas should yield control, False to continue
"""
# Check for pending on-demand request
if self.on_demand_active:
return True
# Check for wifi status that needs display
if self._check_wifi_status_message():
return True
return False
def _check_schedule(self):
"""Check if display should be active based on schedule."""
schedule_config = self.config.get('schedule', {})
@@ -1152,6 +1235,23 @@ class DisplayController:
except ValueError:
pass
# Vegas scroll mode - continuous ticker across all plugins
# Priority: on-demand > wifi-status > live-priority > vegas > normal rotation
if self._is_vegas_mode_active() and not wifi_status_data:
live_mode = self._check_live_priority()
if not live_mode:
try:
# Run Vegas mode iteration
if self.vegas_coordinator.run_iteration():
# Vegas completed an iteration, continue to next loop
continue
else:
# Vegas was interrupted (live priority), fall through to normal handling
logger.debug("Vegas mode interrupted, falling back to normal rotation")
except Exception:
logger.exception("Vegas mode error")
# Fall through to normal rotation on error
if self.on_demand_active:
# Guard against empty on_demand_modes
if not self.on_demand_modes:

View File

@@ -9,11 +9,35 @@ Stability: Stable - maintains backward compatibility
"""
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Any, Optional, List
import logging
from src.logging_config import get_logger
class VegasDisplayMode(Enum):
"""
Display mode for Vegas scroll integration.
Determines how a plugin's content behaves within the continuous scroll:
- SCROLL: Content scrolls continuously within the stream.
Best for multi-item plugins like sports scores, odds tickers, news feeds.
Plugin provides multiple frames via get_vegas_content().
- FIXED_SEGMENT: Content is a fixed-width block that scrolls BY with
the rest of the content. Best for static info like clock, weather.
Plugin provides a single image sized to vegas_panel_count panels.
- STATIC: Scroll pauses, plugin displays for its duration, then scroll
resumes. Best for important alerts or detailed views that need attention.
Plugin uses standard display() method during the pause.
"""
SCROLL = "scroll"
FIXED_SEGMENT = "fixed"
STATIC = "static"
class BasePlugin(ABC):
"""
Base class that all plugins must inherit from.
@@ -141,7 +165,7 @@ class BasePlugin(ABC):
pass # Fall through to config
except (TypeError, ValueError, AttributeError):
pass # Fall through to config
# Fall back to config
config_duration = self.config.get("display_duration", 15.0)
try:
@@ -152,7 +176,7 @@ class BasePlugin(ABC):
return float(config_duration) if float(config_duration) > 0 else 15.0
except (ValueError, TypeError):
pass
return 15.0
# ---------------------------------------------------------------------
@@ -285,6 +309,168 @@ class BasePlugin(ABC):
return manifest.get("display_modes", [self.plugin_id])
return [self.plugin_id]
# -------------------------------------------------------------------------
# Vegas scroll mode support
# -------------------------------------------------------------------------
def get_vegas_content(self) -> Optional[Any]:
"""
Get content for Vegas-style continuous scroll mode.
Override this method to provide optimized content for continuous scrolling.
Plugins can return:
- A single PIL Image: Displayed as a static block in the scroll
- A list of PIL Images: Each image becomes a separate item in the scroll
- None: Vegas mode will fall back to capturing display() output
Multi-item plugins (sports scores, odds) should return individual game/item
images so they scroll smoothly with other plugins.
Returns:
PIL Image, list of PIL Images, or None
Example (sports plugin):
def get_vegas_content(self):
# Return individual game cards for smooth scrolling
return [self._render_game(game) for game in self.games]
Example (static plugin):
def get_vegas_content(self):
# Return current display as single block
return self._render_current_view()
"""
return None
def get_vegas_content_type(self) -> str:
"""
Indicate the type of content this plugin provides for Vegas scroll.
Override this to specify how Vegas mode should treat this plugin's content.
Returns:
'multi' - Plugin has multiple scrollable items (sports, odds, news)
'static' - Plugin is a static block (clock, weather, music)
'none' - Plugin should not appear in Vegas scroll mode
Example:
def get_vegas_content_type(self):
return 'multi' # We have multiple games to scroll
"""
return 'static'
def get_vegas_display_mode(self) -> VegasDisplayMode:
"""
Get the display mode for Vegas scroll integration.
This method determines how the plugin's content behaves within Vegas mode:
- SCROLL: Content scrolls continuously (multi-item plugins)
- FIXED_SEGMENT: Fixed block that scrolls by (clock, weather)
- STATIC: Pause scroll to display (alerts, detailed views)
Override to change default behavior. By default, reads from config
or maps legacy get_vegas_content_type() for backward compatibility.
Returns:
VegasDisplayMode enum value
Example:
def get_vegas_display_mode(self):
return VegasDisplayMode.SCROLL
"""
# Check for explicit config setting first
config_mode = self.config.get("vegas_mode")
if config_mode:
try:
return VegasDisplayMode(config_mode)
except ValueError:
self.logger.warning(
"Invalid vegas_mode '%s' for %s, using default",
config_mode, self.plugin_id
)
# Fall back to mapping legacy content_type
content_type = self.get_vegas_content_type()
if content_type == 'multi':
return VegasDisplayMode.SCROLL
elif content_type == 'static':
return VegasDisplayMode.FIXED_SEGMENT
elif content_type == 'none':
# 'none' means excluded - return FIXED_SEGMENT as default
# The exclusion is handled by checking get_vegas_content_type() separately
return VegasDisplayMode.FIXED_SEGMENT
return VegasDisplayMode.FIXED_SEGMENT
def get_supported_vegas_modes(self) -> List[VegasDisplayMode]:
"""
Return list of Vegas display modes this plugin supports.
Used by the web UI to show available mode options for user configuration.
Override to customize which modes are available for this plugin.
By default:
- 'multi' content type plugins support SCROLL and FIXED_SEGMENT
- 'static' content type plugins support FIXED_SEGMENT and STATIC
- 'none' content type plugins return empty list (excluded from Vegas)
Returns:
List of VegasDisplayMode values this plugin can use
Example:
def get_supported_vegas_modes(self):
# This plugin only makes sense as a scrolling ticker
return [VegasDisplayMode.SCROLL]
"""
content_type = self.get_vegas_content_type()
if content_type == 'none':
return []
elif content_type == 'multi':
return [VegasDisplayMode.SCROLL, VegasDisplayMode.FIXED_SEGMENT]
else: # 'static'
return [VegasDisplayMode.FIXED_SEGMENT, VegasDisplayMode.STATIC]
def get_vegas_segment_width(self) -> Optional[int]:
"""
Get the preferred width for this plugin in Vegas FIXED_SEGMENT mode.
Returns the number of panels this plugin should occupy when displayed
as a fixed segment. The actual pixel width is calculated as:
width = panels * single_panel_width
Where single_panel_width comes from display.hardware.cols in config.
Override to provide dynamic sizing based on content.
Returns None to use the default (1 panel).
Returns:
Number of panels, or None for default (1 panel)
Example:
def get_vegas_segment_width(self):
# Clock needs 2 panels to show time clearly
return 2
"""
raw_value = self.config.get("vegas_panel_count", None)
if raw_value is None:
return None
try:
panel_count = int(raw_value)
if panel_count > 0:
return panel_count
else:
self.logger.warning(
"vegas_panel_count must be positive, got %s; using default",
raw_value
)
return None
except (ValueError, TypeError):
self.logger.warning(
"Invalid vegas_panel_count value '%s'; using default",
raw_value
)
return None
def validate_config(self) -> bool:
"""
Validate plugin configuration against schema.

View File

@@ -0,0 +1,21 @@
"""
Vegas Mode - Continuous Scrolling Ticker
This package implements a Vegas-style continuous scroll mode where all enabled
plugins' content is composed into a single horizontally scrolling display.
Components:
- VegasModeCoordinator: Main orchestrator for Vegas mode
- StreamManager: Manages plugin content streaming with 1-2 ahead buffering
- RenderPipeline: Handles 125 FPS rendering with double-buffering
- PluginAdapter: Converts plugin content to scrollable images
- VegasModeConfig: Configuration management
"""
from src.vegas_mode.config import VegasModeConfig
from src.vegas_mode.coordinator import VegasModeCoordinator
__all__ = [
'VegasModeConfig',
'VegasModeCoordinator',
]

200
src/vegas_mode/config.py Normal file
View File

@@ -0,0 +1,200 @@
"""
Vegas Mode Configuration
Handles configuration for Vegas-style continuous scroll mode including
plugin ordering, exclusions, scroll speed, and display settings.
"""
import logging
from typing import Dict, Any, List, Set, Optional
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class VegasModeConfig:
"""Configuration for Vegas scroll mode."""
# Core settings
enabled: bool = False
scroll_speed: float = 50.0 # Pixels per second
separator_width: int = 32 # Gap between plugins (pixels)
# Plugin management
plugin_order: List[str] = field(default_factory=list)
excluded_plugins: Set[str] = field(default_factory=set)
# Performance settings
target_fps: int = 125 # Target frame rate
buffer_ahead: int = 2 # Number of plugins to buffer ahead
# Scroll behavior
frame_based_scrolling: bool = True
scroll_delay: float = 0.02 # 50 FPS effective scroll updates
# Dynamic duration
dynamic_duration_enabled: bool = True
min_cycle_duration: int = 60 # Minimum seconds per full cycle
max_cycle_duration: int = 600 # Maximum seconds per full cycle
@classmethod
def from_config(cls, config: Dict[str, Any]) -> 'VegasModeConfig':
"""
Create VegasModeConfig from main configuration dictionary.
Args:
config: Main config dict (expects config['display']['vegas_scroll'])
Returns:
VegasModeConfig instance
"""
vegas_config = config.get('display', {}).get('vegas_scroll', {})
return cls(
enabled=vegas_config.get('enabled', False),
scroll_speed=float(vegas_config.get('scroll_speed', 50.0)),
separator_width=int(vegas_config.get('separator_width', 32)),
plugin_order=list(vegas_config.get('plugin_order', [])),
excluded_plugins=set(vegas_config.get('excluded_plugins', [])),
target_fps=int(vegas_config.get('target_fps', 125)),
buffer_ahead=int(vegas_config.get('buffer_ahead', 2)),
frame_based_scrolling=vegas_config.get('frame_based_scrolling', True),
scroll_delay=float(vegas_config.get('scroll_delay', 0.02)),
dynamic_duration_enabled=vegas_config.get('dynamic_duration_enabled', True),
min_cycle_duration=int(vegas_config.get('min_cycle_duration', 60)),
max_cycle_duration=int(vegas_config.get('max_cycle_duration', 600)),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert config to dictionary for serialization."""
return {
'enabled': self.enabled,
'scroll_speed': self.scroll_speed,
'separator_width': self.separator_width,
'plugin_order': self.plugin_order,
'excluded_plugins': list(self.excluded_plugins),
'target_fps': self.target_fps,
'buffer_ahead': self.buffer_ahead,
'frame_based_scrolling': self.frame_based_scrolling,
'scroll_delay': self.scroll_delay,
'dynamic_duration_enabled': self.dynamic_duration_enabled,
'min_cycle_duration': self.min_cycle_duration,
'max_cycle_duration': self.max_cycle_duration,
}
def get_frame_interval(self) -> float:
"""Get the frame interval in seconds for target FPS."""
return 1.0 / max(1, self.target_fps)
def is_plugin_included(self, plugin_id: str) -> bool:
"""
Check if a plugin should be included in Vegas scroll.
This is consistent with get_ordered_plugins - plugins not explicitly
in plugin_order are still included (appended at the end) unless excluded.
Args:
plugin_id: Plugin identifier to check
Returns:
True if plugin should be included
"""
# Plugins are included unless explicitly excluded
return plugin_id not in self.excluded_plugins
def get_ordered_plugins(self, available_plugins: List[str]) -> List[str]:
"""
Get plugins in configured order, filtering excluded ones.
Args:
available_plugins: List of all available plugin IDs
Returns:
Ordered list of plugin IDs to include in Vegas scroll
"""
if self.plugin_order:
# Use explicit order, filter to only available and non-excluded
ordered = [
p for p in self.plugin_order
if p in available_plugins and p not in self.excluded_plugins
]
# Add any available plugins not in the order list (at the end)
for p in available_plugins:
if p not in ordered and p not in self.excluded_plugins:
ordered.append(p)
return ordered
else:
# Use natural order, filter excluded
return [p for p in available_plugins if p not in self.excluded_plugins]
def validate(self) -> List[str]:
"""
Validate configuration values.
Returns:
List of validation error messages (empty if valid)
"""
errors = []
if self.scroll_speed < 1.0:
errors.append(f"scroll_speed must be >= 1.0, got {self.scroll_speed}")
if self.scroll_speed > 200.0:
errors.append(f"scroll_speed must be <= 200.0, got {self.scroll_speed}")
if self.separator_width < 0:
errors.append(f"separator_width must be >= 0, got {self.separator_width}")
if self.separator_width > 128:
errors.append(f"separator_width must be <= 128, got {self.separator_width}")
if self.target_fps < 30:
errors.append(f"target_fps must be >= 30, got {self.target_fps}")
if self.target_fps > 200:
errors.append(f"target_fps must be <= 200, got {self.target_fps}")
if self.buffer_ahead < 1:
errors.append(f"buffer_ahead must be >= 1, got {self.buffer_ahead}")
if self.buffer_ahead > 5:
errors.append(f"buffer_ahead must be <= 5, got {self.buffer_ahead}")
return errors
def update(self, new_config: Dict[str, Any]) -> None:
"""
Update configuration from new values.
Args:
new_config: New configuration values to apply
"""
vegas_config = new_config.get('display', {}).get('vegas_scroll', {})
if 'enabled' in vegas_config:
self.enabled = vegas_config['enabled']
if 'scroll_speed' in vegas_config:
self.scroll_speed = float(vegas_config['scroll_speed'])
if 'separator_width' in vegas_config:
self.separator_width = int(vegas_config['separator_width'])
if 'plugin_order' in vegas_config:
self.plugin_order = list(vegas_config['plugin_order'])
if 'excluded_plugins' in vegas_config:
self.excluded_plugins = set(vegas_config['excluded_plugins'])
if 'target_fps' in vegas_config:
self.target_fps = int(vegas_config['target_fps'])
if 'buffer_ahead' in vegas_config:
self.buffer_ahead = int(vegas_config['buffer_ahead'])
if 'frame_based_scrolling' in vegas_config:
self.frame_based_scrolling = vegas_config['frame_based_scrolling']
if 'scroll_delay' in vegas_config:
self.scroll_delay = float(vegas_config['scroll_delay'])
if 'dynamic_duration_enabled' in vegas_config:
self.dynamic_duration_enabled = vegas_config['dynamic_duration_enabled']
if 'min_cycle_duration' in vegas_config:
self.min_cycle_duration = int(vegas_config['min_cycle_duration'])
if 'max_cycle_duration' in vegas_config:
self.max_cycle_duration = int(vegas_config['max_cycle_duration'])
# Log config update
logger.info(
"Vegas mode config updated: enabled=%s, speed=%.1f, fps=%d, buffer=%d",
self.enabled, self.scroll_speed, self.target_fps, self.buffer_ahead
)

View File

@@ -0,0 +1,655 @@
"""
Vegas Mode Coordinator
Main orchestrator for Vegas-style continuous scroll mode. Coordinates between
StreamManager, RenderPipeline, and the display system to provide smooth
continuous scrolling of all enabled plugin content.
Supports three display modes per plugin:
- SCROLL: Content scrolls continuously within the stream
- FIXED_SEGMENT: Fixed block that scrolls by with other content
- STATIC: Scroll pauses, plugin displays for its duration, then resumes
"""
import logging
import time
import threading
from typing import Optional, Dict, Any, List, Callable, TYPE_CHECKING
from src.vegas_mode.config import VegasModeConfig
from src.vegas_mode.plugin_adapter import PluginAdapter
from src.vegas_mode.stream_manager import StreamManager
from src.vegas_mode.render_pipeline import RenderPipeline
from src.plugin_system.base_plugin import VegasDisplayMode
if TYPE_CHECKING:
from src.plugin_system.plugin_manager import PluginManager
from src.plugin_system.base_plugin import BasePlugin
from src.display_manager import DisplayManager
logger = logging.getLogger(__name__)
class VegasModeCoordinator:
"""
Orchestrates Vegas scroll mode operation.
Responsibilities:
- Initialize and coordinate all Vegas mode components
- Manage the high-FPS render loop
- Handle live priority interruptions
- Process config updates
- Provide status and control interface
"""
def __init__(
self,
config: Dict[str, Any],
display_manager: 'DisplayManager',
plugin_manager: 'PluginManager'
):
"""
Initialize the Vegas mode coordinator.
Args:
config: Main configuration dictionary
display_manager: DisplayManager instance
plugin_manager: PluginManager instance
"""
# Parse configuration
self.vegas_config = VegasModeConfig.from_config(config)
# Store references
self.display_manager = display_manager
self.plugin_manager = plugin_manager
# Initialize components
self.plugin_adapter = PluginAdapter(display_manager)
self.stream_manager = StreamManager(
self.vegas_config,
plugin_manager,
self.plugin_adapter
)
self.render_pipeline = RenderPipeline(
self.vegas_config,
display_manager,
self.stream_manager
)
# State management
self._is_active = False
self._is_paused = False
self._should_stop = False
self._state_lock = threading.Lock()
# Live priority tracking
self._live_priority_active = False
self._live_priority_check: Optional[Callable[[], Optional[str]]] = None
# Interrupt checker for yielding control back to display controller
self._interrupt_check: Optional[Callable[[], bool]] = None
self._interrupt_check_interval: int = 10 # Check every N frames
# Config update tracking
self._config_version = 0
self._pending_config_update = False
self._pending_config: Optional[Dict[str, Any]] = None
# Static pause handling
self._static_pause_active = False
self._static_pause_plugin: Optional['BasePlugin'] = None
self._static_pause_start: Optional[float] = None
self._saved_scroll_position: Optional[int] = None
# Track which plugins should use STATIC mode (pause scroll)
self._static_mode_plugins: set = set()
# Statistics
self.stats = {
'total_runtime_seconds': 0.0,
'cycles_completed': 0,
'interruptions': 0,
'config_updates': 0,
'static_pauses': 0,
}
self._start_time: Optional[float] = None
logger.info(
"VegasModeCoordinator initialized: enabled=%s, fps=%d, buffer_ahead=%d",
self.vegas_config.enabled,
self.vegas_config.target_fps,
self.vegas_config.buffer_ahead
)
@property
def is_enabled(self) -> bool:
"""Check if Vegas mode is enabled in configuration."""
return self.vegas_config.enabled
@property
def is_active(self) -> bool:
"""Check if Vegas mode is currently running."""
return self._is_active
def set_live_priority_checker(self, checker: Callable[[], Optional[str]]) -> None:
"""
Set the callback for checking live priority content.
Args:
checker: Callable that returns live priority mode name or None
"""
self._live_priority_check = checker
def set_interrupt_checker(
self,
checker: Callable[[], bool],
check_interval: int = 10
) -> None:
"""
Set the callback for checking if Vegas should yield control.
This allows the display controller to interrupt Vegas mode
when on-demand, wifi status, or other priority events occur.
Args:
checker: Callable that returns True if Vegas should yield
check_interval: Check every N frames (default 10)
"""
self._interrupt_check = checker
self._interrupt_check_interval = max(1, check_interval)
def start(self) -> bool:
"""
Start Vegas mode operation.
Returns:
True if started successfully
"""
if not self.vegas_config.enabled:
logger.warning("Cannot start Vegas mode - not enabled in config")
return False
with self._state_lock:
if self._is_active:
logger.warning("Vegas mode already active")
return True
# Validate configuration
errors = self.vegas_config.validate()
if errors:
logger.error("Vegas config validation failed: %s", errors)
return False
# Initialize stream manager
if not self.stream_manager.initialize():
logger.error("Failed to initialize stream manager")
return False
# Compose initial content
if not self.render_pipeline.compose_scroll_content():
logger.error("Failed to compose initial scroll content")
return False
self._is_active = True
self._should_stop = False
self._start_time = time.time()
logger.info("Vegas mode started")
return True
def stop(self) -> None:
"""Stop Vegas mode operation."""
with self._state_lock:
if not self._is_active:
return
self._should_stop = True
self._is_active = False
if self._start_time:
self.stats['total_runtime_seconds'] += time.time() - self._start_time
self._start_time = None
# Cleanup components
self.render_pipeline.reset()
self.stream_manager.reset()
self.display_manager.set_scrolling_state(False)
logger.info("Vegas mode stopped")
def pause(self) -> None:
"""Pause Vegas mode (for live priority interruption)."""
with self._state_lock:
if not self._is_active:
return
self._is_paused = True
self.stats['interruptions'] += 1
self.display_manager.set_scrolling_state(False)
logger.info("Vegas mode paused")
def resume(self) -> None:
"""Resume Vegas mode after pause."""
with self._state_lock:
if not self._is_active:
return
self._is_paused = False
self.display_manager.set_scrolling_state(True)
logger.info("Vegas mode resumed")
def run_frame(self) -> bool:
"""
Run a single frame of Vegas mode.
Should be called at target FPS (e.g., 125 FPS = every 8ms).
Returns:
True if frame was rendered, False if Vegas mode is not active
"""
# Check if we should be running
with self._state_lock:
if not self._is_active or self._is_paused or self._should_stop:
return False
# Check for config updates (synchronized access)
has_pending_update = self._pending_config_update
# Check for live priority
if self._check_live_priority():
return False
# Apply pending config update outside lock
if has_pending_update:
self._apply_pending_config()
# Check if we need to start a new cycle
if self.render_pipeline.is_cycle_complete():
if not self.render_pipeline.start_new_cycle():
logger.warning("Failed to start new Vegas cycle")
return False
self.stats['cycles_completed'] += 1
# Check for hot-swap opportunities
if self.render_pipeline.should_recompose():
self.render_pipeline.hot_swap_content()
# Render frame
return self.render_pipeline.render_frame()
def run_iteration(self) -> bool:
"""
Run a complete Vegas mode iteration (display duration).
This is called by DisplayController to run Vegas mode for one
"display duration" period before checking for mode changes.
Handles three display modes:
- SCROLL/FIXED_SEGMENT: Continue normal scroll rendering
- STATIC: Pause scroll, display plugin, resume on completion
Returns:
True if iteration completed normally, False if interrupted
"""
if not self.is_active:
if not self.start():
return False
# Update static mode plugin list on iteration start
self._update_static_mode_plugins()
frame_interval = self.vegas_config.get_frame_interval()
duration = self.render_pipeline.get_dynamic_duration()
start_time = time.time()
frame_count = 0
fps_log_interval = 5.0 # Log FPS every 5 seconds
last_fps_log_time = start_time
fps_frame_count = 0
logger.info("Starting Vegas iteration for %.1fs", duration)
while True:
# Check for STATIC mode plugin that should pause scroll
static_plugin = self._check_static_plugin_trigger()
if static_plugin:
if not self._handle_static_pause(static_plugin):
# Static pause was interrupted
return False
# After static pause, skip this segment and continue
self.stream_manager.get_next_segment() # Consume the segment
continue
# Run frame
if not self.run_frame():
# Check why we stopped
with self._state_lock:
if self._should_stop:
return False
if self._is_paused:
# Paused for live priority - let caller handle
return False
# Sleep for frame interval
time.sleep(frame_interval)
# Increment frame count and check for interrupt periodically
frame_count += 1
fps_frame_count += 1
# Periodic FPS logging
current_time = time.time()
if current_time - last_fps_log_time >= fps_log_interval:
fps = fps_frame_count / (current_time - last_fps_log_time)
logger.info(
"Vegas FPS: %.1f (target: %d, frames: %d)",
fps, self.vegas_config.target_fps, fps_frame_count
)
last_fps_log_time = current_time
fps_frame_count = 0
if (self._interrupt_check and
frame_count % self._interrupt_check_interval == 0):
try:
if self._interrupt_check():
logger.debug(
"Vegas interrupted by callback after %d frames",
frame_count
)
return False
except Exception:
# Log but don't let interrupt check errors stop Vegas
logger.exception("Interrupt check failed")
# Check elapsed time
elapsed = time.time() - start_time
if elapsed >= duration:
break
# Check for cycle completion
if self.render_pipeline.is_cycle_complete():
break
logger.info("Vegas iteration completed after %.1fs", time.time() - start_time)
return True
def _check_live_priority(self) -> bool:
"""
Check if live priority content should interrupt Vegas mode.
Returns:
True if Vegas mode should be paused for live priority
"""
if not self._live_priority_check:
return False
try:
live_mode = self._live_priority_check()
if live_mode:
if not self._live_priority_active:
self._live_priority_active = True
self.pause()
logger.info("Live priority detected: %s - pausing Vegas", live_mode)
return True
else:
if self._live_priority_active:
self._live_priority_active = False
self.resume()
logger.info("Live priority ended - resuming Vegas")
return False
except Exception:
logger.exception("Error checking live priority")
return False
def update_config(self, new_config: Dict[str, Any]) -> None:
"""
Update Vegas mode configuration.
Config changes are applied at next safe point to avoid disruption.
Args:
new_config: New configuration dictionary
"""
with self._state_lock:
self._pending_config_update = True
self._pending_config = new_config
self._config_version += 1
self.stats['config_updates'] += 1
logger.debug("Config update queued (version %d)", self._config_version)
def _apply_pending_config(self) -> None:
"""Apply pending configuration update."""
# Atomically grab pending config and clear it to avoid losing concurrent updates
with self._state_lock:
if self._pending_config is None:
self._pending_config_update = False
return
pending_config = self._pending_config
self._pending_config = None # Clear while holding lock
try:
new_vegas_config = VegasModeConfig.from_config(pending_config)
# Check if enabled state changed
was_enabled = self.vegas_config.enabled
self.vegas_config = new_vegas_config
# Update components
self.render_pipeline.update_config(new_vegas_config)
self.stream_manager.config = new_vegas_config
# Force refresh of stream manager to pick up plugin_order/buffer changes
self.stream_manager._last_refresh = 0
self.stream_manager.refresh()
# Handle enable/disable
if was_enabled and not new_vegas_config.enabled:
self.stop()
elif not was_enabled and new_vegas_config.enabled:
self.start()
logger.info("Config update applied (version %d)", self._config_version)
except Exception:
logger.exception("Error applying config update")
finally:
# Only clear update flag if no new config arrived during processing
with self._state_lock:
if self._pending_config is None:
self._pending_config_update = False
def mark_plugin_updated(self, plugin_id: str) -> None:
"""
Notify that a plugin's data has been updated.
Args:
plugin_id: ID of plugin that was updated
"""
if self._is_active:
self.stream_manager.mark_plugin_updated(plugin_id)
self.plugin_adapter.invalidate_cache(plugin_id)
def get_status(self) -> Dict[str, Any]:
"""Get comprehensive Vegas mode status."""
status = {
'enabled': self.vegas_config.enabled,
'active': self._is_active,
'paused': self._is_paused,
'live_priority_active': self._live_priority_active,
'config': self.vegas_config.to_dict(),
'stats': self.stats.copy(),
}
if self._is_active:
status['render_info'] = self.render_pipeline.get_current_scroll_info()
status['stream_status'] = self.stream_manager.get_buffer_status()
return status
def get_ordered_plugins(self) -> List[str]:
"""Get the current ordered list of plugins in Vegas scroll."""
if hasattr(self.plugin_manager, 'plugins'):
available = list(self.plugin_manager.plugins.keys())
return self.vegas_config.get_ordered_plugins(available)
return []
# -------------------------------------------------------------------------
# Static pause handling (for STATIC display mode)
# -------------------------------------------------------------------------
def _check_static_plugin_trigger(self) -> Optional['BasePlugin']:
"""
Check if a STATIC mode plugin should take over display.
Called during iteration to detect when scroll should pause
for a static plugin display.
Returns:
Plugin instance if static pause should begin, None otherwise
"""
# Get the next plugin that would be displayed
next_segment = self.stream_manager.peek_next_segment()
if not next_segment:
return None
plugin_id = next_segment.plugin_id
plugin = self.plugin_manager.get_plugin(plugin_id)
if not plugin:
return None
# Check if this plugin is configured for STATIC mode
try:
display_mode = plugin.get_vegas_display_mode()
if display_mode == VegasDisplayMode.STATIC:
return plugin
except (AttributeError, TypeError):
logger.exception("Error checking vegas mode for %s", plugin_id)
return None
def _handle_static_pause(self, plugin: 'BasePlugin') -> bool:
"""
Handle a static pause - scroll pauses while plugin displays.
Args:
plugin: The STATIC mode plugin to display
Returns:
True if completed normally, False if interrupted
"""
plugin_id = plugin.plugin_id
with self._state_lock:
if self._static_pause_active:
logger.warning("Static pause already active")
return True
# Save current scroll position for smooth resume
self._saved_scroll_position = self.render_pipeline.get_scroll_position()
self._static_pause_active = True
self._static_pause_plugin = plugin
self._static_pause_start = time.time()
self.stats['static_pauses'] += 1
logger.info("Static pause started for plugin: %s", plugin_id)
# Stop scrolling indicator
self.display_manager.set_scrolling_state(False)
try:
# Display the plugin using its standard display() method
plugin.display(force_clear=True)
self.display_manager.update_display()
# Wait for the plugin's display duration
duration = plugin.get_display_duration()
start = time.time()
while time.time() - start < duration:
# Check for interruptions
if self._should_stop:
logger.info("Static pause interrupted by stop request")
return False
if self._check_live_priority():
logger.info("Static pause interrupted by live priority")
return False
# Sleep in small increments to remain responsive
time.sleep(0.1)
logger.info(
"Static pause completed for %s after %.1fs",
plugin_id, time.time() - start
)
except Exception:
logger.exception("Error during static pause for %s", plugin_id)
return False
finally:
self._end_static_pause()
return True
def _end_static_pause(self) -> None:
"""End static pause and restore scroll state."""
should_resume_scrolling = False
with self._state_lock:
# Only resume scrolling if we weren't interrupted
was_active = self._static_pause_active
should_resume_scrolling = (
was_active and
not self._should_stop and
not self._live_priority_active
)
# Clear pause state
self._static_pause_active = False
self._static_pause_plugin = None
self._static_pause_start = None
# Restore scroll position if we're resuming
if should_resume_scrolling and self._saved_scroll_position is not None:
self.render_pipeline.set_scroll_position(self._saved_scroll_position)
self._saved_scroll_position = None
# Only resume scrolling state if not interrupted
if should_resume_scrolling:
self.display_manager.set_scrolling_state(True)
logger.debug("Static pause ended, scroll resumed")
else:
logger.debug("Static pause ended (interrupted, not resuming scroll)")
def _update_static_mode_plugins(self) -> None:
"""Update the set of plugins using STATIC display mode."""
self._static_mode_plugins.clear()
for plugin_id in self.get_ordered_plugins():
plugin = self.plugin_manager.get_plugin(plugin_id)
if plugin:
try:
mode = plugin.get_vegas_display_mode()
if mode == VegasDisplayMode.STATIC:
self._static_mode_plugins.add(plugin_id)
except Exception:
logger.exception(
"Error getting vegas display mode for plugin %s",
plugin_id
)
if self._static_mode_plugins:
logger.info(
"Static mode plugins: %s",
', '.join(self._static_mode_plugins)
)
def cleanup(self) -> None:
"""Clean up all resources."""
self.stop()
self.render_pipeline.cleanup()
self.stream_manager.cleanup()
self.plugin_adapter.cleanup()
logger.info("VegasModeCoordinator cleanup complete")

View File

@@ -0,0 +1,612 @@
"""
Plugin Adapter for Vegas Mode
Converts plugin content to scrollable images. Supports both plugins that
implement get_vegas_content() and fallback capture of display() output.
"""
import logging
import threading
import time
from typing import Optional, List, Any, Tuple, Union, TYPE_CHECKING
from PIL import Image
if TYPE_CHECKING:
from src.plugin_system.base_plugin import BasePlugin
logger = logging.getLogger(__name__)
class PluginAdapter:
"""
Adapter for extracting scrollable content from plugins.
Supports two modes:
1. Native: Plugin implements get_vegas_content() returning PIL Image(s)
2. Fallback: Capture display_manager.image after calling plugin.display()
"""
def __init__(self, display_manager: Any):
"""
Initialize the plugin adapter.
Args:
display_manager: DisplayManager instance for fallback capture
"""
self.display_manager = display_manager
# Handle both property and method access patterns
self.display_width = (
display_manager.width() if callable(display_manager.width)
else display_manager.width
)
self.display_height = (
display_manager.height() if callable(display_manager.height)
else display_manager.height
)
# Cache for recently fetched content (prevents redundant fetch)
self._content_cache: dict = {}
self._cache_lock = threading.Lock()
self._cache_ttl = 5.0 # Cache for 5 seconds
logger.info(
"PluginAdapter initialized: display=%dx%d",
self.display_width, self.display_height
)
def get_content(self, plugin: 'BasePlugin', plugin_id: str) -> Optional[List[Image.Image]]:
"""
Get scrollable content from a plugin.
Tries get_vegas_content() first, falls back to display capture.
Args:
plugin: Plugin instance to get content from
plugin_id: Plugin identifier for logging
Returns:
List of PIL Images representing plugin content, or None if no content
"""
logger.info(
"[%s] Getting content (class=%s)",
plugin_id, plugin.__class__.__name__
)
# Check cache first
cached = self._get_cached(plugin_id)
if cached is not None:
total_width = sum(img.width for img in cached)
logger.info(
"[%s] Using cached content: %d images, %dpx total",
plugin_id, len(cached), total_width
)
return cached
# Try native Vegas content method first
has_native = hasattr(plugin, 'get_vegas_content')
logger.info("[%s] Has get_vegas_content: %s", plugin_id, has_native)
if has_native:
content = self._get_native_content(plugin, plugin_id)
if content:
total_width = sum(img.width for img in content)
logger.info(
"[%s] Native content SUCCESS: %d images, %dpx total",
plugin_id, len(content), total_width
)
self._cache_content(plugin_id, content)
return content
logger.info("[%s] Native content returned None", plugin_id)
# Try to get scroll_helper's cached image (for scrolling plugins like stocks/odds)
has_scroll_helper = hasattr(plugin, 'scroll_helper')
logger.info("[%s] Has scroll_helper: %s", plugin_id, has_scroll_helper)
content = self._get_scroll_helper_content(plugin, plugin_id)
if content:
total_width = sum(img.width for img in content)
logger.info(
"[%s] ScrollHelper content SUCCESS: %d images, %dpx total",
plugin_id, len(content), total_width
)
self._cache_content(plugin_id, content)
return content
if has_scroll_helper:
logger.info("[%s] ScrollHelper content returned None", plugin_id)
# Fall back to display capture
logger.info("[%s] Trying fallback display capture...", plugin_id)
content = self._capture_display_content(plugin, plugin_id)
if content:
total_width = sum(img.width for img in content)
logger.info(
"[%s] Fallback capture SUCCESS: %d images, %dpx total",
plugin_id, len(content), total_width
)
self._cache_content(plugin_id, content)
return content
logger.warning(
"[%s] NO CONTENT from any method (native=%s, scroll_helper=%s, fallback=tried)",
plugin_id, has_native, has_scroll_helper
)
return None
def _get_native_content(
self, plugin: 'BasePlugin', plugin_id: str
) -> Optional[List[Image.Image]]:
"""
Get content via plugin's native get_vegas_content() method.
Args:
plugin: Plugin instance
plugin_id: Plugin identifier
Returns:
List of images or None
"""
try:
logger.info("[%s] Native: calling get_vegas_content()", plugin_id)
result = plugin.get_vegas_content()
if result is None:
logger.info("[%s] Native: get_vegas_content() returned None", plugin_id)
return None
# Normalize to list
if isinstance(result, Image.Image):
images = [result]
logger.info(
"[%s] Native: got single Image %dx%d",
plugin_id, result.width, result.height
)
elif isinstance(result, (list, tuple)):
images = list(result)
logger.info(
"[%s] Native: got %d items in list/tuple",
plugin_id, len(images)
)
else:
logger.warning(
"[%s] Native: unexpected return type: %s",
plugin_id, type(result).__name__
)
return None
# Validate images
valid_images = []
for i, img in enumerate(images):
if not isinstance(img, Image.Image):
logger.warning(
"[%s] Native: item[%d] is not an Image: %s",
plugin_id, i, type(img).__name__
)
continue
logger.info(
"[%s] Native: item[%d] is %dx%d, mode=%s",
plugin_id, i, img.width, img.height, img.mode
)
# Ensure correct height
if img.height != self.display_height:
logger.info(
"[%s] Native: resizing item[%d]: %dx%d -> %dx%d",
plugin_id, i, img.width, img.height,
img.width, self.display_height
)
img = img.resize(
(img.width, self.display_height),
Image.Resampling.LANCZOS
)
# Convert to RGB if needed
if img.mode != 'RGB':
img = img.convert('RGB')
valid_images.append(img)
if valid_images:
total_width = sum(img.width for img in valid_images)
logger.info(
"[%s] Native: SUCCESS - %d images, %dpx total width",
plugin_id, len(valid_images), total_width
)
return valid_images
logger.info("[%s] Native: no valid images after validation", plugin_id)
return None
except (AttributeError, TypeError, ValueError, OSError) as e:
logger.exception(
"[%s] Native: ERROR calling get_vegas_content(): %s",
plugin_id, e
)
return None
def _get_scroll_helper_content(
self, plugin: 'BasePlugin', plugin_id: str
) -> Optional[List[Image.Image]]:
"""
Get content from plugin's scroll_helper if available.
Many scrolling plugins (stocks, odds) use a ScrollHelper that caches
their full scrolling image. This method extracts that image for Vegas
mode instead of falling back to single-frame capture.
Args:
plugin: Plugin instance
plugin_id: Plugin identifier
Returns:
List with the cached scroll image, or None if not available
"""
try:
# Check for scroll_helper with cached_image
scroll_helper = getattr(plugin, 'scroll_helper', None)
if scroll_helper is None:
logger.debug("[%s] No scroll_helper attribute", plugin_id)
return None
logger.info(
"[%s] Found scroll_helper: %s",
plugin_id, type(scroll_helper).__name__
)
cached_image = getattr(scroll_helper, 'cached_image', None)
if cached_image is None:
logger.info(
"[%s] scroll_helper.cached_image is None, triggering content generation",
plugin_id
)
# Try to trigger scroll content generation
cached_image = self._trigger_scroll_content_generation(
plugin, plugin_id, scroll_helper
)
if cached_image is None:
return None
if not isinstance(cached_image, Image.Image):
logger.info(
"[%s] scroll_helper.cached_image is not an Image: %s",
plugin_id, type(cached_image).__name__
)
return None
logger.info(
"[%s] scroll_helper.cached_image found: %dx%d, mode=%s",
plugin_id, cached_image.width, cached_image.height, cached_image.mode
)
# Copy the image to prevent modification
img = cached_image.copy()
# Ensure correct height
if img.height != self.display_height:
logger.info(
"[%s] Resizing scroll_helper content: %dx%d -> %dx%d",
plugin_id, img.width, img.height,
img.width, self.display_height
)
img = img.resize(
(img.width, self.display_height),
Image.Resampling.LANCZOS
)
# Convert to RGB if needed
if img.mode != 'RGB':
img = img.convert('RGB')
logger.info(
"[%s] ScrollHelper content ready: %dx%d",
plugin_id, img.width, img.height
)
return [img]
except (AttributeError, TypeError, ValueError, OSError):
logger.exception("[%s] Error getting scroll_helper content", plugin_id)
return None
def _trigger_scroll_content_generation(
self, plugin: 'BasePlugin', plugin_id: str, scroll_helper: Any
) -> Optional[Image.Image]:
"""
Trigger scroll content generation for plugins that haven't built it yet.
Tries multiple approaches:
1. _create_scrolling_display() - stocks plugin pattern
2. display(force_clear=True) - general pattern that populates scroll cache
Args:
plugin: Plugin instance
plugin_id: Plugin identifier
scroll_helper: Plugin's scroll_helper instance
Returns:
The generated cached_image or None
"""
original_image = None
try:
# Save display state to restore after
original_image = self.display_manager.image.copy()
# Method 1: Try _create_scrolling_display (stocks pattern)
if hasattr(plugin, '_create_scrolling_display'):
logger.info(
"[%s] Triggering via _create_scrolling_display()",
plugin_id
)
try:
plugin._create_scrolling_display()
cached_image = getattr(scroll_helper, 'cached_image', None)
if cached_image is not None and isinstance(cached_image, Image.Image):
logger.info(
"[%s] _create_scrolling_display() SUCCESS: %dx%d",
plugin_id, cached_image.width, cached_image.height
)
return cached_image
except (AttributeError, TypeError, ValueError, OSError):
logger.exception(
"[%s] _create_scrolling_display() failed", plugin_id
)
# Method 2: Try display(force_clear=True) which typically builds scroll content
if hasattr(plugin, 'display'):
logger.info(
"[%s] Triggering via display(force_clear=True)",
plugin_id
)
try:
self.display_manager.clear()
plugin.display(force_clear=True)
cached_image = getattr(scroll_helper, 'cached_image', None)
if cached_image is not None and isinstance(cached_image, Image.Image):
logger.info(
"[%s] display(force_clear=True) SUCCESS: %dx%d",
plugin_id, cached_image.width, cached_image.height
)
return cached_image
logger.info(
"[%s] display(force_clear=True) did not populate cached_image",
plugin_id
)
except (AttributeError, TypeError, ValueError, OSError):
logger.exception(
"[%s] display(force_clear=True) failed", plugin_id
)
logger.info(
"[%s] Could not trigger scroll content generation",
plugin_id
)
return None
except (AttributeError, TypeError, ValueError, OSError):
logger.exception("[%s] Error triggering scroll content", plugin_id)
return None
finally:
# Restore original display state
if original_image is not None:
self.display_manager.image = original_image
def _capture_display_content(
self, plugin: 'BasePlugin', plugin_id: str
) -> Optional[List[Image.Image]]:
"""
Capture content by calling plugin.display() and grabbing the frame.
Args:
plugin: Plugin instance
plugin_id: Plugin identifier
Returns:
List with single captured image, or None
"""
original_image = None
try:
# Save current display state
original_image = self.display_manager.image.copy()
logger.info("[%s] Fallback: saved original display state", plugin_id)
# Ensure plugin has fresh data before capturing
has_update_data = hasattr(plugin, 'update_data')
logger.info("[%s] Fallback: has update_data=%s", plugin_id, has_update_data)
if has_update_data:
try:
plugin.update_data()
logger.info("[%s] Fallback: update_data() called", plugin_id)
except (AttributeError, RuntimeError, OSError):
logger.exception("[%s] Fallback: update_data() failed", plugin_id)
# Clear and call plugin display
self.display_manager.clear()
logger.info("[%s] Fallback: display cleared, calling display()", plugin_id)
# First try without force_clear (some plugins behave better this way)
try:
plugin.display()
logger.info("[%s] Fallback: display() called successfully", plugin_id)
except TypeError:
# Plugin may require force_clear argument
logger.info("[%s] Fallback: display() failed, trying with force_clear=True", plugin_id)
plugin.display(force_clear=True)
# Capture the result
captured = self.display_manager.image.copy()
logger.info(
"[%s] Fallback: captured frame %dx%d, mode=%s",
plugin_id, captured.width, captured.height, captured.mode
)
# Check if captured image has content (not all black)
is_blank, bright_ratio = self._is_blank_image(captured, return_ratio=True)
logger.info(
"[%s] Fallback: brightness check - %.3f%% bright pixels (threshold=0.5%%)",
plugin_id, bright_ratio * 100
)
if is_blank:
logger.info(
"[%s] Fallback: first capture blank, retrying with force_clear",
plugin_id
)
# Try once more with force_clear=True
self.display_manager.clear()
plugin.display(force_clear=True)
captured = self.display_manager.image.copy()
is_blank, bright_ratio = self._is_blank_image(captured, return_ratio=True)
logger.info(
"[%s] Fallback: retry brightness - %.3f%% bright pixels",
plugin_id, bright_ratio * 100
)
if is_blank:
logger.warning(
"[%s] Fallback: BLANK IMAGE after retry (%.3f%% bright, size=%dx%d)",
plugin_id, bright_ratio * 100,
captured.width, captured.height
)
return None
# Convert to RGB if needed
if captured.mode != 'RGB':
captured = captured.convert('RGB')
logger.info(
"[%s] Fallback: SUCCESS - captured %dx%d",
plugin_id, captured.width, captured.height
)
return [captured]
except (AttributeError, TypeError, ValueError, OSError, RuntimeError) as e:
logger.exception(
"[%s] Fallback: ERROR capturing display: %s",
plugin_id, e
)
return None
finally:
# Always restore original image to prevent display corruption
if original_image is not None:
self.display_manager.image = original_image
logger.debug("[%s] Fallback: restored original display state", plugin_id)
def _is_blank_image(
self, img: Image.Image, return_ratio: bool = False
) -> Union[bool, Tuple[bool, float]]:
"""
Check if an image is essentially blank (all black or nearly so).
Uses histogram-based detection which is more reliable than
point sampling for content that may be positioned anywhere.
Args:
img: Image to check
return_ratio: If True, return tuple of (is_blank, bright_ratio)
Returns:
True if image is blank, or tuple (is_blank, bright_ratio) if return_ratio=True
"""
# Convert to RGB for consistent checking
if img.mode != 'RGB':
img = img.convert('RGB')
# Use histogram to check for any non-black content
# This is more reliable than point sampling
histogram = img.histogram()
# RGB histogram: 256 values per channel
# Check if there's any significant brightness in any channel
total_bright_pixels = 0
threshold = 15 # Minimum brightness to count as "content"
for channel_offset in [0, 256, 512]: # R, G, B
for brightness in range(threshold, 256):
total_bright_pixels += histogram[channel_offset + brightness]
# If less than 0.5% of pixels have any brightness, consider blank
total_pixels = img.width * img.height
bright_ratio = total_bright_pixels / (total_pixels * 3) # Normalize across channels
is_blank = bright_ratio < 0.005 # Less than 0.5% bright pixels
if return_ratio:
return is_blank, bright_ratio
return is_blank
def _get_cached(self, plugin_id: str) -> Optional[List[Image.Image]]:
"""Get cached content if still valid."""
with self._cache_lock:
if plugin_id not in self._content_cache:
return None
cached_time, content = self._content_cache[plugin_id]
if time.time() - cached_time > self._cache_ttl:
del self._content_cache[plugin_id]
return None
return content
def _cache_content(self, plugin_id: str, content: List[Image.Image]) -> None:
"""Cache content for a plugin."""
# Make copies to prevent mutation (done outside lock to minimize hold time)
cached_content = [img.copy() for img in content]
with self._cache_lock:
# Periodic cleanup of expired entries to prevent memory leak
self._cleanup_expired_cache_locked()
self._content_cache[plugin_id] = (time.time(), cached_content)
def _cleanup_expired_cache_locked(self) -> None:
"""Remove expired entries from cache. Must be called with _cache_lock held."""
current_time = time.time()
expired_keys = [
key for key, (cached_time, _) in self._content_cache.items()
if current_time - cached_time > self._cache_ttl
]
for key in expired_keys:
del self._content_cache[key]
def invalidate_cache(self, plugin_id: Optional[str] = None) -> None:
"""
Invalidate cached content.
Args:
plugin_id: Specific plugin to invalidate, or None for all
"""
with self._cache_lock:
if plugin_id:
self._content_cache.pop(plugin_id, None)
else:
self._content_cache.clear()
def get_content_type(self, plugin: 'BasePlugin', plugin_id: str) -> str:
"""
Get the type of content a plugin provides.
Args:
plugin: Plugin instance
plugin_id: Plugin identifier
Returns:
'multi' for multiple items, 'static' for single frame, 'none' for excluded
"""
if hasattr(plugin, 'get_vegas_content_type'):
try:
return plugin.get_vegas_content_type()
except (AttributeError, TypeError, ValueError):
logger.exception(
"Error calling get_vegas_content_type() on %s",
plugin_id
)
# Default to static for plugins without explicit type
return 'static'
def cleanup(self) -> None:
"""Clean up resources."""
with self._cache_lock:
self._content_cache.clear()
logger.debug("PluginAdapter cleanup complete")

View File

@@ -0,0 +1,399 @@
"""
Render Pipeline for Vegas Mode
Handles high-FPS (125 FPS) rendering with double-buffering for smooth scrolling.
Uses the existing ScrollHelper for numpy-optimized scroll operations.
"""
import logging
import time
import threading
from collections import deque
from typing import Optional, List, Any, Dict, Deque, TYPE_CHECKING
from PIL import Image
import numpy as np
from src.common.scroll_helper import ScrollHelper
from src.vegas_mode.config import VegasModeConfig
from src.vegas_mode.stream_manager import StreamManager, ContentSegment
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
class RenderPipeline:
"""
High-performance render pipeline for Vegas scroll mode.
Key responsibilities:
- Compose content segments into scrollable image
- Manage scroll position and velocity
- Handle 125 FPS rendering loop
- Double-buffer for hot-swap during updates
- Track scroll cycle completion
"""
def __init__(
self,
config: VegasModeConfig,
display_manager: Any,
stream_manager: StreamManager
):
"""
Initialize the render pipeline.
Args:
config: Vegas mode configuration
display_manager: DisplayManager for rendering
stream_manager: StreamManager for content
"""
self.config = config
self.display_manager = display_manager
self.stream_manager = stream_manager
# Display dimensions (handle both property and method access patterns)
self.display_width = (
display_manager.width() if callable(display_manager.width)
else display_manager.width
)
self.display_height = (
display_manager.height() if callable(display_manager.height)
else display_manager.height
)
# ScrollHelper for optimized scrolling
self.scroll_helper = ScrollHelper(
self.display_width,
self.display_height,
logger
)
# Configure scroll helper
self._configure_scroll_helper()
# Double-buffer for composed images
self._active_scroll_image: Optional[Image.Image] = None
self._staging_scroll_image: Optional[Image.Image] = None
self._buffer_lock = threading.Lock()
# Render state
self._is_rendering = False
self._cycle_complete = False
self._segments_in_scroll: List[str] = [] # Plugin IDs in current scroll
# Timing
self._last_frame_time = 0.0
self._frame_interval = config.get_frame_interval()
self._cycle_start_time = 0.0
# Statistics
self.stats = {
'frames_rendered': 0,
'scroll_cycles': 0,
'composition_count': 0,
'hot_swaps': 0,
'avg_frame_time_ms': 0.0,
}
self._frame_times: Deque[float] = deque(maxlen=100) # Efficient fixed-size buffer
logger.info(
"RenderPipeline initialized: %dx%d @ %d FPS",
self.display_width, self.display_height, config.target_fps
)
def _configure_scroll_helper(self) -> None:
"""Configure ScrollHelper with current settings."""
self.scroll_helper.set_frame_based_scrolling(self.config.frame_based_scrolling)
self.scroll_helper.set_scroll_delay(self.config.scroll_delay)
# Config scroll_speed is always pixels per second, but ScrollHelper
# interprets it differently based on frame_based_scrolling mode:
# - Frame-based: pixels per frame step
# - Time-based: pixels per second
if self.config.frame_based_scrolling:
# Convert pixels/second to pixels/frame
# pixels_per_frame = pixels_per_second * seconds_per_frame
pixels_per_frame = self.config.scroll_speed * self.config.scroll_delay
self.scroll_helper.set_scroll_speed(pixels_per_frame)
else:
self.scroll_helper.set_scroll_speed(self.config.scroll_speed)
self.scroll_helper.set_dynamic_duration_settings(
enabled=self.config.dynamic_duration_enabled,
min_duration=self.config.min_cycle_duration,
max_duration=self.config.max_cycle_duration,
buffer=0.1 # 10% buffer
)
def compose_scroll_content(self) -> bool:
"""
Compose content from stream manager into scrollable image.
Returns:
True if composition successful
"""
try:
# Get all buffered content
images = self.stream_manager.get_all_content_for_composition()
if not images:
logger.warning("No content available for composition")
return False
# Add separator gaps between images
content_with_gaps = []
for i, img in enumerate(images):
content_with_gaps.append(img)
# Create scrolling image via ScrollHelper
self.scroll_helper.create_scrolling_image(
content_items=content_with_gaps,
item_gap=self.config.separator_width,
element_gap=0
)
# Verify scroll image was created successfully
if not self.scroll_helper.cached_image:
logger.error("ScrollHelper failed to create cached image")
return False
# Store reference to composed image
with self._buffer_lock:
self._active_scroll_image = self.scroll_helper.cached_image
# Track which plugins are in this scroll (get safely via buffer status)
self._segments_in_scroll = self.stream_manager.get_active_plugin_ids()
self.stats['composition_count'] += 1
self._cycle_start_time = time.time()
self._cycle_complete = False
logger.info(
"Composed scroll image: %dx%d, %d plugins, %d items",
self.scroll_helper.cached_image.width if self.scroll_helper.cached_image else 0,
self.display_height,
len(self._segments_in_scroll),
len(images)
)
return True
except (ValueError, TypeError, OSError, RuntimeError):
# Expected errors from image operations, scroll helper, or bad data
logger.exception("Error composing scroll content")
return False
def render_frame(self) -> bool:
"""
Render a single frame to the display.
Should be called at ~125 FPS (8ms intervals).
Returns:
True if frame was rendered, False if no content
"""
frame_start = time.time()
try:
if not self.scroll_helper.cached_image:
return False
# Update scroll position
self.scroll_helper.update_scroll_position()
# Check if cycle is complete
if self.scroll_helper.is_scroll_complete():
if not self._cycle_complete:
self._cycle_complete = True
self.stats['scroll_cycles'] += 1
logger.info(
"Scroll cycle complete after %.1fs",
time.time() - self._cycle_start_time
)
# Get visible portion
visible_frame = self.scroll_helper.get_visible_portion()
if not visible_frame:
return False
# Render to display
self.display_manager.image = visible_frame
self.display_manager.update_display()
# Update scrolling state
self.display_manager.set_scrolling_state(True)
# Track statistics
self.stats['frames_rendered'] += 1
frame_time = time.time() - frame_start
self._track_frame_time(frame_time)
return True
except (ValueError, TypeError, OSError, RuntimeError):
# Expected errors from scroll helper or display manager operations
logger.exception("Error rendering frame")
return False
def _track_frame_time(self, frame_time: float) -> None:
"""Track frame timing for statistics."""
self._frame_times.append(frame_time) # deque with maxlen auto-removes old entries
if self._frame_times:
self.stats['avg_frame_time_ms'] = (
sum(self._frame_times) / len(self._frame_times) * 1000
)
def is_cycle_complete(self) -> bool:
"""Check if current scroll cycle is complete."""
return self._cycle_complete
def should_recompose(self) -> bool:
"""
Check if scroll content should be recomposed.
Returns True when:
- Cycle is complete and we should start fresh
- Staging buffer has new content
"""
if self._cycle_complete:
return True
# Check if we need more content in the buffer
buffer_status = self.stream_manager.get_buffer_status()
if buffer_status['staging_count'] > 0:
return True
return False
def hot_swap_content(self) -> bool:
"""
Hot-swap to new composed content.
Called when staging buffer has updated content.
Swaps atomically to prevent visual glitches.
Returns:
True if swap occurred
"""
try:
# Process any pending updates
self.stream_manager.process_updates()
self.stream_manager.swap_buffers()
# Recompose with updated content
if self.compose_scroll_content():
self.stats['hot_swaps'] += 1
logger.debug("Hot-swap completed")
return True
return False
except (ValueError, TypeError, OSError, RuntimeError):
# Expected errors from stream manager or composition operations
logger.exception("Error during hot-swap")
return False
def start_new_cycle(self) -> bool:
"""
Start a new scroll cycle.
Fetches fresh content and recomposes.
Returns:
True if new cycle started successfully
"""
# Reset scroll position
self.scroll_helper.reset_scroll()
self._cycle_complete = False
# Clear buffer from previous cycle so new content is fetched
self.stream_manager.advance_cycle()
# Refresh stream content (picks up plugin list changes)
self.stream_manager.refresh()
# Reinitialize stream (fills buffer with fresh content)
if not self.stream_manager.initialize():
logger.warning("Failed to reinitialize stream for new cycle")
return False
# Compose new scroll content
return self.compose_scroll_content()
def get_current_scroll_info(self) -> Dict[str, Any]:
"""Get current scroll state information."""
scroll_info = self.scroll_helper.get_scroll_info()
return {
**scroll_info,
'cycle_complete': self._cycle_complete,
'plugins_in_scroll': self._segments_in_scroll,
'stats': self.stats.copy(),
}
def get_scroll_position(self) -> int:
"""
Get current scroll position.
Used by coordinator to save position before static pause.
Returns:
Current scroll position in pixels
"""
return int(self.scroll_helper.scroll_position)
def set_scroll_position(self, position: int) -> None:
"""
Set scroll position.
Used by coordinator to restore position after static pause.
Args:
position: Scroll position in pixels
"""
self.scroll_helper.scroll_position = float(position)
def update_config(self, new_config: VegasModeConfig) -> None:
"""
Update render pipeline configuration.
Args:
new_config: New configuration to apply
"""
old_fps = self.config.target_fps
self.config = new_config
self._frame_interval = new_config.get_frame_interval()
# Reconfigure scroll helper
self._configure_scroll_helper()
if old_fps != new_config.target_fps:
logger.info("FPS target updated: %d -> %d", old_fps, new_config.target_fps)
def reset(self) -> None:
"""Reset the render pipeline state."""
self.scroll_helper.reset_scroll()
self.scroll_helper.clear_cache()
with self._buffer_lock:
self._active_scroll_image = None
self._staging_scroll_image = None
self._cycle_complete = False
self._segments_in_scroll = []
self._frame_times = deque(maxlen=100)
self.display_manager.set_scrolling_state(False)
logger.info("RenderPipeline reset")
def cleanup(self) -> None:
"""Clean up resources."""
self.reset()
self.display_manager.set_scrolling_state(False)
logger.debug("RenderPipeline cleanup complete")
def get_dynamic_duration(self) -> float:
"""Get the calculated dynamic duration for current content."""
return float(self.scroll_helper.get_dynamic_duration())

View File

@@ -0,0 +1,554 @@
"""
Stream Manager for Vegas Mode
Manages plugin content streaming with look-ahead buffering. Maintains a queue
of plugin content that's ready to be rendered, prefetching 1-2 plugins ahead
of the current scroll position.
Supports three display modes:
- SCROLL: Continuous scrolling content
- FIXED_SEGMENT: Fixed block that scrolls by
- STATIC: Pause scroll to display (marked for coordinator handling)
"""
import logging
import threading
import time
from typing import Optional, List, Dict, Any, Deque, Tuple, TYPE_CHECKING
from collections import deque
from dataclasses import dataclass, field
from PIL import Image
from src.vegas_mode.config import VegasModeConfig
from src.vegas_mode.plugin_adapter import PluginAdapter
from src.plugin_system.base_plugin import VegasDisplayMode
if TYPE_CHECKING:
from src.plugin_system.base_plugin import BasePlugin
from src.plugin_system.plugin_manager import PluginManager
logger = logging.getLogger(__name__)
@dataclass
class ContentSegment:
"""Represents a segment of scrollable content from a plugin."""
plugin_id: str
images: List[Image.Image]
total_width: int
display_mode: VegasDisplayMode = field(default=VegasDisplayMode.FIXED_SEGMENT)
fetched_at: float = field(default_factory=time.time)
is_stale: bool = False
@property
def image_count(self) -> int:
return len(self.images)
@property
def is_static(self) -> bool:
"""Check if this segment should trigger a static pause."""
return self.display_mode == VegasDisplayMode.STATIC
class StreamManager:
"""
Manages streaming of plugin content for Vegas scroll mode.
Key responsibilities:
- Maintain ordered list of plugins to stream
- Prefetch content 1-2 plugins ahead of current position
- Handle plugin data updates via double-buffer swap
- Manage content lifecycle and staleness
"""
def __init__(
self,
config: VegasModeConfig,
plugin_manager: 'PluginManager',
plugin_adapter: PluginAdapter
):
"""
Initialize the stream manager.
Args:
config: Vegas mode configuration
plugin_manager: Plugin manager for accessing plugins
plugin_adapter: Adapter for getting plugin content
"""
self.config = config
self.plugin_manager = plugin_manager
self.plugin_adapter = plugin_adapter
# Content queue (double-buffered)
self._active_buffer: Deque[ContentSegment] = deque()
self._staging_buffer: Deque[ContentSegment] = deque()
self._buffer_lock = threading.RLock() # RLock for reentrant access
# Plugin rotation state
self._ordered_plugins: List[str] = []
self._current_index: int = 0
self._prefetch_index: int = 0
# Update tracking
self._pending_updates: Dict[str, bool] = {}
self._last_refresh: float = 0.0
self._refresh_interval: float = 30.0 # Refresh plugin list every 30s
# Statistics
self.stats = {
'segments_fetched': 0,
'segments_served': 0,
'buffer_swaps': 0,
'fetch_errors': 0,
}
logger.info("StreamManager initialized with buffer_ahead=%d", config.buffer_ahead)
def initialize(self) -> bool:
"""
Initialize the stream manager with current plugin list.
Returns:
True if initialized successfully with at least one plugin
"""
self._refresh_plugin_list()
if not self._ordered_plugins:
logger.warning("No plugins available for Vegas scroll")
return False
# Prefetch initial content
self._prefetch_content(count=min(self.config.buffer_ahead + 1, len(self._ordered_plugins)))
logger.info(
"StreamManager initialized with %d plugins, %d segments buffered",
len(self._ordered_plugins), len(self._active_buffer)
)
return len(self._active_buffer) > 0
def get_next_segment(self) -> Optional[ContentSegment]:
"""
Get the next content segment for rendering.
Returns:
ContentSegment or None if buffer is empty
"""
with self._buffer_lock:
if not self._active_buffer:
# Try to fetch more content
self._prefetch_content(count=1)
if not self._active_buffer:
return None
segment = self._active_buffer.popleft()
self.stats['segments_served'] += 1
# Trigger prefetch to maintain buffer
self._ensure_buffer_filled()
return segment
def peek_next_segment(self) -> Optional[ContentSegment]:
"""
Peek at the next segment without removing it.
Returns:
ContentSegment or None if buffer is empty
"""
with self._buffer_lock:
if self._active_buffer:
return self._active_buffer[0]
return None
def get_buffer_status(self) -> Dict[str, Any]:
"""Get current buffer status for monitoring."""
with self._buffer_lock:
return {
'active_count': len(self._active_buffer),
'staging_count': len(self._staging_buffer),
'total_plugins': len(self._ordered_plugins),
'current_index': self._current_index,
'prefetch_index': self._prefetch_index,
'stats': self.stats.copy(),
}
def get_active_plugin_ids(self) -> List[str]:
"""
Get list of plugin IDs currently in the active buffer.
Thread-safe accessor for render pipeline.
Returns:
List of plugin IDs in buffer order
"""
with self._buffer_lock:
return [seg.plugin_id for seg in self._active_buffer]
def mark_plugin_updated(self, plugin_id: str) -> None:
"""
Mark a plugin as having updated data.
Called when a plugin's data changes. Triggers content refresh
for that plugin in the staging buffer.
Args:
plugin_id: Plugin that was updated
"""
with self._buffer_lock:
self._pending_updates[plugin_id] = True
logger.debug("Plugin %s marked for update", plugin_id)
def process_updates(self) -> None:
"""
Process pending plugin updates.
Performs in-place update of segments in the active buffer,
preserving non-updated plugins and their order.
"""
with self._buffer_lock:
if not self._pending_updates:
return
updated_plugins = list(self._pending_updates.keys())
self._pending_updates.clear()
# Fetch fresh content for each updated plugin (outside lock for slow ops)
refreshed_segments = {}
for plugin_id in updated_plugins:
self.plugin_adapter.invalidate_cache(plugin_id)
segment = self._fetch_plugin_content(plugin_id)
if segment:
refreshed_segments[plugin_id] = segment
# In-place merge: replace segments in active buffer
with self._buffer_lock:
# Build new buffer preserving order, replacing updated segments
new_buffer: Deque[ContentSegment] = deque()
seen_plugins: set = set()
for segment in self._active_buffer:
if segment.plugin_id in refreshed_segments:
# Replace with refreshed segment (only once per plugin)
if segment.plugin_id not in seen_plugins:
new_buffer.append(refreshed_segments[segment.plugin_id])
seen_plugins.add(segment.plugin_id)
# Skip duplicate entries for same plugin
else:
# Keep non-updated segment
new_buffer.append(segment)
self._active_buffer = new_buffer
logger.debug("Processed in-place updates for %d plugins", len(updated_plugins))
def swap_buffers(self) -> None:
"""
Swap active and staging buffers.
Called when staging buffer has updated content ready.
"""
with self._buffer_lock:
if self._staging_buffer:
# True swap: staging becomes active, old active is discarded
self._active_buffer, self._staging_buffer = self._staging_buffer, deque()
self.stats['buffer_swaps'] += 1
logger.debug("Swapped buffers, active now has %d segments", len(self._active_buffer))
def refresh(self) -> None:
"""
Refresh the plugin list and content.
Called periodically to pick up new plugins or config changes.
"""
current_time = time.time()
if current_time - self._last_refresh < self._refresh_interval:
return
self._last_refresh = current_time
old_count = len(self._ordered_plugins)
self._refresh_plugin_list()
if len(self._ordered_plugins) != old_count:
logger.info(
"Plugin list refreshed: %d -> %d plugins",
old_count, len(self._ordered_plugins)
)
def _refresh_plugin_list(self) -> None:
"""Refresh the ordered list of plugins from plugin manager."""
logger.info("=" * 60)
logger.info("REFRESHING PLUGIN LIST FOR VEGAS SCROLL")
logger.info("=" * 60)
# Get all enabled plugins
available_plugins = []
if hasattr(self.plugin_manager, 'plugins'):
logger.info(
"Checking %d loaded plugins for Vegas scroll",
len(self.plugin_manager.plugins)
)
for plugin_id, plugin in self.plugin_manager.plugins.items():
has_enabled = hasattr(plugin, 'enabled')
is_enabled = getattr(plugin, 'enabled', False)
logger.info(
"[%s] class=%s, has_enabled=%s, enabled=%s",
plugin_id, plugin.__class__.__name__, has_enabled, is_enabled
)
if has_enabled and is_enabled:
# Check vegas content type - skip 'none' unless in STATIC mode
content_type = self.plugin_adapter.get_content_type(plugin, plugin_id)
# Also check display mode - STATIC plugins should be included
# even if their content_type is 'none'
display_mode = VegasDisplayMode.FIXED_SEGMENT
try:
display_mode = plugin.get_vegas_display_mode()
except Exception:
# Plugin error should not abort refresh; use default mode
logger.exception(
"[%s] (%s) get_vegas_display_mode() failed, using default",
plugin_id, plugin.__class__.__name__
)
logger.info(
"[%s] content_type=%s, display_mode=%s",
plugin_id, content_type, display_mode.value
)
if content_type != 'none' or display_mode == VegasDisplayMode.STATIC:
available_plugins.append(plugin_id)
logger.info("[%s] --> INCLUDED in Vegas scroll", plugin_id)
else:
logger.info("[%s] --> EXCLUDED from Vegas scroll", plugin_id)
else:
logger.info("[%s] --> SKIPPED (not enabled)", plugin_id)
else:
logger.warning(
"plugin_manager does not have plugins attribute: %s",
type(self.plugin_manager).__name__
)
# Apply ordering from config (outside lock for potentially slow operation)
ordered_plugins = self.config.get_ordered_plugins(available_plugins)
logger.info(
"Vegas scroll plugin list: %d available -> %d ordered",
len(available_plugins), len(ordered_plugins)
)
logger.info("Ordered plugins: %s", ordered_plugins)
# Atomically update shared state under lock to avoid races with prefetchers
with self._buffer_lock:
self._ordered_plugins = ordered_plugins
# Reset indices if needed
if self._current_index >= len(self._ordered_plugins):
self._current_index = 0
if self._prefetch_index >= len(self._ordered_plugins):
self._prefetch_index = 0
logger.info("=" * 60)
def _prefetch_content(self, count: int = 1) -> None:
"""
Prefetch content for upcoming plugins.
Args:
count: Number of plugins to prefetch
"""
with self._buffer_lock:
if not self._ordered_plugins:
return
for _ in range(count):
if len(self._active_buffer) >= self.config.buffer_ahead + 1:
break
# Ensure index is valid (guard against empty list)
num_plugins = len(self._ordered_plugins)
if num_plugins == 0:
break
plugin_id = self._ordered_plugins[self._prefetch_index]
# Release lock for potentially slow content fetch
self._buffer_lock.release()
try:
segment = self._fetch_plugin_content(plugin_id)
finally:
self._buffer_lock.acquire()
if segment:
self._active_buffer.append(segment)
# Revalidate num_plugins after reacquiring lock (may have changed)
num_plugins = len(self._ordered_plugins)
if num_plugins == 0:
break
# Advance prefetch index (thread-safe within lock)
self._prefetch_index = (self._prefetch_index + 1) % num_plugins
def _fetch_plugin_content(self, plugin_id: str) -> Optional[ContentSegment]:
"""
Fetch content from a specific plugin.
Args:
plugin_id: Plugin to fetch from
Returns:
ContentSegment or None if fetch failed
"""
try:
logger.info("=" * 60)
logger.info("[%s] FETCHING CONTENT", plugin_id)
logger.info("=" * 60)
# Get plugin instance
if not hasattr(self.plugin_manager, 'plugins'):
logger.warning("[%s] plugin_manager has no plugins attribute", plugin_id)
return None
plugin = self.plugin_manager.plugins.get(plugin_id)
if not plugin:
logger.warning("[%s] Plugin not found in plugin_manager.plugins", plugin_id)
return None
logger.info(
"[%s] Plugin found: class=%s, enabled=%s",
plugin_id, plugin.__class__.__name__, getattr(plugin, 'enabled', 'N/A')
)
# Get display mode from plugin
display_mode = VegasDisplayMode.FIXED_SEGMENT
try:
display_mode = plugin.get_vegas_display_mode()
logger.info("[%s] Display mode: %s", plugin_id, display_mode.value)
except (AttributeError, TypeError) as e:
logger.info(
"[%s] get_vegas_display_mode() not available: %s (using FIXED_SEGMENT)",
plugin_id, e
)
# For STATIC mode, we create a placeholder segment
# The actual content will be displayed by coordinator during pause
if display_mode == VegasDisplayMode.STATIC:
# Create minimal placeholder - coordinator handles actual display
segment = ContentSegment(
plugin_id=plugin_id,
images=[], # No images needed for static pause
total_width=0,
display_mode=display_mode
)
self.stats['segments_fetched'] += 1
logger.info(
"[%s] Created STATIC placeholder (pause trigger)",
plugin_id
)
return segment
# Get content via adapter for SCROLL/FIXED_SEGMENT modes
logger.info("[%s] Calling plugin_adapter.get_content()...", plugin_id)
images = self.plugin_adapter.get_content(plugin, plugin_id)
if not images:
logger.warning("[%s] NO CONTENT RETURNED from plugin_adapter", plugin_id)
return None
# Calculate total width
total_width = sum(img.width for img in images)
segment = ContentSegment(
plugin_id=plugin_id,
images=images,
total_width=total_width,
display_mode=display_mode
)
self.stats['segments_fetched'] += 1
logger.info(
"[%s] SEGMENT CREATED: %d images, %dpx total, mode=%s",
plugin_id, len(images), total_width, display_mode.value
)
logger.info("=" * 60)
return segment
except Exception:
logger.exception("[%s] ERROR fetching content", plugin_id)
self.stats['fetch_errors'] += 1
return None
def _refresh_plugin_content(self, plugin_id: str) -> None:
"""
Refresh content for a specific plugin into staging buffer.
Args:
plugin_id: Plugin to refresh
"""
# Invalidate cached content
self.plugin_adapter.invalidate_cache(plugin_id)
# Fetch fresh content
segment = self._fetch_plugin_content(plugin_id)
if segment:
with self._buffer_lock:
self._staging_buffer.append(segment)
logger.debug("Refreshed content for %s in staging buffer", plugin_id)
def _ensure_buffer_filled(self) -> None:
"""Ensure buffer has enough content prefetched."""
if len(self._active_buffer) < self.config.buffer_ahead:
needed = self.config.buffer_ahead - len(self._active_buffer)
self._prefetch_content(count=needed)
def get_all_content_for_composition(self) -> List[Image.Image]:
"""
Get all buffered content as a flat list of images.
Used when composing the full scroll image.
Skips STATIC segments as they don't have images to compose.
Returns:
List of all images in buffer order
"""
all_images = []
with self._buffer_lock:
for segment in self._active_buffer:
# Skip STATIC segments - they trigger pauses, not scroll content
if segment.display_mode != VegasDisplayMode.STATIC:
all_images.extend(segment.images)
return all_images
def advance_cycle(self) -> None:
"""
Advance to next cycle by clearing the active buffer.
Called when a scroll cycle completes to allow fresh content
to be fetched for the next cycle. Does not reset indices,
so prefetching continues from the current position in the
plugin order.
"""
with self._buffer_lock:
consumed_count = len(self._active_buffer)
self._active_buffer.clear()
logger.debug("Advanced cycle, cleared %d segments", consumed_count)
def reset(self) -> None:
"""Reset the stream manager state."""
with self._buffer_lock:
self._active_buffer.clear()
self._staging_buffer.clear()
self._current_index = 0
self._prefetch_index = 0
self._pending_updates.clear()
self.plugin_adapter.invalidate_cache()
logger.info("StreamManager reset")
def cleanup(self) -> None:
"""Clean up resources."""
self.reset()
self.plugin_adapter.cleanup()
logger.debug("StreamManager cleanup complete")