""" Starlark Apps Plugin for LEDMatrix Manages and displays Starlark (.star) apps from Tronbyte/Tidbyt community. Provides seamless widget import without modification. API Version: 1.0.0 """ import json import os import re import time import fcntl from pathlib import Path from typing import Dict, Any, Optional, List, Tuple from PIL import Image from src.plugin_system.base_plugin import BasePlugin, VegasDisplayMode from src.logging_config import get_logger from pixlet_renderer import PixletRenderer from frame_extractor import FrameExtractor logger = get_logger(__name__) class StarlarkApp: """Represents a single installed Starlark app.""" def __init__(self, app_id: str, app_dir: Path, manifest: Dict[str, Any]): """ Initialize a Starlark app instance. Args: app_id: Unique identifier for this app app_dir: Directory containing the app files manifest: App metadata from manifest """ self.app_id = app_id self.app_dir = app_dir self.manifest = manifest self.star_file = app_dir / manifest.get("star_file", f"{app_id}.star") self.config_file = app_dir / "config.json" self.schema_file = app_dir / "schema.json" self.cache_file = app_dir / "cached_render.webp" # Load app configuration and schema self.config = self._load_config() self.schema = self._load_schema() # Merge schema defaults into config for any missing fields self._merge_schema_defaults() # Runtime state self.frames: Optional[List[Tuple[Image.Image, int]]] = None self.current_frame_index = 0 self.last_frame_time = 0 self.last_render_time = 0 def _load_config(self) -> Dict[str, Any]: """Load app configuration from config.json.""" if self.config_file.exists(): try: with open(self.config_file, 'r') as f: return json.load(f) except (OSError, json.JSONDecodeError) as e: logger.warning(f"Could not load config for {self.app_id}: {e}") return {} def _load_schema(self) -> Optional[Dict[str, Any]]: """Load app schema from schema.json.""" if self.schema_file.exists(): try: with open(self.schema_file, 'r') as f: return json.load(f) except (OSError, json.JSONDecodeError) as e: logger.warning(f"Could not load schema for {self.app_id}: {e}") return None def _merge_schema_defaults(self) -> None: """ Merge schema default values into config for any missing fields. This ensures existing apps get defaults when schemas are updated with new fields. """ if not self.schema: return # Get fields from schema (handles both 'fields' and 'schema' keys) fields = self.schema.get('fields') or self.schema.get('schema') or [] defaults_added = False for field in fields: if isinstance(field, dict) and 'id' in field and 'default' in field: field_id = field['id'] # Only add if not already present in config if field_id not in self.config: self.config[field_id] = field['default'] defaults_added = True logger.debug(f"Added default value for {self.app_id}.{field_id}: {field['default']}") # Save config if we added any defaults if defaults_added: self.save_config() def _validate_config(self) -> Optional[str]: """ Validate config values to prevent injection and ensure data integrity. Returns: Error message if validation fails, None if valid """ for key, value in self.config.items(): # Validate key format if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]{0,63}$', key): return f"Invalid config key format: {key}" # Validate location fields (JSON format) if isinstance(value, str) and value.strip().startswith('{'): try: loc = json.loads(value) except json.JSONDecodeError as e: return f"Invalid JSON for key {key}: {e}" # Validate lat/lng if present try: if 'lat' in loc: lat = float(loc['lat']) if not -90 <= lat <= 90: return f"Latitude {lat} out of range [-90, 90] for key {key}" if 'lng' in loc: lng = float(loc['lng']) if not -180 <= lng <= 180: return f"Longitude {lng} out of range [-180, 180] for key {key}" except ValueError as e: return f"Invalid numeric value for {key}: {e}" return None def save_config(self) -> bool: """Save current configuration to file with validation.""" try: # Validate config before saving error = self._validate_config() if error: logger.error(f"Config validation failed for {self.app_id}: {error}") return False with open(self.config_file, 'w') as f: json.dump(self.config, f, indent=2) return True except Exception as e: logger.exception(f"Could not save config for {self.app_id}: {e}") return False def is_enabled(self) -> bool: """Check if app is enabled.""" return self.manifest.get("enabled", True) def get_render_interval(self) -> int: """Get render interval in seconds.""" default = 300 try: value = self.manifest.get("render_interval", default) interval = int(value) except (ValueError, TypeError): interval = default # Clamp to safe range: min 5, max 3600 return max(5, min(interval, 3600)) def get_display_duration(self) -> int: """Get display duration in seconds.""" default = 15 try: value = self.manifest.get("display_duration", default) duration = int(value) except (ValueError, TypeError): duration = default # Clamp to safe range: min 1, max 600 return max(1, min(duration, 600)) def should_render(self, current_time: float) -> bool: """Check if app should be re-rendered based on interval.""" interval = self.get_render_interval() return (current_time - self.last_render_time) >= interval class StarlarkAppsPlugin(BasePlugin): """ Starlark Apps Manager plugin. Manages Starlark (.star) apps and renders them using Pixlet. Each installed app becomes a dynamic display mode. """ def __init__(self, plugin_id: str, config: Dict[str, Any], display_manager, cache_manager, plugin_manager): """Initialize the Starlark Apps plugin.""" super().__init__(plugin_id, config, display_manager, cache_manager, plugin_manager) # Initialize components self.pixlet = PixletRenderer( pixlet_path=config.get("pixlet_path"), timeout=config.get("render_timeout", 30) ) self.extractor = FrameExtractor( default_frame_delay=config.get("default_frame_delay", 50) ) # App storage self.apps_dir = self._get_apps_directory() self.manifest_file = self.apps_dir / "manifest.json" self.apps: Dict[str, StarlarkApp] = {} # Display state self.current_app: Optional[StarlarkApp] = None self.last_update_check = 0 # Check Pixlet availability if not self.pixlet.is_available(): self.logger.error("Pixlet not available - Starlark apps will not work") self.logger.error("Install Pixlet or place bundled binary in bin/pixlet/") else: version = self.pixlet.get_version() self.logger.info(f"Pixlet available: {version}") # Calculate optimal magnification based on display size self.calculated_magnify = self._calculate_optimal_magnify() if self.calculated_magnify > 1: self.logger.info(f"Display size: {self.display_manager.matrix.width}x{self.display_manager.matrix.height}, " f"recommended magnify: {self.calculated_magnify}") # Load installed apps self._load_installed_apps() self.logger.info(f"Starlark Apps plugin initialized with {len(self.apps)} apps") @property def modes(self) -> List[str]: """ Return list of display modes (one per installed Starlark app). This allows each installed app to appear as a separate display mode in the schedule/rotation system. Returns: List of app IDs that can be used as display modes """ # Return list of enabled app IDs as display modes return [app.app_id for app in self.apps.values() if app.is_enabled()] def validate_config(self) -> bool: """ Validate plugin configuration. Ensures required configuration values are valid for Starlark apps. Returns: True if configuration is valid, False otherwise """ # Call parent validation first if not super().validate_config(): return False # Validate magnify range (0-8) if "magnify" in self.config: magnify = self.config["magnify"] if not isinstance(magnify, int) or magnify < 0 or magnify > 8: self.logger.error("magnify must be an integer between 0 and 8") return False # Validate render_timeout if "render_timeout" in self.config: timeout = self.config["render_timeout"] if not isinstance(timeout, (int, float)) or timeout < 5 or timeout > 120: self.logger.error("render_timeout must be a number between 5 and 120") return False # Validate cache_ttl if "cache_ttl" in self.config: ttl = self.config["cache_ttl"] if not isinstance(ttl, (int, float)) or ttl < 60 or ttl > 3600: self.logger.error("cache_ttl must be a number between 60 and 3600") return False # Validate scale_method if "scale_method" in self.config: method = self.config["scale_method"] valid_methods = ["nearest", "bilinear", "bicubic", "lanczos"] if method not in valid_methods: self.logger.error(f"scale_method must be one of: {', '.join(valid_methods)}") return False # Validate default_frame_delay if "default_frame_delay" in self.config: delay = self.config["default_frame_delay"] if not isinstance(delay, (int, float)) or delay < 16 or delay > 1000: self.logger.error("default_frame_delay must be a number between 16 and 1000") return False return True def _calculate_optimal_magnify(self) -> int: """ Calculate optimal magnification factor based on display dimensions. Tronbyte apps are designed for 64x32 displays. This calculates what magnification would best fit the current display. Returns: Recommended magnify value (1-8) """ try: display_width = self.display_manager.matrix.width display_height = self.display_manager.matrix.height # Tronbyte native resolution NATIVE_WIDTH = 64 NATIVE_HEIGHT = 32 # Calculate scale factors for width and height width_scale = display_width / NATIVE_WIDTH height_scale = display_height / NATIVE_HEIGHT # Use the smaller scale to ensure content fits # (prevents overflow on one dimension) scale_factor = min(width_scale, height_scale) # Round down to get integer magnify value magnify = int(scale_factor) # Clamp to reasonable range (1-8) magnify = max(1, min(8, magnify)) self.logger.debug(f"Display: {display_width}x{display_height}, " f"Native: {NATIVE_WIDTH}x{NATIVE_HEIGHT}, " f"Calculated magnify: {magnify}") return magnify except Exception as e: self.logger.warning(f"Could not calculate magnify: {e}") return 1 def get_magnify_recommendation(self) -> Dict[str, Any]: """ Get detailed magnification recommendation for current display. Returns: Dictionary with recommendation details """ try: display_width = self.display_manager.matrix.width display_height = self.display_manager.matrix.height NATIVE_WIDTH = 64 NATIVE_HEIGHT = 32 width_scale = display_width / NATIVE_WIDTH height_scale = display_height / NATIVE_HEIGHT # Calculate for different magnify values recommendations = [] for magnify in range(1, 9): render_width = NATIVE_WIDTH * magnify render_height = NATIVE_HEIGHT * magnify # Check if this magnify fits perfectly perfect_fit = (render_width == display_width and render_height == display_height) # Check if scaling is needed needs_scaling = (render_width != display_width or render_height != display_height) # Calculate quality score (1-100) if perfect_fit: quality_score = 100 elif not needs_scaling: quality_score = 95 else: # Score based on how close to display size width_ratio = min(render_width, display_width) / max(render_width, display_width) height_ratio = min(render_height, display_height) / max(render_height, display_height) quality_score = int((width_ratio + height_ratio) / 2 * 100) recommendations.append({ 'magnify': magnify, 'render_size': f"{render_width}x{render_height}", 'perfect_fit': perfect_fit, 'needs_scaling': needs_scaling, 'quality_score': quality_score, 'recommended': magnify == self.calculated_magnify }) return { 'display_size': f"{display_width}x{display_height}", 'native_size': f"{NATIVE_WIDTH}x{NATIVE_HEIGHT}", 'calculated_magnify': self.calculated_magnify, 'width_scale': round(width_scale, 2), 'height_scale': round(height_scale, 2), 'recommendations': recommendations } except Exception as e: self.logger.exception(f"Error getting magnify recommendation: {e}") return { 'display_size': 'unknown', 'calculated_magnify': 1, 'recommendations': [] } def _get_effective_magnify(self) -> int: """ Get the effective magnify value to use for rendering. Priority: 1. User-configured magnify (if valid and in range 1-8) 2. Auto-calculated magnify Returns: Magnify value to use """ config_magnify = self.config.get("magnify") # Validate and clamp config_magnify if config_magnify is not None: try: # Convert to int if possible config_magnify = int(config_magnify) # Clamp to safe range (1-8) if 1 <= config_magnify <= 8: return config_magnify except (ValueError, TypeError): # Non-numeric value, fall through to calculated pass # Fall back to auto-calculated value return self.calculated_magnify def _get_apps_directory(self) -> Path: """Get the directory for storing Starlark apps.""" try: # Try to find project root current_dir = Path(__file__).resolve().parent project_root = current_dir.parent.parent apps_dir = project_root / "starlark-apps" except Exception: # Fallback to current working directory apps_dir = Path.cwd() / "starlark-apps" # Create directory if it doesn't exist apps_dir.mkdir(parents=True, exist_ok=True) return apps_dir def _sanitize_app_id(self, app_id: str) -> str: """ Sanitize app_id into a safe slug for use in file paths. Args: app_id: Original app identifier Returns: Sanitized slug containing only [a-z0-9_.-] characters """ if not app_id: raise ValueError("app_id cannot be empty") # Replace invalid characters with underscore # Allow only: lowercase letters, digits, underscore, period, hyphen safe_slug = re.sub(r'[^a-z0-9_.-]', '_', app_id.lower()) # Remove leading/trailing dots, underscores, or hyphens safe_slug = safe_slug.strip('._-') # Ensure it's not empty after sanitization if not safe_slug: raise ValueError(f"app_id '{app_id}' becomes empty after sanitization") return safe_slug def _verify_path_safety(self, path: Path, base_dir: Path) -> None: """ Verify that a path is within the base directory to prevent path traversal. Args: path: Path to verify base_dir: Base directory that path must be within Raises: ValueError: If path escapes the base directory """ try: resolved_path = path.resolve() resolved_base = base_dir.resolve() # Check if path is relative to base directory if not resolved_path.is_relative_to(resolved_base): raise ValueError( f"Path traversal detected: {resolved_path} is not within {resolved_base}" ) except (ValueError, AttributeError) as e: # AttributeError for Python < 3.9 where is_relative_to doesn't exist # Fallback: check if resolved path starts with resolved base resolved_path = path.resolve() resolved_base = base_dir.resolve() try: resolved_path.relative_to(resolved_base) except ValueError: raise ValueError( f"Path traversal detected: {resolved_path} is not within {resolved_base}" ) from e def _load_installed_apps(self) -> None: """Load all installed apps from manifest.""" if not self.manifest_file.exists(): # Create initial manifest self._save_manifest({"apps": {}}) return try: with open(self.manifest_file, 'r') as f: manifest = json.load(f) apps_data = manifest.get("apps", {}) for app_id, app_manifest in apps_data.items(): try: # Sanitize app_id to prevent path traversal safe_app_id = self._sanitize_app_id(app_id) app_dir = (self.apps_dir / safe_app_id).resolve() # Verify path safety self._verify_path_safety(app_dir, self.apps_dir) except ValueError as e: self.logger.warning(f"Invalid app_id '{app_id}': {e}") continue if not app_dir.exists(): self.logger.warning(f"App directory missing: {app_id}") continue try: # Use safe_app_id for internal storage to match directory structure app = StarlarkApp(safe_app_id, app_dir, app_manifest) self.apps[safe_app_id] = app self.logger.debug(f"Loaded app: {app_id} (sanitized: {safe_app_id})") except Exception as e: self.logger.exception(f"Error loading app {app_id}: {e}") self.logger.info(f"Loaded {len(self.apps)} Starlark apps") except Exception as e: self.logger.exception(f"Error loading apps manifest: {e}") def _save_manifest(self, manifest: Dict[str, Any]) -> bool: """ Save apps manifest to file with file locking to prevent race conditions. Acquires exclusive lock on manifest file before writing to prevent concurrent modifications. """ temp_file = None lock_fd = None try: # Create parent directory if needed self.manifest_file.parent.mkdir(parents=True, exist_ok=True) # Open manifest file for locking (create if doesn't exist, don't truncate) # Use os.open with O_CREAT | O_RDWR to create if missing, but don't truncate lock_fd = os.open(str(self.manifest_file), os.O_CREAT | os.O_RDWR, 0o644) # Acquire exclusive lock on manifest file BEFORE creating temp file # This serializes all writers and prevents concurrent races fcntl.flock(lock_fd, fcntl.LOCK_EX) try: # Now that we hold the lock, create and write temp file temp_file = self.manifest_file.with_suffix('.tmp') with open(temp_file, 'w') as f: json.dump(manifest, f, indent=2) f.flush() os.fsync(f.fileno()) # Ensure data is written to disk # Atomic rename (overwrites destination) while still holding lock temp_file.replace(self.manifest_file) return True finally: # Release lock fcntl.flock(lock_fd, fcntl.LOCK_UN) os.close(lock_fd) except (OSError, IOError, json.JSONDecodeError, ValueError) as e: self.logger.exception("Error saving manifest while writing manifest file", exc_info=True) # Clean up temp file if it exists if temp_file is not None and temp_file.exists(): try: temp_file.unlink() except Exception as cleanup_exc: self.logger.warning(f"Failed to clean up temp file {temp_file}: {cleanup_exc}") # Clean up lock fd if still open if lock_fd is not None: try: os.close(lock_fd) except Exception as cleanup_exc: self.logger.warning(f"Failed to close lock file descriptor: {cleanup_exc}") return False def _update_manifest_safe(self, updater_fn) -> bool: """ Safely update manifest with file locking to prevent race conditions. Holds exclusive lock for entire read-modify-write cycle. Args: updater_fn: Function that takes manifest dict and modifies it in-place Returns: True if successful, False otherwise """ lock_fd = None temp_file = None try: # Create parent directory if needed self.manifest_file.parent.mkdir(parents=True, exist_ok=True) # Open manifest file for locking (create if doesn't exist, don't truncate) lock_fd = os.open(str(self.manifest_file), os.O_CREAT | os.O_RDWR, 0o644) # Acquire exclusive lock for entire read-modify-write cycle fcntl.flock(lock_fd, fcntl.LOCK_EX) try: # Read current manifest while holding exclusive lock if self.manifest_file.exists() and self.manifest_file.stat().st_size > 0: with open(self.manifest_file, 'r') as f: manifest = json.load(f) else: # Empty or non-existent file, start with default structure manifest = {"apps": {}} # Apply updates while still holding lock updater_fn(manifest) # Write back to temp file, then atomic replace (still holding lock) temp_file = self.manifest_file.with_suffix('.tmp') with open(temp_file, 'w') as f: json.dump(manifest, f, indent=2) f.flush() os.fsync(f.fileno()) # Atomic rename while still holding lock temp_file.replace(self.manifest_file) return True finally: # Release lock fcntl.flock(lock_fd, fcntl.LOCK_UN) os.close(lock_fd) except (OSError, IOError, json.JSONDecodeError, ValueError) as e: self.logger.exception("Error updating manifest during read-modify-write cycle", exc_info=True) # Clean up temp file if it exists if temp_file is not None and temp_file.exists(): try: temp_file.unlink() except Exception as cleanup_exc: self.logger.warning(f"Failed to clean up temp file {temp_file}: {cleanup_exc}") # Clean up lock fd if still open if lock_fd is not None: try: os.close(lock_fd) except Exception as cleanup_exc: self.logger.warning(f"Failed to close lock file descriptor: {cleanup_exc}") return False def update(self) -> None: """Update method - check if apps need re-rendering.""" current_time = time.time() # Check apps that need re-rendering based on their intervals if self.config.get("auto_refresh_apps", True): for app in self.apps.values(): if app.is_enabled() and app.should_render(current_time): self._render_app(app, force=False) def display(self, force_clear: bool = False) -> None: """ Display current Starlark app. This method is called during the display rotation. Displays frames from the currently active app. """ try: if force_clear: self.display_manager.clear() # If no current app, try to select one if not self.current_app: self._select_next_app() if not self.current_app: # No apps available self.logger.debug("No Starlark apps to display") return # Render app if needed if not self.current_app.frames: success = self._render_app(self.current_app, force=True) if not success: self.logger.error(f"Failed to render app: {self.current_app.app_id}") return # Display current frame self._display_frame() except Exception as e: self.logger.error(f"Error displaying Starlark app: {e}") def _select_next_app(self) -> None: """Select the next enabled app for display.""" enabled_apps = [app for app in self.apps.values() if app.is_enabled()] if not enabled_apps: self.current_app = None return # Simple rotation - could be enhanced with priorities if self.current_app and self.current_app in enabled_apps: current_idx = enabled_apps.index(self.current_app) next_idx = (current_idx + 1) % len(enabled_apps) self.current_app = enabled_apps[next_idx] else: self.current_app = enabled_apps[0] self.logger.debug(f"Selected app for display: {self.current_app.app_id}") def _render_app(self, app: StarlarkApp, force: bool = False) -> bool: """ Render a Starlark app using Pixlet. Args: app: App to render force: Force render even if cached Returns: True if successful """ try: current_time = time.time() # Check cache use_cache = self.config.get("cache_rendered_output", True) cache_ttl = self.config.get("cache_ttl", 300) if (not force and use_cache and app.cache_file.exists() and (current_time - app.last_render_time) < cache_ttl): # Use cached render self.logger.debug(f"Using cached render for: {app.app_id}") return self._load_frames_from_cache(app) # Render with Pixlet self.logger.info(f"Rendering app: {app.app_id}") # Get effective magnification factor (config or auto-calculated) magnify = self._get_effective_magnify() self.logger.debug(f"Using magnify={magnify} for {app.app_id}") # Filter out LEDMatrix-internal timing keys before passing to pixlet INTERNAL_KEYS = {'render_interval', 'display_duration'} pixlet_config = {k: v for k, v in app.config.items() if k not in INTERNAL_KEYS} success, error = self.pixlet.render( star_file=str(app.star_file), output_path=str(app.cache_file), config=pixlet_config, magnify=magnify ) if not success: self.logger.error(f"Pixlet render failed: {error}") return False # Extract frames success = self._load_frames_from_cache(app) if success: app.last_render_time = current_time return success except Exception as e: self.logger.error(f"Error rendering app {app.app_id}: {e}") return False def _load_frames_from_cache(self, app: StarlarkApp) -> bool: """Load frames from cached WebP file.""" try: success, frames, error = self.extractor.load_webp(str(app.cache_file)) if not success: self.logger.error(f"Frame extraction failed: {error}") return False # Scale frames if needed if self.config.get("scale_output", True): width = self.display_manager.matrix.width height = self.display_manager.matrix.height # Get scaling method from config scale_method_str = self.config.get("scale_method", "nearest") scale_method_map = { "nearest": Image.Resampling.NEAREST, "bilinear": Image.Resampling.BILINEAR, "bicubic": Image.Resampling.BICUBIC, "lanczos": Image.Resampling.LANCZOS } scale_method = scale_method_map.get(scale_method_str, Image.Resampling.NEAREST) # Check if we should center instead of scale if self.config.get("center_small_output", False): frames = self.extractor.center_frames(frames, width, height) else: frames = self.extractor.scale_frames(frames, width, height, scale_method) # Optimize frames to limit memory usage (max_frames=None means no limit) max_frames = self.config.get("max_frames") if max_frames is not None: frames = self.extractor.optimize_frames(frames, max_frames=max_frames) app.frames = frames app.current_frame_index = 0 app.last_frame_time = time.time() self.logger.debug(f"Loaded {len(frames)} frames for {app.app_id}") return True except Exception as e: self.logger.error(f"Error loading frames for {app.app_id}: {e}") return False def _display_frame(self) -> None: """Display the current frame of the current app.""" if not self.current_app or not self.current_app.frames: return try: current_time = time.time() frame, delay_ms = self.current_app.frames[self.current_app.current_frame_index] # Set frame on display manager self.display_manager.image = frame self.display_manager.update_display() # Check if it's time to advance to next frame delay_seconds = delay_ms / 1000.0 if (current_time - self.current_app.last_frame_time) >= delay_seconds: self.current_app.current_frame_index = ( (self.current_app.current_frame_index + 1) % len(self.current_app.frames) ) self.current_app.last_frame_time = current_time except Exception as e: self.logger.error(f"Error displaying frame: {e}") def install_app(self, app_id: str, star_file_path: str, metadata: Optional[Dict[str, Any]] = None, assets_dir: Optional[str] = None) -> bool: """ Install a new Starlark app. Args: app_id: Unique identifier for the app star_file_path: Path to .star file to install metadata: Optional metadata (name, description, etc.) assets_dir: Optional directory containing assets (images/, sources/, etc.) Returns: True if successful """ try: import shutil # Sanitize app_id to prevent path traversal safe_app_id = self._sanitize_app_id(app_id) # Create app directory with resolved path app_dir = (self.apps_dir / safe_app_id).resolve() # Verify path safety BEFORE creating directories self._verify_path_safety(app_dir, self.apps_dir) app_dir.mkdir(parents=True, exist_ok=True) # Copy .star file with sanitized app_id star_dest = app_dir / f"{safe_app_id}.star" # Verify star_dest path safety self._verify_path_safety(star_dest, self.apps_dir) shutil.copy2(star_file_path, star_dest) # Copy asset directories if provided (images/, sources/, etc.) if assets_dir and Path(assets_dir).exists(): assets_path = Path(assets_dir) for item in assets_path.iterdir(): if item.is_dir(): # Copy entire directory (e.g., images/, sources/) dest_dir = app_dir / item.name # Verify dest_dir path safety self._verify_path_safety(dest_dir, self.apps_dir) if dest_dir.exists(): shutil.rmtree(dest_dir) shutil.copytree(item, dest_dir) self.logger.debug(f"Copied assets directory: {item.name}") self.logger.info(f"Installed assets for {app_id}") # Create app manifest entry app_manifest = { "name": metadata.get("name", app_id) if metadata else app_id, "original_id": app_id, # Store original for reference "star_file": f"{safe_app_id}.star", "enabled": True, "render_interval": metadata.get("render_interval", 300) if metadata else 300, "display_duration": metadata.get("display_duration", 15) if metadata else 15 } # Try to extract schema _, schema, _ = self.pixlet.extract_schema(str(star_dest)) if schema: schema_path = app_dir / "schema.json" # Verify schema path safety self._verify_path_safety(schema_path, self.apps_dir) with open(schema_path, 'w') as f: json.dump(schema, f, indent=2) # Create default config — pre-populate with schema defaults default_config = {} if schema: fields = schema.get('fields') or schema.get('schema') or [] for field in fields: if isinstance(field, dict) and 'id' in field and 'default' in field: default_config[field['id']] = field['default'] config_path = app_dir / "config.json" # Verify config path safety self._verify_path_safety(config_path, self.apps_dir) with open(config_path, 'w') as f: json.dump(default_config, f, indent=2) # Update manifest (use safe_app_id as key to match directory) def update_fn(manifest): manifest["apps"][safe_app_id] = app_manifest if not self._update_manifest_safe(update_fn): self.logger.error(f"Failed to update manifest for {app_id}") return False # Create app instance (use safe_app_id for internal key, original for display) app = StarlarkApp(safe_app_id, app_dir, app_manifest) self.apps[safe_app_id] = app self.logger.info(f"Installed Starlark app: {app_id} (sanitized: {safe_app_id})") return True except Exception as e: self.logger.error(f"Error installing app {app_id}: {e}") return False def uninstall_app(self, app_id: str) -> bool: """ Uninstall a Starlark app. Args: app_id: App to uninstall Returns: True if successful """ try: import shutil if app_id not in self.apps: self.logger.warning(f"App not found: {app_id}") return False # Remove from current app if selected if self.current_app and self.current_app.app_id == app_id: self.current_app = None # Get app reference before removing from dict app = self.apps.get(app_id) # Update manifest FIRST (before modifying filesystem) def update_fn(manifest): if app_id in manifest["apps"]: del manifest["apps"][app_id] if not self._update_manifest_safe(update_fn): self.logger.error(f"Failed to update manifest when uninstalling {app_id}") return False # Remove from apps dict self.apps.pop(app_id) # Remove directory (after manifest update succeeds) if app and app.app_dir.exists(): shutil.rmtree(app.app_dir) self.logger.info(f"Uninstalled Starlark app: {app_id}") return True except Exception as e: self.logger.error(f"Error uninstalling app {app_id}: {e}") return False def get_display_duration(self) -> float: """Get display duration for current app.""" if self.current_app: return float(self.current_app.get_display_duration()) return self.config.get('display_duration', 15.0) # ─── Vegas Mode Integration ────────────────────────────────────── def get_vegas_content(self) -> Optional[List[Image.Image]]: """Return rendered frames from enabled starlark apps for vegas scroll.""" images = [] for app in self.apps.values(): if not app.is_enabled(): continue # Use cached frames if available if app.frames: images.extend([frame for frame, delay in app.frames]) else: # Try to render and extract frames if self._render_app(app): if app.frames: images.extend([frame for frame, delay in app.frames]) return images if images else None def get_vegas_content_type(self) -> str: """Indicate the type of content for Vegas scroll.""" return "multi" def get_vegas_display_mode(self) -> VegasDisplayMode: """Get the display mode for Vegas scroll integration.""" return VegasDisplayMode.FIXED_SEGMENT def get_info(self) -> Dict[str, Any]: """Return plugin info for web UI.""" info = super().get_info() info.update({ 'pixlet_available': self.pixlet.is_available(), 'pixlet_version': self.pixlet.get_version(), 'installed_apps': len(self.apps), 'enabled_apps': len([a for a in self.apps.values() if a.is_enabled()]), 'current_app': self.current_app.app_id if self.current_app else None, 'apps': { app_id: { 'name': app.manifest.get('name', app_id), 'enabled': app.is_enabled(), 'has_frames': app.frames is not None } for app_id, app in self.apps.items() } }) return info