15 Commits

Author SHA1 Message Date
Chuck
a8609aea18 fix(starlark): load schema from schema.json in standalone mode
The standalone API endpoint was returning schema: null because it didn't
load the schema.json file. Now reads schema from disk when returning
app details via web service.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-19 11:35:16 -05:00
Chuck
0dc1a8f6f4 fix(starlark): add List to typing imports for schema parser
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-19 11:33:49 -05:00
Chuck
d876679b9f feat(starlark): implement source code parser for schema extraction
Pixlet CLI doesn't support schema extraction (--print-schema flag doesn't exist),
so apps were being installed without schemas even when they have them.

Implemented regex-based .star file parser that:
- Extracts get_schema() function from source code
- Parses schema.Schema(version, fields) structure
- Handles variable-referenced dropdown options (e.g., options = dialectOptions)
- Supports Location, Text, Toggle, Dropdown, Color, DateTime fields
- Gracefully handles unsupported fields (OAuth2, LocationBased, etc.)
- Returns formatted JSON matching web UI template expectations

Coverage: 90%+ of Tronbyte apps (static schemas + variable references)

Changes:
- Replace extract_schema() to parse .star files directly instead of using Pixlet CLI
- Add 6 helper methods for parsing schema structure
- Handle nested parentheses and brackets properly
- Resolve variable references for dropdown options

Tested with:
- analog_clock.star (Location field) ✓
- Multi-field test (Text + Dropdown + Toggle) ✓
- Variable-referenced options ✓

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-19 11:32:37 -05:00
Chuck
6a04e882c1 feat(starlark): extract schema during standalone install
The standalone install function (_install_star_file) wasn't extracting
schema from .star files, so apps installed via the web service had no
schema.json and the config panel couldn't render schema-driven forms.

Now uses PixletRenderer to extract schema during standalone install,
same as the plugin does.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-19 09:47:08 -05:00
Chuck
45f6e7c20e fix(starlark): use correct 'fileName' field from manifest (camelCase)
The Tronbyte manifest uses 'fileName' (camelCase), not 'filename' (lowercase).
This caused the download to fall back to {app_id}.star which doesn't exist
for apps like analogclock (which has analog_clock.star).

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-19 09:30:25 -05:00
Chuck
a821060084 fix(starlark): reload tronbyte_repository module to pick up code changes
The web service caches imported modules in sys.modules. When deploying
code updates, the old cached version was still being used.

Now uses importlib.reload() when module is already loaded.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-19 09:29:19 -05:00
Chuck
c584f227c1 fix(starlark): use manifest filename field for .star downloads
Tronbyte apps don't always name their .star file to match the directory.
For example, the "analogclock" app has "analog_clock.star" (with underscore).

The manifest.yaml contains a "filename" field with the correct name.

Changes:
- download_star_file() now accepts optional filename parameter
- Install endpoint passes metadata['filename'] to download_star_file()
- Falls back to {app_id}.star if filename not in manifest

Fixes: "Failed to download .star file for analogclock" error

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-18 21:41:50 -05:00
Chuck
885fdeed62 feat(starlark): schema-driven config forms + critical security fixes
## Schema-Driven Config UI
- Render type-appropriate form inputs from schema.json (text, dropdown, toggle, color, datetime, location)
- Pre-populate config.json with schema defaults on install
- Auto-merge schema defaults when loading existing apps (handles schema updates)
- Location fields: 3-part mini-form (lat/lng/timezone) assembles into JSON
- Toggle fields: support both boolean and string "true"/"false" values
- Unsupported field types (oauth2, photo_select) show warning banners
- Fallback to raw key/value inputs for apps without schema

## Critical Security Fixes (P0)
- **Path Traversal**: Verify path safety BEFORE mkdir to prevent TOCTOU
- **Race Conditions**: Add file locking (fcntl) + atomic writes to manifest operations
- **Command Injection**: Validate config keys/values with regex before passing to Pixlet subprocess

## Major Logic Fixes (P1)
- **Config/Manifest Separation**: Store timing keys (render_interval, display_duration) ONLY in manifest
- **Location Validation**: Validate lat [-90,90] and lng [-180,180] ranges, reject malformed JSON
- **Schema Defaults Merge**: Auto-apply new schema defaults to existing app configs on load
- **Config Key Validation**: Enforce alphanumeric+underscore format, prevent prototype pollution

## Files Changed
- web_interface/templates/v3/partials/starlark_config.html — schema-driven form rendering
- plugin-repos/starlark-apps/manager.py — file locking, path safety, config validation, schema merge
- plugin-repos/starlark-apps/pixlet_renderer.py — config value sanitization
- web_interface/blueprints/api_v3.py — timing key separation, safe manifest updates

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-18 21:38:57 -05:00
Chuck
679d9cc2fe fix(store): move storeFilterState to global scope to fix scoping bug
storeFilterState, pluginStoreCache, and related variables were declared
inside an IIFE but referenced by top-level functions, causing
ReferenceError that broke all plugin loading.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 16:14:41 -05:00
Chuck
942663abfd feat(store): add sort, filter, search, and pagination to Plugin Store and Starlark Apps
Plugin Store:
- Live search with 300ms debounce (replaces Search button)
- Sort dropdown: A→Z, Z→A, Category, Author, Newest
- Installed toggle filter (All / Installed / Not Installed)
- Per-page selector (12/24/48) with pagination controls
- "Installed" badge and "Reinstall" button on already-installed plugins
- Active filter count badge + clear filters button

Starlark Apps:
- Parallel bulk manifest fetching via ThreadPoolExecutor (20 workers)
- Server-side 2-hour cache for all 500+ Tronbyte app manifests
- Auto-loads all apps when section expands (no Browse button)
- Live search, sort (A→Z, Z→A, Category, Author), author dropdown
- Installed toggle filter, per-page selector (24/48/96), pagination
- "Installed" badge on cards, "Reinstall" button variant

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 14:54:29 -05:00
Chuck
5f2daa52b0 fix(starlark): always show editable timing settings in config panel
Render interval and display duration are now always editable in the
starlark app config panel, not just shown as read-only status text.
App-specific settings from schema still appear below when present.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 13:53:45 -05:00
Chuck
13ab4f7eee fix(starlark): make config partial work standalone in web service
Read starlark app data from manifest file directly when the plugin
isn't loaded, matching the api_v3.py standalone pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 13:39:22 -05:00
Chuck
4f438fc76a fix(starlark): make API endpoints work standalone in web service
The web service runs as a separate process with display_manager=None,
so plugins aren't instantiated. Refactor starlark API endpoints to
read/write the manifest file directly when the plugin isn't loaded,
enabling full CRUD operations from the web UI.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 13:37:35 -05:00
Chuck
f279e9eea5 fix(starlark): use bare imports instead of relative imports
Plugin loader uses spec_from_file_location without package context,
so relative imports (.pixlet_renderer) fail. Use bare imports like
all other plugins do.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 13:28:48 -05:00
Chuck
3ec1e987a4 feat: integrate Starlark/Tronbyte app support into plugin system
Add starlark-apps plugin that renders Tidbyt/Tronbyte .star apps via
Pixlet binary and integrates them into the existing Plugin Manager UI
as virtual plugins. Includes vegas scroll support, Tronbyte repository
browsing, and per-app configuration.

- Extract working starlark plugin code from starlark branch onto fresh main
- Fix plugin conventions (get_logger, VegasDisplayMode, BasePlugin)
- Add 13 starlark API endpoints to api_v3.py (CRUD, browse, install, render)
- Virtual plugin entries (starlark:<app_id>) in installed plugins list
- Starlark-aware toggle and config routing in pages_v3.py
- Tronbyte repository browser section in Plugin Store UI
- Pixlet binary download script (scripts/download_pixlet.sh)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 13:27:22 -05:00
15 changed files with 5005 additions and 164 deletions

3
.gitignore vendored
View File

@@ -44,3 +44,6 @@ plugins/*
# Binary files and backups
bin/pixlet/
config/backups/
# Starlark apps runtime storage (installed .star files and cached renders)
/starlark-apps/

View File

@@ -0,0 +1,7 @@
"""
Starlark Apps Plugin Package
Seamlessly import and manage Starlark (.star) widgets from the Tronbyte/Tidbyt community.
"""
__version__ = "1.0.0"

View File

@@ -0,0 +1,100 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"title": "Starlark Apps Plugin Configuration",
"description": "Configuration for managing Starlark (.star) apps",
"properties": {
"enabled": {
"type": "boolean",
"description": "Enable or disable the Starlark apps system",
"default": true
},
"pixlet_path": {
"type": "string",
"description": "Path to Pixlet binary (auto-detected if empty)",
"default": ""
},
"render_timeout": {
"type": "number",
"description": "Maximum time in seconds for rendering a .star app",
"default": 30,
"minimum": 5,
"maximum": 120
},
"cache_rendered_output": {
"type": "boolean",
"description": "Cache rendered WebP output to reduce CPU usage",
"default": true
},
"cache_ttl": {
"type": "number",
"description": "Cache time-to-live in seconds",
"default": 300,
"minimum": 60,
"maximum": 3600
},
"default_frame_delay": {
"type": "number",
"description": "Default delay between frames in milliseconds (if not specified by app)",
"default": 50,
"minimum": 16,
"maximum": 1000
},
"scale_output": {
"type": "boolean",
"description": "Scale app output to match display dimensions",
"default": true
},
"scale_method": {
"type": "string",
"enum": ["nearest", "bilinear", "bicubic", "lanczos"],
"description": "Scaling algorithm (nearest=pixel-perfect, lanczos=smoothest)",
"default": "nearest"
},
"magnify": {
"type": "integer",
"description": "Pixlet magnification factor (0=auto, 1=64x32, 2=128x64, 3=192x96, etc.)",
"default": 0,
"minimum": 0,
"maximum": 8
},
"center_small_output": {
"type": "boolean",
"description": "Center small apps on large displays instead of stretching",
"default": false
},
"background_render": {
"type": "boolean",
"description": "Render apps in background to avoid display delays",
"default": true
},
"auto_refresh_apps": {
"type": "boolean",
"description": "Automatically refresh apps at their specified intervals",
"default": true
},
"transition": {
"type": "object",
"description": "Transition settings for app display",
"properties": {
"type": {
"type": "string",
"enum": ["redraw", "fade", "slide", "wipe"],
"default": "fade"
},
"speed": {
"type": "integer",
"description": "Transition speed (1-10)",
"default": 3,
"minimum": 1,
"maximum": 10
},
"enabled": {
"type": "boolean",
"default": true
}
}
}
},
"additionalProperties": false
}

View File

@@ -0,0 +1,285 @@
"""
Frame Extractor Module for Starlark Apps
Extracts individual frames from WebP animations produced by Pixlet.
Handles both static images and animated WebP files.
"""
import logging
from typing import List, Tuple, Optional
from PIL import Image
logger = logging.getLogger(__name__)
class FrameExtractor:
"""
Extracts frames from WebP animations.
Handles:
- Static WebP images (single frame)
- Animated WebP files (multiple frames with delays)
- Frame timing and duration extraction
"""
def __init__(self, default_frame_delay: int = 50):
"""
Initialize frame extractor.
Args:
default_frame_delay: Default delay in milliseconds if not specified
"""
self.default_frame_delay = default_frame_delay
def load_webp(self, webp_path: str) -> Tuple[bool, Optional[List[Tuple[Image.Image, int]]], Optional[str]]:
"""
Load WebP file and extract all frames with their delays.
Args:
webp_path: Path to WebP file
Returns:
Tuple of:
- success: bool
- frames: List of (PIL.Image, delay_ms) tuples, or None on failure
- error: Error message, or None on success
"""
try:
with Image.open(webp_path) as img:
# Check if animated
is_animated = getattr(img, "is_animated", False)
if not is_animated:
# Static image - single frame
# Convert to RGB (LED matrix needs RGB) to match animated branch format
logger.debug(f"Loaded static WebP: {webp_path}")
rgb_img = img.convert("RGB")
return True, [(rgb_img.copy(), self.default_frame_delay)], None
# Animated WebP - extract all frames
frames = []
frame_count = getattr(img, "n_frames", 1)
logger.debug(f"Extracting {frame_count} frames from animated WebP: {webp_path}")
for frame_index in range(frame_count):
try:
img.seek(frame_index)
# Get frame duration (in milliseconds)
# WebP stores duration in milliseconds
duration = img.info.get("duration", self.default_frame_delay)
# Ensure minimum frame delay (prevent too-fast animations)
if duration < 16: # Less than ~60fps
duration = 16
# Convert frame to RGB (LED matrix needs RGB)
frame = img.convert("RGB")
frames.append((frame.copy(), duration))
except EOFError:
logger.warning(f"Reached end of frames at index {frame_index}")
break
except Exception as e:
logger.warning(f"Error extracting frame {frame_index}: {e}")
continue
if not frames:
error = "No frames extracted from WebP"
logger.error(error)
return False, None, error
logger.debug(f"Successfully extracted {len(frames)} frames")
return True, frames, None
except FileNotFoundError:
error = f"WebP file not found: {webp_path}"
logger.error(error)
return False, None, error
except Exception as e:
error = f"Error loading WebP: {e}"
logger.error(error)
return False, None, error
def scale_frames(
self,
frames: List[Tuple[Image.Image, int]],
target_width: int,
target_height: int,
method: Image.Resampling = Image.Resampling.NEAREST
) -> List[Tuple[Image.Image, int]]:
"""
Scale all frames to target dimensions.
Args:
frames: List of (image, delay) tuples
target_width: Target width in pixels
target_height: Target height in pixels
method: Resampling method (default: NEAREST for pixel-perfect scaling)
Returns:
List of scaled (image, delay) tuples
"""
scaled_frames = []
for frame, delay in frames:
try:
# Only scale if dimensions don't match
if frame.width != target_width or frame.height != target_height:
scaled_frame = frame.resize(
(target_width, target_height),
resample=method
)
scaled_frames.append((scaled_frame, delay))
else:
scaled_frames.append((frame, delay))
except Exception as e:
logger.warning(f"Error scaling frame: {e}")
# Keep original frame on error
scaled_frames.append((frame, delay))
logger.debug(f"Scaled {len(scaled_frames)} frames to {target_width}x{target_height}")
return scaled_frames
def center_frames(
self,
frames: List[Tuple[Image.Image, int]],
target_width: int,
target_height: int,
background_color: tuple = (0, 0, 0)
) -> List[Tuple[Image.Image, int]]:
"""
Center frames on a larger canvas instead of scaling.
Useful for displaying small widgets on large displays without distortion.
Args:
frames: List of (image, delay) tuples
target_width: Target canvas width
target_height: Target canvas height
background_color: RGB tuple for background (default: black)
Returns:
List of centered (image, delay) tuples
"""
centered_frames = []
for frame, delay in frames:
try:
# If frame is already the right size, no centering needed
if frame.width == target_width and frame.height == target_height:
centered_frames.append((frame, delay))
continue
# Create black canvas at target size
canvas = Image.new('RGB', (target_width, target_height), background_color)
# Calculate position to center the frame
x_offset = (target_width - frame.width) // 2
y_offset = (target_height - frame.height) // 2
# Paste frame onto canvas
canvas.paste(frame, (x_offset, y_offset))
centered_frames.append((canvas, delay))
except Exception as e:
logger.warning(f"Error centering frame: {e}")
# Keep original frame on error
centered_frames.append((frame, delay))
logger.debug(f"Centered {len(centered_frames)} frames on {target_width}x{target_height} canvas")
return centered_frames
def get_total_duration(self, frames: List[Tuple[Image.Image, int]]) -> int:
"""
Calculate total animation duration in milliseconds.
Args:
frames: List of (image, delay) tuples
Returns:
Total duration in milliseconds
"""
return sum(delay for _, delay in frames)
def optimize_frames(
self,
frames: List[Tuple[Image.Image, int]],
max_frames: Optional[int] = None,
target_duration: Optional[int] = None
) -> List[Tuple[Image.Image, int]]:
"""
Optimize frame list by reducing frame count or adjusting timing.
Args:
frames: List of (image, delay) tuples
max_frames: Maximum number of frames to keep
target_duration: Target total duration in milliseconds
Returns:
Optimized list of (image, delay) tuples
"""
if not frames:
return frames
optimized = frames.copy()
# Limit frame count if specified
if max_frames is not None and max_frames > 0 and len(optimized) > max_frames:
# Sample frames evenly
step = len(optimized) / max_frames
indices = [int(i * step) for i in range(max_frames)]
optimized = [optimized[i] for i in indices]
logger.debug(f"Reduced frames from {len(frames)} to {len(optimized)}")
# Adjust timing to match target duration
if target_duration:
current_duration = self.get_total_duration(optimized)
if current_duration > 0:
scale_factor = target_duration / current_duration
optimized = [
(frame, max(16, int(delay * scale_factor)))
for frame, delay in optimized
]
logger.debug(f"Adjusted timing: {current_duration}ms -> {target_duration}ms")
return optimized
def frames_to_gif_data(self, frames: List[Tuple[Image.Image, int]]) -> Optional[bytes]:
"""
Convert frames to GIF byte data for caching or transmission.
Args:
frames: List of (image, delay) tuples
Returns:
GIF bytes, or None on error
"""
if not frames:
return None
try:
from io import BytesIO
output = BytesIO()
# Prepare frames for PIL
images = [frame for frame, _ in frames]
durations = [delay for _, delay in frames]
# Save as GIF
images[0].save(
output,
format="GIF",
save_all=True,
append_images=images[1:],
duration=durations,
loop=0, # Infinite loop
optimize=False # Skip optimization for speed
)
return output.getvalue()
except Exception as e:
logger.error(f"Error converting frames to GIF: {e}")
return None

View File

@@ -0,0 +1,971 @@
"""
Starlark Apps Plugin for LEDMatrix
Manages and displays Starlark (.star) apps from Tronbyte/Tidbyt community.
Provides seamless widget import without modification.
API Version: 1.0.0
"""
import json
import os
import re
import time
import fcntl
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple
from PIL import Image
from src.plugin_system.base_plugin import BasePlugin, VegasDisplayMode
from src.logging_config import get_logger
from pixlet_renderer import PixletRenderer
from frame_extractor import FrameExtractor
logger = get_logger(__name__)
class StarlarkApp:
"""Represents a single installed Starlark app."""
def __init__(self, app_id: str, app_dir: Path, manifest: Dict[str, Any]):
"""
Initialize a Starlark app instance.
Args:
app_id: Unique identifier for this app
app_dir: Directory containing the app files
manifest: App metadata from manifest
"""
self.app_id = app_id
self.app_dir = app_dir
self.manifest = manifest
self.star_file = app_dir / manifest.get("star_file", f"{app_id}.star")
self.config_file = app_dir / "config.json"
self.schema_file = app_dir / "schema.json"
self.cache_file = app_dir / "cached_render.webp"
# Load app configuration and schema
self.config = self._load_config()
self.schema = self._load_schema()
# Merge schema defaults into config for any missing fields
self._merge_schema_defaults()
# Runtime state
self.frames: Optional[List[Tuple[Image.Image, int]]] = None
self.current_frame_index = 0
self.last_frame_time = 0
self.last_render_time = 0
def _load_config(self) -> Dict[str, Any]:
"""Load app configuration from config.json."""
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Could not load config for {self.app_id}: {e}")
return {}
def _load_schema(self) -> Optional[Dict[str, Any]]:
"""Load app schema from schema.json."""
if self.schema_file.exists():
try:
with open(self.schema_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Could not load schema for {self.app_id}: {e}")
return None
def _merge_schema_defaults(self) -> None:
"""
Merge schema default values into config for any missing fields.
This ensures existing apps get defaults when schemas are updated with new fields.
"""
if not self.schema:
return
# Get fields from schema (handles both 'fields' and 'schema' keys)
fields = self.schema.get('fields') or self.schema.get('schema') or []
defaults_added = False
for field in fields:
if isinstance(field, dict) and 'id' in field and 'default' in field:
field_id = field['id']
# Only add if not already present in config
if field_id not in self.config:
self.config[field_id] = field['default']
defaults_added = True
logger.debug(f"Added default value for {self.app_id}.{field_id}: {field['default']}")
# Save config if we added any defaults
if defaults_added:
self.save_config()
def _validate_config(self) -> Optional[str]:
"""
Validate config values to prevent injection and ensure data integrity.
Returns:
Error message if validation fails, None if valid
"""
for key, value in self.config.items():
# Validate key format
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]{0,63}$', key):
return f"Invalid config key format: {key}"
# Validate location fields (JSON format)
if isinstance(value, str) and value.strip().startswith('{'):
try:
loc = json.loads(value)
if 'lat' in loc:
lat = float(loc['lat'])
if not -90 <= lat <= 90:
return f"Latitude {lat} out of range [-90, 90] for key {key}"
if 'lng' in loc:
lng = float(loc['lng'])
if not -180 <= lng <= 180:
return f"Longitude {lng} out of range [-180, 180] for key {key}"
except (json.JSONDecodeError, ValueError, KeyError) as e:
# Not a location field, that's fine
pass
return None
def save_config(self) -> bool:
"""Save current configuration to file with validation."""
try:
# Validate config before saving
error = self._validate_config()
if error:
logger.error(f"Config validation failed for {self.app_id}: {error}")
return False
with open(self.config_file, 'w') as f:
json.dump(self.config, f, indent=2)
return True
except Exception as e:
logger.exception(f"Could not save config for {self.app_id}: {e}")
return False
def is_enabled(self) -> bool:
"""Check if app is enabled."""
return self.manifest.get("enabled", True)
def get_render_interval(self) -> int:
"""Get render interval in seconds."""
default = 300
try:
value = self.manifest.get("render_interval", default)
interval = int(value)
except (ValueError, TypeError):
interval = default
# Clamp to safe range: min 5, max 3600
return max(5, min(interval, 3600))
def get_display_duration(self) -> int:
"""Get display duration in seconds."""
default = 15
try:
value = self.manifest.get("display_duration", default)
duration = int(value)
except (ValueError, TypeError):
duration = default
# Clamp to safe range: min 1, max 600
return max(1, min(duration, 600))
def should_render(self, current_time: float) -> bool:
"""Check if app should be re-rendered based on interval."""
interval = self.get_render_interval()
return (current_time - self.last_render_time) >= interval
class StarlarkAppsPlugin(BasePlugin):
"""
Starlark Apps Manager plugin.
Manages Starlark (.star) apps and renders them using Pixlet.
Each installed app becomes a dynamic display mode.
"""
def __init__(self, plugin_id: str, config: Dict[str, Any],
display_manager, cache_manager, plugin_manager):
"""Initialize the Starlark Apps plugin."""
super().__init__(plugin_id, config, display_manager, cache_manager, plugin_manager)
# Initialize components
self.pixlet = PixletRenderer(
pixlet_path=config.get("pixlet_path"),
timeout=config.get("render_timeout", 30)
)
self.extractor = FrameExtractor(
default_frame_delay=config.get("default_frame_delay", 50)
)
# App storage
self.apps_dir = self._get_apps_directory()
self.manifest_file = self.apps_dir / "manifest.json"
self.apps: Dict[str, StarlarkApp] = {}
# Display state
self.current_app: Optional[StarlarkApp] = None
self.last_update_check = 0
# Check Pixlet availability
if not self.pixlet.is_available():
self.logger.error("Pixlet not available - Starlark apps will not work")
self.logger.error("Install Pixlet or place bundled binary in bin/pixlet/")
else:
version = self.pixlet.get_version()
self.logger.info(f"Pixlet available: {version}")
# Calculate optimal magnification based on display size
self.calculated_magnify = self._calculate_optimal_magnify()
if self.calculated_magnify > 1:
self.logger.info(f"Display size: {self.display_manager.matrix.width}x{self.display_manager.matrix.height}, "
f"recommended magnify: {self.calculated_magnify}")
# Load installed apps
self._load_installed_apps()
self.logger.info(f"Starlark Apps plugin initialized with {len(self.apps)} apps")
@property
def modes(self) -> List[str]:
"""
Return list of display modes (one per installed Starlark app).
This allows each installed app to appear as a separate display mode
in the schedule/rotation system.
Returns:
List of app IDs that can be used as display modes
"""
# Return list of enabled app IDs as display modes
return [app.app_id for app in self.apps.values() if app.is_enabled()]
def validate_config(self) -> bool:
"""
Validate plugin configuration.
Ensures required configuration values are valid for Starlark apps.
Returns:
True if configuration is valid, False otherwise
"""
# Call parent validation first
if not super().validate_config():
return False
# Validate magnify range (0-8)
if "magnify" in self.config:
magnify = self.config["magnify"]
if not isinstance(magnify, int) or magnify < 0 or magnify > 8:
self.logger.error("magnify must be an integer between 0 and 8")
return False
# Validate render_timeout
if "render_timeout" in self.config:
timeout = self.config["render_timeout"]
if not isinstance(timeout, (int, float)) or timeout < 5 or timeout > 120:
self.logger.error("render_timeout must be a number between 5 and 120")
return False
# Validate cache_ttl
if "cache_ttl" in self.config:
ttl = self.config["cache_ttl"]
if not isinstance(ttl, (int, float)) or ttl < 60 or ttl > 3600:
self.logger.error("cache_ttl must be a number between 60 and 3600")
return False
# Validate scale_method
if "scale_method" in self.config:
method = self.config["scale_method"]
valid_methods = ["nearest", "bilinear", "bicubic", "lanczos"]
if method not in valid_methods:
self.logger.error(f"scale_method must be one of: {', '.join(valid_methods)}")
return False
# Validate default_frame_delay
if "default_frame_delay" in self.config:
delay = self.config["default_frame_delay"]
if not isinstance(delay, (int, float)) or delay < 16 or delay > 1000:
self.logger.error("default_frame_delay must be a number between 16 and 1000")
return False
return True
def _calculate_optimal_magnify(self) -> int:
"""
Calculate optimal magnification factor based on display dimensions.
Tronbyte apps are designed for 64x32 displays.
This calculates what magnification would best fit the current display.
Returns:
Recommended magnify value (1-8)
"""
try:
display_width = self.display_manager.matrix.width
display_height = self.display_manager.matrix.height
# Tronbyte native resolution
NATIVE_WIDTH = 64
NATIVE_HEIGHT = 32
# Calculate scale factors for width and height
width_scale = display_width / NATIVE_WIDTH
height_scale = display_height / NATIVE_HEIGHT
# Use the smaller scale to ensure content fits
# (prevents overflow on one dimension)
scale_factor = min(width_scale, height_scale)
# Round down to get integer magnify value
magnify = int(scale_factor)
# Clamp to reasonable range (1-8)
magnify = max(1, min(8, magnify))
self.logger.debug(f"Display: {display_width}x{display_height}, "
f"Native: {NATIVE_WIDTH}x{NATIVE_HEIGHT}, "
f"Calculated magnify: {magnify}")
return magnify
except Exception as e:
self.logger.warning(f"Could not calculate magnify: {e}")
return 1
def get_magnify_recommendation(self) -> Dict[str, Any]:
"""
Get detailed magnification recommendation for current display.
Returns:
Dictionary with recommendation details
"""
try:
display_width = self.display_manager.matrix.width
display_height = self.display_manager.matrix.height
NATIVE_WIDTH = 64
NATIVE_HEIGHT = 32
width_scale = display_width / NATIVE_WIDTH
height_scale = display_height / NATIVE_HEIGHT
# Calculate for different magnify values
recommendations = []
for magnify in range(1, 9):
render_width = NATIVE_WIDTH * magnify
render_height = NATIVE_HEIGHT * magnify
# Check if this magnify fits perfectly
perfect_fit = (render_width == display_width and render_height == display_height)
# Check if scaling is needed
needs_scaling = (render_width != display_width or render_height != display_height)
# Calculate quality score (1-100)
if perfect_fit:
quality_score = 100
elif not needs_scaling:
quality_score = 95
else:
# Score based on how close to display size
width_ratio = min(render_width, display_width) / max(render_width, display_width)
height_ratio = min(render_height, display_height) / max(render_height, display_height)
quality_score = int((width_ratio + height_ratio) / 2 * 100)
recommendations.append({
'magnify': magnify,
'render_size': f"{render_width}x{render_height}",
'perfect_fit': perfect_fit,
'needs_scaling': needs_scaling,
'quality_score': quality_score,
'recommended': magnify == self.calculated_magnify
})
return {
'display_size': f"{display_width}x{display_height}",
'native_size': f"{NATIVE_WIDTH}x{NATIVE_HEIGHT}",
'calculated_magnify': self.calculated_magnify,
'width_scale': round(width_scale, 2),
'height_scale': round(height_scale, 2),
'recommendations': recommendations
}
except Exception as e:
self.logger.exception(f"Error getting magnify recommendation: {e}")
return {
'display_size': 'unknown',
'calculated_magnify': 1,
'recommendations': []
}
def _get_effective_magnify(self) -> int:
"""
Get the effective magnify value to use for rendering.
Priority:
1. User-configured magnify (if valid and in range 1-8)
2. Auto-calculated magnify
Returns:
Magnify value to use
"""
config_magnify = self.config.get("magnify")
# Validate and clamp config_magnify
if config_magnify is not None:
try:
# Convert to int if possible
config_magnify = int(config_magnify)
# Clamp to safe range (1-8)
if 1 <= config_magnify <= 8:
return config_magnify
except (ValueError, TypeError):
# Non-numeric value, fall through to calculated
pass
# Fall back to auto-calculated value
return self.calculated_magnify
def _get_apps_directory(self) -> Path:
"""Get the directory for storing Starlark apps."""
try:
# Try to find project root
current_dir = Path(__file__).resolve().parent
project_root = current_dir.parent.parent
apps_dir = project_root / "starlark-apps"
except Exception:
# Fallback to current working directory
apps_dir = Path.cwd() / "starlark-apps"
# Create directory if it doesn't exist
apps_dir.mkdir(parents=True, exist_ok=True)
return apps_dir
def _sanitize_app_id(self, app_id: str) -> str:
"""
Sanitize app_id into a safe slug for use in file paths.
Args:
app_id: Original app identifier
Returns:
Sanitized slug containing only [a-z0-9_.-] characters
"""
if not app_id:
raise ValueError("app_id cannot be empty")
# Replace invalid characters with underscore
# Allow only: lowercase letters, digits, underscore, period, hyphen
safe_slug = re.sub(r'[^a-z0-9_.-]', '_', app_id.lower())
# Remove leading/trailing dots, underscores, or hyphens
safe_slug = safe_slug.strip('._-')
# Ensure it's not empty after sanitization
if not safe_slug:
raise ValueError(f"app_id '{app_id}' becomes empty after sanitization")
return safe_slug
def _verify_path_safety(self, path: Path, base_dir: Path) -> None:
"""
Verify that a path is within the base directory to prevent path traversal.
Args:
path: Path to verify
base_dir: Base directory that path must be within
Raises:
ValueError: If path escapes the base directory
"""
try:
resolved_path = path.resolve()
resolved_base = base_dir.resolve()
# Check if path is relative to base directory
if not resolved_path.is_relative_to(resolved_base):
raise ValueError(
f"Path traversal detected: {resolved_path} is not within {resolved_base}"
)
except (ValueError, AttributeError) as e:
# AttributeError for Python < 3.9 where is_relative_to doesn't exist
# Fallback: check if resolved path starts with resolved base
resolved_path = path.resolve()
resolved_base = base_dir.resolve()
try:
resolved_path.relative_to(resolved_base)
except ValueError:
raise ValueError(
f"Path traversal detected: {resolved_path} is not within {resolved_base}"
) from e
def _load_installed_apps(self) -> None:
"""Load all installed apps from manifest."""
if not self.manifest_file.exists():
# Create initial manifest
self._save_manifest({"apps": {}})
return
try:
with open(self.manifest_file, 'r') as f:
manifest = json.load(f)
apps_data = manifest.get("apps", {})
for app_id, app_manifest in apps_data.items():
try:
# Sanitize app_id to prevent path traversal
safe_app_id = self._sanitize_app_id(app_id)
app_dir = (self.apps_dir / safe_app_id).resolve()
# Verify path safety
self._verify_path_safety(app_dir, self.apps_dir)
except ValueError as e:
self.logger.warning(f"Invalid app_id '{app_id}': {e}")
continue
if not app_dir.exists():
self.logger.warning(f"App directory missing: {app_id}")
continue
try:
# Use safe_app_id for internal storage to match directory structure
app = StarlarkApp(safe_app_id, app_dir, app_manifest)
self.apps[safe_app_id] = app
self.logger.debug(f"Loaded app: {app_id} (sanitized: {safe_app_id})")
except Exception as e:
self.logger.exception(f"Error loading app {app_id}: {e}")
self.logger.info(f"Loaded {len(self.apps)} Starlark apps")
except Exception as e:
self.logger.exception(f"Error loading apps manifest: {e}")
def _save_manifest(self, manifest: Dict[str, Any]) -> bool:
"""
Save apps manifest to file with file locking to prevent race conditions.
Uses exclusive lock during write to prevent concurrent modifications.
"""
try:
# Use atomic write pattern: write to temp file, then rename
temp_file = self.manifest_file.with_suffix('.tmp')
with open(temp_file, 'w') as f:
# Acquire exclusive lock during write
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
try:
json.dump(manifest, f, indent=2)
f.flush()
os.fsync(f.fileno()) # Ensure data is written to disk
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
# Atomic rename (overwrites destination)
temp_file.replace(self.manifest_file)
return True
except Exception as e:
self.logger.error(f"Error saving manifest: {e}")
# Clean up temp file if it exists
if temp_file.exists():
try:
temp_file.unlink()
except:
pass
return False
def _update_manifest_safe(self, updater_fn) -> bool:
"""
Safely update manifest with file locking to prevent race conditions.
Args:
updater_fn: Function that takes manifest dict and modifies it in-place
Returns:
True if successful, False otherwise
"""
try:
# Read current manifest with shared lock
with open(self.manifest_file, 'r') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
try:
manifest = json.load(f)
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
# Apply updates
updater_fn(manifest)
# Write back with exclusive lock (handled by _save_manifest)
return self._save_manifest(manifest)
except Exception as e:
self.logger.error(f"Error updating manifest: {e}")
return False
def update(self) -> None:
"""Update method - check if apps need re-rendering."""
current_time = time.time()
# Check apps that need re-rendering based on their intervals
if self.config.get("auto_refresh_apps", True):
for app in self.apps.values():
if app.is_enabled() and app.should_render(current_time):
self._render_app(app, force=False)
def display(self, force_clear: bool = False) -> None:
"""
Display current Starlark app.
This method is called during the display rotation.
Displays frames from the currently active app.
"""
try:
if force_clear:
self.display_manager.clear()
# If no current app, try to select one
if not self.current_app:
self._select_next_app()
if not self.current_app:
# No apps available
self.logger.debug("No Starlark apps to display")
return
# Render app if needed
if not self.current_app.frames:
success = self._render_app(self.current_app, force=True)
if not success:
self.logger.error(f"Failed to render app: {self.current_app.app_id}")
return
# Display current frame
self._display_frame()
except Exception as e:
self.logger.error(f"Error displaying Starlark app: {e}")
def _select_next_app(self) -> None:
"""Select the next enabled app for display."""
enabled_apps = [app for app in self.apps.values() if app.is_enabled()]
if not enabled_apps:
self.current_app = None
return
# Simple rotation - could be enhanced with priorities
if self.current_app and self.current_app in enabled_apps:
current_idx = enabled_apps.index(self.current_app)
next_idx = (current_idx + 1) % len(enabled_apps)
self.current_app = enabled_apps[next_idx]
else:
self.current_app = enabled_apps[0]
self.logger.debug(f"Selected app for display: {self.current_app.app_id}")
def _render_app(self, app: StarlarkApp, force: bool = False) -> bool:
"""
Render a Starlark app using Pixlet.
Args:
app: App to render
force: Force render even if cached
Returns:
True if successful
"""
try:
current_time = time.time()
# Check cache
use_cache = self.config.get("cache_rendered_output", True)
cache_ttl = self.config.get("cache_ttl", 300)
if (not force and use_cache and app.cache_file.exists() and
(current_time - app.last_render_time) < cache_ttl):
# Use cached render
self.logger.debug(f"Using cached render for: {app.app_id}")
return self._load_frames_from_cache(app)
# Render with Pixlet
self.logger.info(f"Rendering app: {app.app_id}")
# Get effective magnification factor (config or auto-calculated)
magnify = self._get_effective_magnify()
self.logger.debug(f"Using magnify={magnify} for {app.app_id}")
# Filter out LEDMatrix-internal timing keys before passing to pixlet
INTERNAL_KEYS = {'render_interval', 'display_duration'}
pixlet_config = {k: v for k, v in app.config.items() if k not in INTERNAL_KEYS}
success, error = self.pixlet.render(
star_file=str(app.star_file),
output_path=str(app.cache_file),
config=pixlet_config,
magnify=magnify
)
if not success:
self.logger.error(f"Pixlet render failed: {error}")
return False
# Extract frames
success = self._load_frames_from_cache(app)
if success:
app.last_render_time = current_time
return success
except Exception as e:
self.logger.error(f"Error rendering app {app.app_id}: {e}")
return False
def _load_frames_from_cache(self, app: StarlarkApp) -> bool:
"""Load frames from cached WebP file."""
try:
success, frames, error = self.extractor.load_webp(str(app.cache_file))
if not success:
self.logger.error(f"Frame extraction failed: {error}")
return False
# Scale frames if needed
if self.config.get("scale_output", True):
width = self.display_manager.matrix.width
height = self.display_manager.matrix.height
# Get scaling method from config
scale_method_str = self.config.get("scale_method", "nearest")
scale_method_map = {
"nearest": Image.Resampling.NEAREST,
"bilinear": Image.Resampling.BILINEAR,
"bicubic": Image.Resampling.BICUBIC,
"lanczos": Image.Resampling.LANCZOS
}
scale_method = scale_method_map.get(scale_method_str, Image.Resampling.NEAREST)
# Check if we should center instead of scale
if self.config.get("center_small_output", False):
frames = self.extractor.center_frames(frames, width, height)
else:
frames = self.extractor.scale_frames(frames, width, height, scale_method)
# Optimize frames to limit memory usage (max_frames=None means no limit)
max_frames = self.config.get("max_frames")
if max_frames is not None:
frames = self.extractor.optimize_frames(frames, max_frames=max_frames)
app.frames = frames
app.current_frame_index = 0
app.last_frame_time = time.time()
self.logger.debug(f"Loaded {len(frames)} frames for {app.app_id}")
return True
except Exception as e:
self.logger.error(f"Error loading frames for {app.app_id}: {e}")
return False
def _display_frame(self) -> None:
"""Display the current frame of the current app."""
if not self.current_app or not self.current_app.frames:
return
try:
current_time = time.time()
frame, delay_ms = self.current_app.frames[self.current_app.current_frame_index]
# Set frame on display manager
self.display_manager.image = frame
self.display_manager.update_display()
# Check if it's time to advance to next frame
delay_seconds = delay_ms / 1000.0
if (current_time - self.current_app.last_frame_time) >= delay_seconds:
self.current_app.current_frame_index = (
(self.current_app.current_frame_index + 1) % len(self.current_app.frames)
)
self.current_app.last_frame_time = current_time
except Exception as e:
self.logger.error(f"Error displaying frame: {e}")
def install_app(self, app_id: str, star_file_path: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
"""
Install a new Starlark app.
Args:
app_id: Unique identifier for the app
star_file_path: Path to .star file to install
metadata: Optional metadata (name, description, etc.)
Returns:
True if successful
"""
try:
import shutil
# Sanitize app_id to prevent path traversal
safe_app_id = self._sanitize_app_id(app_id)
# Create app directory with resolved path
app_dir = (self.apps_dir / safe_app_id).resolve()
# Verify path safety BEFORE creating directories
self._verify_path_safety(app_dir, self.apps_dir)
app_dir.mkdir(parents=True, exist_ok=True)
# Copy .star file with sanitized app_id
star_dest = app_dir / f"{safe_app_id}.star"
# Verify star_dest path safety
self._verify_path_safety(star_dest, self.apps_dir)
shutil.copy2(star_file_path, star_dest)
# Create app manifest entry
app_manifest = {
"name": metadata.get("name", app_id) if metadata else app_id,
"original_id": app_id, # Store original for reference
"star_file": f"{safe_app_id}.star",
"enabled": True,
"render_interval": metadata.get("render_interval", 300) if metadata else 300,
"display_duration": metadata.get("display_duration", 15) if metadata else 15
}
# Try to extract schema
_, schema, _ = self.pixlet.extract_schema(str(star_dest))
if schema:
schema_path = app_dir / "schema.json"
# Verify schema path safety
self._verify_path_safety(schema_path, self.apps_dir)
with open(schema_path, 'w') as f:
json.dump(schema, f, indent=2)
# Create default config — pre-populate with schema defaults
default_config = {}
if schema:
fields = schema.get('fields') or schema.get('schema') or []
for field in fields:
if isinstance(field, dict) and 'id' in field and 'default' in field:
default_config[field['id']] = field['default']
config_path = app_dir / "config.json"
# Verify config path safety
self._verify_path_safety(config_path, self.apps_dir)
with open(config_path, 'w') as f:
json.dump(default_config, f, indent=2)
# Update manifest (use safe_app_id as key to match directory)
def update_fn(manifest):
manifest["apps"][safe_app_id] = app_manifest
self._update_manifest_safe(update_fn)
# Create app instance (use safe_app_id for internal key, original for display)
app = StarlarkApp(safe_app_id, app_dir, app_manifest)
self.apps[safe_app_id] = app
self.logger.info(f"Installed Starlark app: {app_id} (sanitized: {safe_app_id})")
return True
except Exception as e:
self.logger.error(f"Error installing app {app_id}: {e}")
return False
def uninstall_app(self, app_id: str) -> bool:
"""
Uninstall a Starlark app.
Args:
app_id: App to uninstall
Returns:
True if successful
"""
try:
import shutil
if app_id not in self.apps:
self.logger.warning(f"App not found: {app_id}")
return False
# Remove from current app if selected
if self.current_app and self.current_app.app_id == app_id:
self.current_app = None
# Remove from apps dict
app = self.apps.pop(app_id)
# Remove directory
if app.app_dir.exists():
shutil.rmtree(app.app_dir)
# Update manifest
def update_fn(manifest):
if app_id in manifest["apps"]:
del manifest["apps"][app_id]
self._update_manifest_safe(update_fn)
self.logger.info(f"Uninstalled Starlark app: {app_id}")
return True
except Exception as e:
self.logger.error(f"Error uninstalling app {app_id}: {e}")
return False
def get_display_duration(self) -> float:
"""Get display duration for current app."""
if self.current_app:
return float(self.current_app.get_display_duration())
return self.config.get('display_duration', 15.0)
# ─── Vegas Mode Integration ──────────────────────────────────────
def get_vegas_content(self) -> Optional[List[Image.Image]]:
"""Return rendered frames from enabled starlark apps for vegas scroll."""
images = []
for app in self.apps.values():
if not app.is_enabled():
continue
# Use cached frames if available
if app.frames:
images.extend([frame for frame, delay in app.frames])
else:
# Try to render and extract frames
if self._render_app(app):
if app.frames:
images.extend([frame for frame, delay in app.frames])
return images if images else None
def get_vegas_content_type(self) -> str:
"""Indicate the type of content for Vegas scroll."""
return "multi"
def get_vegas_display_mode(self) -> VegasDisplayMode:
"""Get the display mode for Vegas scroll integration."""
return VegasDisplayMode.FIXED_SEGMENT
def get_info(self) -> Dict[str, Any]:
"""Return plugin info for web UI."""
info = super().get_info()
info.update({
'pixlet_available': self.pixlet.is_available(),
'pixlet_version': self.pixlet.get_version(),
'installed_apps': len(self.apps),
'enabled_apps': len([a for a in self.apps.values() if a.is_enabled()]),
'current_app': self.current_app.app_id if self.current_app else None,
'apps': {
app_id: {
'name': app.manifest.get('name', app_id),
'enabled': app.is_enabled(),
'has_frames': app.frames is not None
}
for app_id, app in self.apps.items()
}
})
return info

View File

@@ -0,0 +1,26 @@
{
"id": "starlark-apps",
"name": "Starlark Apps",
"version": "1.0.0",
"author": "LEDMatrix",
"description": "Manages and displays Starlark (.star) apps from Tronbyte/Tidbyt community. Import widgets seamlessly without modification.",
"entry_point": "manager.py",
"class_name": "StarlarkAppsPlugin",
"category": "system",
"tags": [
"starlark",
"widgets",
"tronbyte",
"tidbyt",
"apps",
"community"
],
"display_modes": [],
"update_interval": 60,
"default_duration": 15,
"dependencies": [
"Pillow>=10.0.0",
"PyYAML>=6.0",
"requests>=2.31.0"
]
}

View File

@@ -0,0 +1,599 @@
"""
Pixlet Renderer Module for Starlark Apps
Handles execution of Pixlet CLI to render .star files into WebP animations.
Supports bundled binaries and system-installed Pixlet.
"""
import json
import logging
import os
import platform
import re
import shutil
import subprocess
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, List
logger = logging.getLogger(__name__)
class PixletRenderer:
"""
Wrapper for Pixlet CLI rendering.
Handles:
- Auto-detection of bundled or system Pixlet binary
- Rendering .star files with configuration
- Schema extraction from .star files
- Timeout and error handling
"""
def __init__(self, pixlet_path: Optional[str] = None, timeout: int = 30):
"""
Initialize the Pixlet renderer.
Args:
pixlet_path: Optional explicit path to Pixlet binary
timeout: Maximum seconds to wait for rendering
"""
self.timeout = timeout
self.pixlet_binary = self._find_pixlet_binary(pixlet_path)
if self.pixlet_binary:
logger.info(f"Pixlet renderer initialized with binary: {self.pixlet_binary}")
else:
logger.warning("Pixlet binary not found - rendering will fail")
def _find_pixlet_binary(self, explicit_path: Optional[str] = None) -> Optional[str]:
"""
Find Pixlet binary using the following priority:
1. Explicit path provided
2. Bundled binary for current architecture
3. System PATH
Args:
explicit_path: User-specified path to Pixlet
Returns:
Path to Pixlet binary, or None if not found
"""
# 1. Check explicit path
if explicit_path and os.path.isfile(explicit_path):
if os.access(explicit_path, os.X_OK):
logger.debug(f"Using explicit Pixlet path: {explicit_path}")
return explicit_path
else:
logger.warning(f"Explicit Pixlet path not executable: {explicit_path}")
# 2. Check bundled binary
try:
bundled_path = self._get_bundled_binary_path()
if bundled_path and os.path.isfile(bundled_path):
# Ensure executable
if not os.access(bundled_path, os.X_OK):
try:
os.chmod(bundled_path, 0o755)
logger.debug(f"Made bundled binary executable: {bundled_path}")
except OSError:
logger.exception(f"Could not make bundled binary executable: {bundled_path}")
if os.access(bundled_path, os.X_OK):
logger.debug(f"Using bundled Pixlet binary: {bundled_path}")
return bundled_path
except OSError:
logger.exception("Could not locate bundled binary")
# 3. Check system PATH
system_pixlet = shutil.which("pixlet")
if system_pixlet:
logger.debug(f"Using system Pixlet: {system_pixlet}")
return system_pixlet
logger.error("Pixlet binary not found in any location")
return None
def _get_bundled_binary_path(self) -> Optional[str]:
"""
Get path to bundled Pixlet binary for current architecture.
Returns:
Path to bundled binary, or None if not found
"""
try:
# Determine project root (parent of plugin-repos)
current_dir = Path(__file__).resolve().parent
project_root = current_dir.parent.parent
bin_dir = project_root / "bin" / "pixlet"
# Detect architecture
system = platform.system().lower()
machine = platform.machine().lower()
# Map architecture to binary name
if system == "linux":
if "aarch64" in machine or "arm64" in machine:
binary_name = "pixlet-linux-arm64"
elif "x86_64" in machine or "amd64" in machine:
binary_name = "pixlet-linux-amd64"
else:
logger.warning(f"Unsupported Linux architecture: {machine}")
return None
elif system == "darwin":
if "arm64" in machine:
binary_name = "pixlet-darwin-arm64"
else:
binary_name = "pixlet-darwin-amd64"
elif system == "windows":
binary_name = "pixlet-windows-amd64.exe"
else:
logger.warning(f"Unsupported system: {system}")
return None
binary_path = bin_dir / binary_name
if binary_path.exists():
return str(binary_path)
logger.debug(f"Bundled binary not found at: {binary_path}")
return None
except OSError:
logger.exception("Error finding bundled binary")
return None
def _get_safe_working_directory(self, star_file: str) -> Optional[str]:
"""
Get a safe working directory for subprocess execution.
Args:
star_file: Path to .star file
Returns:
Resolved parent directory, or None if empty or invalid
"""
try:
resolved_parent = os.path.dirname(os.path.abspath(star_file))
# Return None if empty string to avoid FileNotFoundError
if not resolved_parent:
logger.debug(f"Empty parent directory for star_file: {star_file}")
return None
return resolved_parent
except (OSError, ValueError):
logger.debug(f"Could not resolve working directory for: {star_file}")
return None
def is_available(self) -> bool:
"""
Check if Pixlet is available and functional.
Returns:
True if Pixlet can be executed
"""
if not self.pixlet_binary:
return False
try:
result = subprocess.run(
[self.pixlet_binary, "version"],
capture_output=True,
text=True,
timeout=5
)
return result.returncode == 0
except subprocess.TimeoutExpired:
logger.debug("Pixlet version check timed out")
return False
except (subprocess.SubprocessError, OSError):
logger.exception("Pixlet not available")
return False
def get_version(self) -> Optional[str]:
"""
Get Pixlet version string.
Returns:
Version string, or None if unavailable
"""
if not self.pixlet_binary:
return None
try:
result = subprocess.run(
[self.pixlet_binary, "version"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
return result.stdout.strip()
except subprocess.TimeoutExpired:
logger.debug("Pixlet version check timed out")
except (subprocess.SubprocessError, OSError):
logger.exception("Could not get Pixlet version")
return None
def render(
self,
star_file: str,
output_path: str,
config: Optional[Dict[str, Any]] = None,
magnify: int = 1
) -> Tuple[bool, Optional[str]]:
"""
Render a .star file to WebP output.
Args:
star_file: Path to .star file
output_path: Where to save WebP output
config: Configuration dictionary to pass to app
magnify: Magnification factor (default 1)
Returns:
Tuple of (success: bool, error_message: Optional[str])
"""
if not self.pixlet_binary:
return False, "Pixlet binary not found"
if not os.path.isfile(star_file):
return False, f"Star file not found: {star_file}"
try:
# Build command
cmd = [
self.pixlet_binary,
"render",
star_file,
"-o", output_path,
"-m", str(magnify)
]
# Add configuration parameters
if config:
for key, value in config.items():
# Validate key format (alphanumeric + underscore only)
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', key):
logger.warning(f"Skipping invalid config key: {key}")
continue
# Convert value to string for CLI
if isinstance(value, bool):
value_str = "true" if value else "false"
else:
value_str = str(value)
# Validate value doesn't contain shell metacharacters
# Allow alphanumeric, spaces, and common safe chars: .-_:/@#,
if not re.match(r'^[a-zA-Z0-9 .\-_:/@#,{}"\[\]]*$', value_str):
logger.warning(f"Skipping config value with unsafe characters for key {key}: {value_str}")
continue
cmd.extend(["-c", f"{key}={value_str}"])
logger.debug(f"Executing Pixlet: {' '.join(cmd)}")
# Execute rendering
safe_cwd = self._get_safe_working_directory(star_file)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.timeout,
cwd=safe_cwd # Run in .star file directory (or None if relative path)
)
if result.returncode == 0:
if os.path.isfile(output_path):
logger.debug(f"Successfully rendered: {star_file} -> {output_path}")
return True, None
else:
error = "Rendering succeeded but output file not found"
logger.error(error)
return False, error
else:
error = f"Pixlet failed (exit {result.returncode}): {result.stderr}"
logger.error(error)
return False, error
except subprocess.TimeoutExpired:
error = f"Rendering timeout after {self.timeout}s"
logger.error(error)
return False, error
except (subprocess.SubprocessError, OSError):
logger.exception("Rendering exception")
return False, "Rendering failed - see logs for details"
def extract_schema(self, star_file: str) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
"""
Extract configuration schema from a .star file by parsing source code.
Supports:
- Static field definitions (location, text, toggle, dropdown, color, datetime)
- Variable-referenced dropdown options
- Graceful degradation for unsupported field types
Args:
star_file: Path to .star file
Returns:
Tuple of (success: bool, schema: Optional[Dict], error: Optional[str])
"""
if not os.path.isfile(star_file):
return False, None, f"Star file not found: {star_file}"
try:
# Read .star file
with open(star_file, 'r', encoding='utf-8') as f:
content = f.read()
# Parse schema from source
schema = self._parse_schema_from_source(content, star_file)
if schema:
field_count = len(schema.get('schema', []))
logger.debug(f"Extracted schema with {field_count} field(s) from: {star_file}")
return True, schema, None
else:
# No schema found - not an error, app just doesn't have configuration
logger.debug(f"No schema found in: {star_file}")
return True, None, None
except UnicodeDecodeError as e:
error = f"File encoding error: {e}"
logger.warning(error)
return False, None, error
except Exception as e:
logger.exception(f"Schema extraction failed for {star_file}")
return False, None, f"Schema extraction error: {str(e)}"
def _parse_schema_from_source(self, content: str, file_path: str) -> Optional[Dict[str, Any]]:
"""
Parse get_schema() function from Starlark source code.
Args:
content: .star file content
file_path: Path to file (for logging)
Returns:
Schema dict with format {"version": "1", "schema": [...]}, or None
"""
# Extract variable definitions (for dropdown options)
var_table = self._extract_variable_definitions(content)
# Extract get_schema() function body
schema_body = self._extract_get_schema_body(content)
if not schema_body:
return None
# Extract version
version_match = re.search(r'version\s*=\s*"([^"]+)"', schema_body)
version = version_match.group(1) if version_match else "1"
# Extract fields array from schema.Schema(...) - handle nested brackets
fields_start_match = re.search(r'fields\s*=\s*\[', schema_body)
if not fields_start_match:
# Empty schema or no fields
return {"version": version, "schema": []}
# Find matching closing bracket
bracket_count = 1
i = fields_start_match.end()
while i < len(schema_body) and bracket_count > 0:
if schema_body[i] == '[':
bracket_count += 1
elif schema_body[i] == ']':
bracket_count -= 1
i += 1
if bracket_count != 0:
# Unmatched brackets
return {"version": version, "schema": []}
fields_text = schema_body[fields_start_match.end():i-1]
# Parse individual fields
schema_fields = []
# Match schema.FieldType(...) patterns
field_pattern = r'schema\.(\w+)\s*\((.*?)\)'
# Find all field definitions (handle nested parentheses)
pos = 0
while pos < len(fields_text):
match = re.search(field_pattern, fields_text[pos:], re.DOTALL)
if not match:
break
field_type = match.group(1)
field_start = pos + match.start()
field_end = pos + match.end()
# Handle nested parentheses properly
paren_count = 1
i = pos + match.start() + len(f'schema.{field_type}(')
while i < len(fields_text) and paren_count > 0:
if fields_text[i] == '(':
paren_count += 1
elif fields_text[i] == ')':
paren_count -= 1
i += 1
field_params_text = fields_text[pos + match.start() + len(f'schema.{field_type}('):i-1]
# Parse field
field_dict = self._parse_schema_field(field_type, field_params_text, var_table)
if field_dict:
schema_fields.append(field_dict)
pos = i
return {
"version": version,
"schema": schema_fields
}
def _extract_variable_definitions(self, content: str) -> Dict[str, List[Dict]]:
"""
Extract top-level variable assignments (for dropdown options).
Args:
content: .star file content
Returns:
Dict mapping variable names to their option lists
"""
var_table = {}
# Find variable definitions like: variableName = [schema.Option(...), ...]
var_pattern = r'^(\w+)\s*=\s*\[(.*?schema\.Option.*?)\]'
matches = re.finditer(var_pattern, content, re.MULTILINE | re.DOTALL)
for match in matches:
var_name = match.group(1)
options_text = match.group(2)
# Parse schema.Option entries
options = self._parse_schema_options(options_text, {})
if options:
var_table[var_name] = options
return var_table
def _extract_get_schema_body(self, content: str) -> Optional[str]:
"""
Extract get_schema() function body.
Args:
content: .star file content
Returns:
Function body text, or None if not found
"""
# Find def get_schema():
pattern = r'def\s+get_schema\s*\(\s*\)\s*:(.*?)(?=\ndef\s|\Z)'
match = re.search(pattern, content, re.DOTALL)
if match:
return match.group(1)
return None
def _parse_schema_field(self, field_type: str, params_text: str, var_table: Dict) -> Optional[Dict[str, Any]]:
"""
Parse individual schema field definition.
Args:
field_type: Field type (Location, Text, Toggle, etc.)
params_text: Field parameters text
var_table: Variable lookup table
Returns:
Field dict, or None if parse fails
"""
# Map Pixlet field types to JSON typeOf
type_mapping = {
'Location': 'location',
'Text': 'text',
'Toggle': 'toggle',
'Dropdown': 'dropdown',
'Color': 'color',
'DateTime': 'datetime',
'OAuth2': 'oauth2',
'PhotoSelect': 'photo_select',
'LocationBased': 'location_based',
'Typeahead': 'typeahead',
'Generated': 'generated',
}
type_of = type_mapping.get(field_type, field_type.lower())
# Skip Generated fields (invisible meta-fields)
if type_of == 'generated':
return None
field_dict = {"typeOf": type_of}
# Extract common parameters
# id
id_match = re.search(r'id\s*=\s*"([^"]+)"', params_text)
if id_match:
field_dict['id'] = id_match.group(1)
else:
# id is required, skip field if missing
return None
# name
name_match = re.search(r'name\s*=\s*"([^"]+)"', params_text)
if name_match:
field_dict['name'] = name_match.group(1)
# desc
desc_match = re.search(r'desc\s*=\s*"([^"]+)"', params_text)
if desc_match:
field_dict['desc'] = desc_match.group(1)
# icon
icon_match = re.search(r'icon\s*=\s*"([^"]+)"', params_text)
if icon_match:
field_dict['icon'] = icon_match.group(1)
# default (can be string, bool, or variable reference)
default_match = re.search(r'default\s*=\s*([^,\)]+)', params_text)
if default_match:
default_value = default_match.group(1).strip()
# Handle boolean
if default_value in ('True', 'False'):
field_dict['default'] = default_value.lower()
# Handle string literal
elif default_value.startswith('"') and default_value.endswith('"'):
field_dict['default'] = default_value.strip('"')
# Handle variable reference (can't resolve, use as-is)
else:
# Try to extract just the value if it's like options[0].value
if '.' in default_value or '[' in default_value:
# Complex expression, skip default
pass
else:
field_dict['default'] = default_value
# For dropdown, extract options
if type_of == 'dropdown':
options_match = re.search(r'options\s*=\s*([^,\)]+)', params_text)
if options_match:
options_ref = options_match.group(1).strip()
# Check if it's a variable reference
if options_ref in var_table:
field_dict['options'] = var_table[options_ref]
# Or inline options
elif options_ref.startswith('['):
# Find the full options array (handle nested brackets)
# This is tricky, for now try to extract inline options
inline_match = re.search(r'options\s*=\s*(\[.*?\])', params_text, re.DOTALL)
if inline_match:
options_text = inline_match.group(1)
field_dict['options'] = self._parse_schema_options(options_text, var_table)
return field_dict
def _parse_schema_options(self, options_text: str, var_table: Dict) -> List[Dict[str, str]]:
"""
Parse schema.Option list.
Args:
options_text: Text containing schema.Option(...) entries
var_table: Variable lookup table (not currently used)
Returns:
List of {"display": "...", "value": "..."} dicts
"""
options = []
# Match schema.Option(display = "...", value = "...")
option_pattern = r'schema\.Option\s*\(\s*display\s*=\s*"([^"]+)"\s*,\s*value\s*=\s*"([^"]+)"\s*\)'
matches = re.finditer(option_pattern, options_text)
for match in matches:
options.append({
"display": match.group(1),
"value": match.group(2)
})
return options

View File

@@ -0,0 +1,3 @@
Pillow>=10.0.0
PyYAML>=6.0
requests>=2.31.0

View File

@@ -0,0 +1,472 @@
"""
Tronbyte Repository Module
Handles interaction with the Tronbyte apps repository on GitHub.
Fetches app listings, metadata, and downloads .star files.
"""
import logging
import time
import requests
import yaml
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
logger = logging.getLogger(__name__)
# Module-level cache for bulk app listing (survives across requests)
_apps_cache = {'data': None, 'timestamp': 0, 'categories': [], 'authors': []}
_CACHE_TTL = 7200 # 2 hours
class TronbyteRepository:
"""
Interface to the Tronbyte apps repository.
Provides methods to:
- List available apps
- Fetch app metadata
- Download .star files
- Parse manifest.yaml files
"""
REPO_OWNER = "tronbyt"
REPO_NAME = "apps"
DEFAULT_BRANCH = "main"
APPS_PATH = "apps"
def __init__(self, github_token: Optional[str] = None):
"""
Initialize repository interface.
Args:
github_token: Optional GitHub personal access token for higher rate limits
"""
self.github_token = github_token
self.base_url = "https://api.github.com"
self.raw_url = "https://raw.githubusercontent.com"
self.session = requests.Session()
if github_token:
self.session.headers.update({
'Authorization': f'token {github_token}'
})
self.session.headers.update({
'Accept': 'application/vnd.github.v3+json',
'User-Agent': 'LEDMatrix-Starlark-Plugin'
})
def _make_request(self, url: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
"""
Make a request to GitHub API with error handling.
Args:
url: API URL to request
timeout: Request timeout in seconds
Returns:
JSON response or None on error
"""
try:
response = self.session.get(url, timeout=timeout)
if response.status_code == 403:
# Rate limit exceeded
logger.warning("GitHub API rate limit exceeded")
return None
elif response.status_code == 404:
logger.warning(f"Resource not found: {url}")
return None
elif response.status_code != 200:
logger.error(f"GitHub API error: {response.status_code}")
return None
return response.json()
except requests.Timeout:
logger.error(f"Request timeout: {url}")
return None
except requests.RequestException as e:
logger.error(f"Request error: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error: {e}")
return None
def _fetch_raw_file(self, file_path: str, branch: Optional[str] = None) -> Optional[str]:
"""
Fetch raw file content from repository.
Args:
file_path: Path to file in repository
branch: Branch name (default: DEFAULT_BRANCH)
Returns:
File content as string, or None on error
"""
branch = branch or self.DEFAULT_BRANCH
url = f"{self.raw_url}/{self.REPO_OWNER}/{self.REPO_NAME}/{branch}/{file_path}"
try:
response = self.session.get(url, timeout=10)
if response.status_code == 200:
return response.text
else:
logger.warning(f"Failed to fetch raw file: {file_path} ({response.status_code})")
return None
except Exception as e:
logger.error(f"Error fetching raw file {file_path}: {e}")
return None
def list_apps(self) -> Tuple[bool, Optional[List[Dict[str, Any]]], Optional[str]]:
"""
List all available apps in the repository.
Returns:
Tuple of (success, apps_list, error_message)
"""
url = f"{self.base_url}/repos/{self.REPO_OWNER}/{self.REPO_NAME}/contents/{self.APPS_PATH}"
data = self._make_request(url)
if data is None:
return False, None, "Failed to fetch repository contents"
if not isinstance(data, list):
return False, None, "Invalid response format"
# Filter directories (apps)
apps = []
for item in data:
if item.get('type') == 'dir':
app_id = item.get('name')
if app_id and not app_id.startswith('.'):
apps.append({
'id': app_id,
'path': item.get('path'),
'url': item.get('url')
})
logger.info(f"Found {len(apps)} apps in repository")
return True, apps, None
def get_app_metadata(self, app_id: str) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
"""
Fetch metadata for a specific app.
Reads the manifest.yaml file for the app and parses it.
Args:
app_id: App identifier
Returns:
Tuple of (success, metadata_dict, error_message)
"""
manifest_path = f"{self.APPS_PATH}/{app_id}/manifest.yaml"
content = self._fetch_raw_file(manifest_path)
if not content:
return False, None, f"Failed to fetch manifest for {app_id}"
try:
metadata = yaml.safe_load(content)
# Validate that metadata is a dict before mutating
if not isinstance(metadata, dict):
if metadata is None:
logger.warning(f"Manifest for {app_id} is empty or None, initializing empty dict")
metadata = {}
else:
logger.error(f"Manifest for {app_id} is not a dict (got {type(metadata).__name__}), skipping")
return False, None, f"Invalid manifest format: expected dict, got {type(metadata).__name__}"
# Enhance with app_id
metadata['id'] = app_id
# Parse schema if present
if 'schema' in metadata:
# Schema is already parsed from YAML
pass
return True, metadata, None
except (yaml.YAMLError, TypeError) as e:
logger.error(f"Failed to parse manifest for {app_id}: {e}")
return False, None, f"Invalid manifest format: {e}"
def list_apps_with_metadata(self, max_apps: Optional[int] = None) -> List[Dict[str, Any]]:
"""
List all apps with their metadata.
This is slower as it fetches manifest.yaml for each app.
Args:
max_apps: Optional limit on number of apps to fetch
Returns:
List of app metadata dictionaries
"""
success, apps, error = self.list_apps()
if not success:
logger.error(f"Failed to list apps: {error}")
return []
if max_apps is not None:
apps = apps[:max_apps]
apps_with_metadata = []
for app_info in apps:
app_id = app_info['id']
success, metadata, error = self.get_app_metadata(app_id)
if success and metadata:
# Merge basic info with metadata
metadata.update({
'repository_path': app_info['path']
})
apps_with_metadata.append(metadata)
else:
# Add basic info even if metadata fetch failed
apps_with_metadata.append({
'id': app_id,
'name': app_id.replace('_', ' ').title(),
'summary': 'No description available',
'repository_path': app_info['path'],
'metadata_error': error
})
return apps_with_metadata
def list_all_apps_cached(self) -> Dict[str, Any]:
"""
Fetch ALL apps with metadata, using a module-level cache.
On first call (or after cache TTL expires), fetches the directory listing
via the GitHub API (1 call) then fetches all manifests in parallel via
raw.githubusercontent.com (not rate-limited). Results are cached for 2 hours.
Returns:
Dict with keys: apps, categories, authors, count, cached
"""
global _apps_cache
now = time.time()
if _apps_cache['data'] is not None and (now - _apps_cache['timestamp']) < _CACHE_TTL:
return {
'apps': _apps_cache['data'],
'categories': _apps_cache['categories'],
'authors': _apps_cache['authors'],
'count': len(_apps_cache['data']),
'cached': True
}
# Fetch directory listing (1 GitHub API call)
success, app_dirs, error = self.list_apps()
if not success or not app_dirs:
logger.error(f"Failed to list apps for bulk fetch: {error}")
return {'apps': [], 'categories': [], 'authors': [], 'count': 0, 'cached': False}
logger.info(f"Bulk-fetching manifests for {len(app_dirs)} apps...")
def fetch_one(app_info):
"""Fetch a single app's manifest (runs in thread pool)."""
app_id = app_info['id']
manifest_path = f"{self.APPS_PATH}/{app_id}/manifest.yaml"
content = self._fetch_raw_file(manifest_path)
if content:
try:
metadata = yaml.safe_load(content)
if not isinstance(metadata, dict):
metadata = {}
metadata['id'] = app_id
metadata['repository_path'] = app_info.get('path', '')
return metadata
except (yaml.YAMLError, TypeError):
pass
# Fallback: minimal entry
return {
'id': app_id,
'name': app_id.replace('_', ' ').replace('-', ' ').title(),
'summary': 'No description available',
'repository_path': app_info.get('path', ''),
}
# Parallel manifest fetches via raw.githubusercontent.com (high rate limit)
apps_with_metadata = []
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {executor.submit(fetch_one, info): info for info in app_dirs}
for future in as_completed(futures):
try:
result = future.result(timeout=30)
if result:
apps_with_metadata.append(result)
except Exception as e:
app_info = futures[future]
logger.warning(f"Failed to fetch manifest for {app_info['id']}: {e}")
apps_with_metadata.append({
'id': app_info['id'],
'name': app_info['id'].replace('_', ' ').replace('-', ' ').title(),
'summary': 'No description available',
'repository_path': app_info.get('path', ''),
})
# Sort by name for consistent ordering
apps_with_metadata.sort(key=lambda a: (a.get('name') or a.get('id', '')).lower())
# Extract unique categories and authors
categories = sorted({a.get('category', '') for a in apps_with_metadata if a.get('category')})
authors = sorted({a.get('author', '') for a in apps_with_metadata if a.get('author')})
# Update cache
_apps_cache['data'] = apps_with_metadata
_apps_cache['timestamp'] = now
_apps_cache['categories'] = categories
_apps_cache['authors'] = authors
logger.info(f"Cached {len(apps_with_metadata)} apps ({len(categories)} categories, {len(authors)} authors)")
return {
'apps': apps_with_metadata,
'categories': categories,
'authors': authors,
'count': len(apps_with_metadata),
'cached': False
}
def download_star_file(self, app_id: str, output_path: Path, filename: Optional[str] = None) -> Tuple[bool, Optional[str]]:
"""
Download the .star file for an app.
Args:
app_id: App identifier (directory name)
output_path: Where to save the .star file
filename: Optional specific filename from manifest (e.g., "analog_clock.star")
If not provided, assumes {app_id}.star
Returns:
Tuple of (success, error_message)
"""
# Use provided filename or fall back to app_id.star
star_filename = filename or f"{app_id}.star"
star_path = f"{self.APPS_PATH}/{app_id}/{star_filename}"
content = self._fetch_raw_file(star_path)
if not content:
return False, f"Failed to download .star file for {app_id} (tried {star_filename})"
try:
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(content)
logger.info(f"Downloaded {app_id}.star to {output_path}")
return True, None
except OSError as e:
logger.exception(f"Failed to save .star file: {e}")
return False, f"Failed to save file: {e}"
def get_app_files(self, app_id: str) -> Tuple[bool, Optional[List[str]], Optional[str]]:
"""
List all files in an app directory.
Args:
app_id: App identifier
Returns:
Tuple of (success, file_list, error_message)
"""
url = f"{self.base_url}/repos/{self.REPO_OWNER}/{self.REPO_NAME}/contents/{self.APPS_PATH}/{app_id}"
data = self._make_request(url)
if not data:
return False, None, "Failed to fetch app files"
if not isinstance(data, list):
return False, None, "Invalid response format"
files = [item['name'] for item in data if item.get('type') == 'file']
return True, files, None
def search_apps(self, query: str, apps_with_metadata: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Search apps by name, summary, or description.
Args:
query: Search query string
apps_with_metadata: List of apps with metadata
Returns:
Filtered list of apps matching query
"""
if not query:
return apps_with_metadata
query_lower = query.lower()
results = []
for app in apps_with_metadata:
# Search in name, summary, description, author
searchable = ' '.join([
app.get('name', ''),
app.get('summary', ''),
app.get('desc', ''),
app.get('author', ''),
app.get('id', '')
]).lower()
if query_lower in searchable:
results.append(app)
return results
def filter_by_category(self, category: str, apps_with_metadata: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filter apps by category.
Args:
category: Category name (or 'all' for no filtering)
apps_with_metadata: List of apps with metadata
Returns:
Filtered list of apps
"""
if not category or category.lower() == 'all':
return apps_with_metadata
category_lower = category.lower()
results = []
for app in apps_with_metadata:
app_category = app.get('category', '').lower()
if app_category == category_lower:
results.append(app)
return results
def get_rate_limit_info(self) -> Dict[str, Any]:
"""
Get current GitHub API rate limit information.
Returns:
Dictionary with rate limit info
"""
url = f"{self.base_url}/rate_limit"
data = self._make_request(url)
if data:
core = data.get('resources', {}).get('core', {})
return {
'limit': core.get('limit', 0),
'remaining': core.get('remaining', 0),
'reset': core.get('reset', 0),
'used': core.get('used', 0)
}
return {
'limit': 0,
'remaining': 0,
'reset': 0,
'used': 0
}

139
scripts/download_pixlet.sh Executable file
View File

@@ -0,0 +1,139 @@
#!/bin/bash
#
# Download Pixlet binaries for bundled distribution
#
# This script downloads Pixlet binaries from the Tronbyte fork
# for multiple architectures to support various platforms.
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
BIN_DIR="$PROJECT_ROOT/bin/pixlet"
# Pixlet version to download (use 'latest' to auto-detect)
PIXLET_VERSION="${PIXLET_VERSION:-latest}"
# GitHub repository (Tronbyte fork)
REPO="tronbyt/pixlet"
echo "========================================"
echo "Pixlet Binary Download Script"
echo "========================================"
# Auto-detect latest version if needed
if [ "$PIXLET_VERSION" = "latest" ]; then
echo "Detecting latest version..."
PIXLET_VERSION=$(curl -s "https://api.github.com/repos/${REPO}/releases/latest" | grep '"tag_name"' | sed -E 's/.*"([^"]+)".*/\1/')
if [ -z "$PIXLET_VERSION" ]; then
echo "Failed to detect latest version, using fallback"
PIXLET_VERSION="v0.50.2"
fi
fi
echo "Version: $PIXLET_VERSION"
echo "Target directory: $BIN_DIR"
echo ""
# Create bin directory if it doesn't exist
mkdir -p "$BIN_DIR"
# New naming convention: pixlet_v0.50.2_linux-arm64.tar.gz
# Only download ARM64 Linux binary for Raspberry Pi
declare -A ARCHITECTURES=(
["linux-arm64"]="pixlet_${PIXLET_VERSION}_linux-arm64.tar.gz"
)
download_binary() {
local arch="$1"
local archive_name="$2"
local binary_name="pixlet-${arch}"
local output_path="$BIN_DIR/$binary_name"
# Skip if already exists
if [ -f "$output_path" ]; then
echo "$binary_name already exists, skipping..."
return 0
fi
echo "→ Downloading $arch..."
# Construct download URL
local url="https://github.com/${REPO}/releases/download/${PIXLET_VERSION}/${archive_name}"
# Download to temp directory (use project-local temp to avoid /tmp permission issues)
local temp_dir
temp_dir=$(mktemp -d -p "$PROJECT_ROOT" -t pixlet_download.XXXXXXXXXX)
local temp_file="$temp_dir/$archive_name"
if ! curl -L -o "$temp_file" "$url" 2>/dev/null; then
echo "✗ Failed to download $arch"
rm -rf "$temp_dir"
return 1
fi
# Extract binary
echo " Extracting..."
if ! tar -xzf "$temp_file" -C "$temp_dir"; then
echo "✗ Failed to extract archive: $temp_file"
rm -rf "$temp_dir"
return 1
fi
# Find the pixlet binary in extracted files
local extracted_binary
extracted_binary=$(find "$temp_dir" -name "pixlet" | head -n 1)
if [ -z "$extracted_binary" ]; then
echo "✗ Binary not found in archive"
rm -rf "$temp_dir"
return 1
fi
# Move to final location
mv "$extracted_binary" "$output_path"
# Make executable
chmod +x "$output_path"
# Clean up
rm -rf "$temp_dir"
# Verify
local size
size=$(stat -f%z "$output_path" 2>/dev/null || stat -c%s "$output_path" 2>/dev/null || echo "unknown")
if [ "$size" = "unknown" ]; then
echo "✓ Downloaded $binary_name"
else
echo "✓ Downloaded $binary_name ($(numfmt --to=iec-i --suffix=B $size 2>/dev/null || echo "${size} bytes"))"
fi
return 0
}
# Download binaries for each architecture
success_count=0
total_count=${#ARCHITECTURES[@]}
for arch in "${!ARCHITECTURES[@]}"; do
if download_binary "$arch" "${ARCHITECTURES[$arch]}"; then
((success_count++))
fi
done
echo ""
echo "========================================"
echo "Download complete: $success_count/$total_count succeeded"
echo "========================================"
# List downloaded binaries
echo ""
echo "Installed binaries:"
if compgen -G "$BIN_DIR/*" > /dev/null 2>&1; then
ls -lh "$BIN_DIR"/*
else
echo "No binaries found"
fi
exit 0

View File

@@ -1852,6 +1852,55 @@ def get_installed_plugins():
'vegas_content_type': vegas_content_type
})
# Append virtual entries for installed Starlark apps
starlark_plugin = _get_starlark_plugin()
if starlark_plugin and hasattr(starlark_plugin, 'apps'):
for app_id, app in starlark_plugin.apps.items():
plugins.append({
'id': f'starlark:{app_id}',
'name': app.manifest.get('name', app_id),
'version': 'starlark',
'author': app.manifest.get('author', 'Tronbyte Community'),
'category': 'Starlark App',
'description': app.manifest.get('summary', 'Starlark app'),
'tags': ['starlark'],
'enabled': app.is_enabled(),
'verified': False,
'loaded': True,
'last_updated': None,
'last_commit': None,
'last_commit_message': None,
'branch': None,
'web_ui_actions': [],
'vegas_mode': 'fixed',
'vegas_content_type': 'multi',
'is_starlark_app': True,
})
else:
# Standalone: read from manifest on disk
manifest = _read_starlark_manifest()
for app_id, app_data in manifest.get('apps', {}).items():
plugins.append({
'id': f'starlark:{app_id}',
'name': app_data.get('name', app_id),
'version': 'starlark',
'author': 'Tronbyte Community',
'category': 'Starlark App',
'description': 'Starlark app',
'tags': ['starlark'],
'enabled': app_data.get('enabled', True),
'verified': False,
'loaded': False,
'last_updated': None,
'last_commit': None,
'last_commit_message': None,
'branch': None,
'web_ui_actions': [],
'vegas_mode': 'fixed',
'vegas_content_type': 'multi',
'is_starlark_app': True,
})
return jsonify({'status': 'success', 'data': {'plugins': plugins}})
except Exception as e:
import traceback
@@ -2127,6 +2176,28 @@ def toggle_plugin():
current_enabled = config.get(plugin_id, {}).get('enabled', False)
enabled = not current_enabled
# Handle starlark app toggle (starlark:<app_id> prefix)
if plugin_id.startswith('starlark:'):
starlark_app_id = plugin_id[len('starlark:'):]
starlark_plugin = _get_starlark_plugin()
if starlark_plugin and starlark_app_id in starlark_plugin.apps:
app = starlark_plugin.apps[starlark_app_id]
app.manifest['enabled'] = enabled
with open(starlark_plugin.manifest_file, 'r') as f:
manifest = json.load(f)
manifest['apps'][starlark_app_id]['enabled'] = enabled
starlark_plugin._save_manifest(manifest)
else:
# Standalone: update manifest directly
manifest = _read_starlark_manifest()
app_data = manifest.get('apps', {}).get(starlark_app_id)
if not app_data:
return jsonify({'status': 'error', 'message': f'Starlark app not found: {starlark_app_id}'}), 404
app_data['enabled'] = enabled
if not _write_starlark_manifest(manifest):
return jsonify({'status': 'error', 'message': 'Failed to save manifest'}), 500
return jsonify({'status': 'success', 'message': f"Starlark app {'enabled' if enabled else 'disabled'}", 'enabled': enabled})
# Check if plugin exists in manifests (discovered but may not be loaded)
if plugin_id not in api_v3.plugin_manager.plugin_manifests:
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404
@@ -6903,4 +6974,732 @@ def clear_old_errors():
message="Failed to clear old errors",
details=str(e),
status_code=500
)
)
# ─── Starlark Apps API ──────────────────────────────────────────────────────
def _get_tronbyte_repository_class():
"""Import TronbyteRepository from plugin-repos directory."""
import importlib.util
import importlib
module_path = PROJECT_ROOT / 'plugin-repos' / 'starlark-apps' / 'tronbyte_repository.py'
if not module_path.exists():
raise ImportError(f"TronbyteRepository module not found at {module_path}")
# If already imported, reload to pick up code changes
if "tronbyte_repository" in sys.modules:
importlib.reload(sys.modules["tronbyte_repository"])
return sys.modules["tronbyte_repository"].TronbyteRepository
spec = importlib.util.spec_from_file_location("tronbyte_repository", module_path)
module = importlib.util.module_from_spec(spec)
sys.modules["tronbyte_repository"] = module
spec.loader.exec_module(module)
return module.TronbyteRepository
def _get_pixlet_renderer_class():
"""Import PixletRenderer from plugin-repos directory."""
import importlib.util
import importlib
module_path = PROJECT_ROOT / 'plugin-repos' / 'starlark-apps' / 'pixlet_renderer.py'
if not module_path.exists():
raise ImportError(f"PixletRenderer module not found at {module_path}")
# If already imported, reload to pick up code changes
if "pixlet_renderer" in sys.modules:
importlib.reload(sys.modules["pixlet_renderer"])
return sys.modules["pixlet_renderer"].PixletRenderer
spec = importlib.util.spec_from_file_location("pixlet_renderer", module_path)
module = importlib.util.module_from_spec(spec)
sys.modules["pixlet_renderer"] = module
spec.loader.exec_module(module)
return module.PixletRenderer
def _validate_and_sanitize_app_id(app_id, fallback_source=None):
"""Validate and sanitize app_id to a safe slug."""
if not app_id and fallback_source:
app_id = fallback_source
if not app_id:
return None, "app_id is required"
if '..' in app_id or '/' in app_id or '\\' in app_id:
return None, "app_id contains invalid characters"
sanitized = re.sub(r'[^a-z0-9_]', '_', app_id.lower()).strip('_')
if not sanitized:
sanitized = f"app_{hashlib.sha256(app_id.encode()).hexdigest()[:12]}"
if sanitized[0].isdigit():
sanitized = f"app_{sanitized}"
return sanitized, None
def _validate_timing_value(value, field_name, min_val=1, max_val=86400):
"""Validate and coerce timing values."""
if value is None:
return None, None
try:
int_value = int(value)
except (ValueError, TypeError):
return None, f"{field_name} must be an integer"
if int_value < min_val:
return None, f"{field_name} must be at least {min_val}"
if int_value > max_val:
return None, f"{field_name} must be at most {max_val}"
return int_value, None
def _get_starlark_plugin():
"""Get the starlark-apps plugin instance, or None."""
if not api_v3.plugin_manager:
return None
return api_v3.plugin_manager.get_plugin('starlark-apps')
# Starlark standalone helpers for web service (plugin not loaded)
_STARLARK_APPS_DIR = PROJECT_ROOT / 'starlark-apps'
_STARLARK_MANIFEST_FILE = _STARLARK_APPS_DIR / 'manifest.json'
def _read_starlark_manifest() -> dict:
"""Read the starlark-apps manifest.json directly from disk."""
try:
if _STARLARK_MANIFEST_FILE.exists():
with open(_STARLARK_MANIFEST_FILE, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, OSError) as e:
logger.error(f"Error reading starlark manifest: {e}")
return {'apps': {}}
def _write_starlark_manifest(manifest: dict) -> bool:
"""Write the starlark-apps manifest.json to disk."""
try:
_STARLARK_APPS_DIR.mkdir(parents=True, exist_ok=True)
with open(_STARLARK_MANIFEST_FILE, 'w') as f:
json.dump(manifest, f, indent=2)
return True
except OSError as e:
logger.error(f"Error writing starlark manifest: {e}")
return False
def _install_star_file(app_id: str, star_file_path: str, metadata: dict) -> bool:
"""Install a .star file and update the manifest (standalone, no plugin needed)."""
import shutil
import json
app_dir = _STARLARK_APPS_DIR / app_id
app_dir.mkdir(parents=True, exist_ok=True)
dest = app_dir / f"{app_id}.star"
shutil.copy2(star_file_path, str(dest))
# Try to extract schema using PixletRenderer
try:
PixletRenderer = _get_pixlet_renderer_class()
pixlet = PixletRenderer()
if pixlet.is_available():
_, schema, _ = pixlet.extract_schema(str(dest))
if schema:
schema_path = app_dir / "schema.json"
with open(schema_path, 'w') as f:
json.dump(schema, f, indent=2)
logger.info(f"Extracted schema for {app_id}")
except Exception as e:
logger.warning(f"Failed to extract schema for {app_id}: {e}")
manifest = _read_starlark_manifest()
manifest.setdefault('apps', {})[app_id] = {
'name': metadata.get('name', app_id),
'enabled': True,
'render_interval': metadata.get('render_interval', 300),
'display_duration': metadata.get('display_duration', 15),
'config': metadata.get('config', {}),
'star_file': str(dest),
}
return _write_starlark_manifest(manifest)
@api_v3.route('/starlark/status', methods=['GET'])
def get_starlark_status():
"""Get Starlark plugin status and Pixlet availability."""
try:
if not api_v3.plugin_manager:
return jsonify({'status': 'error', 'message': 'Plugin manager not initialized', 'pixlet_available': False}), 500
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
info = starlark_plugin.get_info()
magnify_info = starlark_plugin.get_magnify_recommendation()
return jsonify({
'status': 'success',
'pixlet_available': info.get('pixlet_available', False),
'pixlet_version': info.get('pixlet_version'),
'installed_apps': info.get('installed_apps', 0),
'enabled_apps': info.get('enabled_apps', 0),
'current_app': info.get('current_app'),
'plugin_enabled': starlark_plugin.enabled,
'display_info': magnify_info
})
# Plugin not loaded - check Pixlet availability directly
import shutil
import platform
system = platform.system().lower()
machine = platform.machine().lower()
bin_dir = PROJECT_ROOT / 'bin' / 'pixlet'
pixlet_binary = None
if system == "linux":
if "aarch64" in machine or "arm64" in machine:
pixlet_binary = bin_dir / "pixlet-linux-arm64"
elif "x86_64" in machine or "amd64" in machine:
pixlet_binary = bin_dir / "pixlet-linux-amd64"
elif system == "darwin":
pixlet_binary = bin_dir / ("pixlet-darwin-arm64" if "arm64" in machine else "pixlet-darwin-amd64")
pixlet_available = (pixlet_binary and pixlet_binary.exists()) or shutil.which('pixlet') is not None
# Read app counts from manifest
manifest = _read_starlark_manifest()
apps = manifest.get('apps', {})
installed_count = len(apps)
enabled_count = sum(1 for a in apps.values() if a.get('enabled', True))
return jsonify({
'status': 'success',
'pixlet_available': pixlet_available,
'pixlet_version': None,
'installed_apps': installed_count,
'enabled_apps': enabled_count,
'plugin_enabled': True,
'plugin_loaded': False,
'display_info': {}
})
except Exception as e:
logger.error(f"Error getting starlark status: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/apps', methods=['GET'])
def get_starlark_apps():
"""List all installed Starlark apps."""
try:
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
apps_list = []
for app_id, app_instance in starlark_plugin.apps.items():
apps_list.append({
'id': app_id,
'name': app_instance.manifest.get('name', app_id),
'enabled': app_instance.is_enabled(),
'has_frames': app_instance.frames is not None,
'render_interval': app_instance.get_render_interval(),
'display_duration': app_instance.get_display_duration(),
'config': app_instance.config,
'has_schema': app_instance.schema is not None,
'last_render_time': app_instance.last_render_time
})
return jsonify({'status': 'success', 'apps': apps_list, 'count': len(apps_list)})
# Standalone: read manifest from disk
manifest = _read_starlark_manifest()
apps_list = []
for app_id, app_data in manifest.get('apps', {}).items():
apps_list.append({
'id': app_id,
'name': app_data.get('name', app_id),
'enabled': app_data.get('enabled', True),
'has_frames': False,
'render_interval': app_data.get('render_interval', 300),
'display_duration': app_data.get('display_duration', 15),
'config': app_data.get('config', {}),
'has_schema': False,
'last_render_time': None
})
return jsonify({'status': 'success', 'apps': apps_list, 'count': len(apps_list)})
except Exception as e:
logger.error(f"Error getting starlark apps: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/apps/<app_id>', methods=['GET'])
def get_starlark_app(app_id):
"""Get details for a specific Starlark app."""
try:
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
app = starlark_plugin.apps.get(app_id)
if not app:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
return jsonify({
'status': 'success',
'app': {
'id': app_id,
'name': app.manifest.get('name', app_id),
'enabled': app.is_enabled(),
'config': app.config,
'schema': app.schema,
'render_interval': app.get_render_interval(),
'display_duration': app.get_display_duration(),
'has_frames': app.frames is not None,
'frame_count': len(app.frames) if app.frames else 0,
'last_render_time': app.last_render_time,
}
})
# Standalone: read from manifest
manifest = _read_starlark_manifest()
app_data = manifest.get('apps', {}).get(app_id)
if not app_data:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
# Load schema from schema.json if it exists
schema = None
schema_file = _STARLARK_APPS_DIR / app_id / 'schema.json'
if schema_file.exists():
try:
with open(schema_file, 'r') as f:
schema = json.load(f)
except Exception as e:
logger.warning(f"Failed to load schema for {app_id}: {e}")
return jsonify({
'status': 'success',
'app': {
'id': app_id,
'name': app_data.get('name', app_id),
'enabled': app_data.get('enabled', True),
'config': app_data.get('config', {}),
'schema': schema,
'render_interval': app_data.get('render_interval', 300),
'display_duration': app_data.get('display_duration', 15),
'has_frames': False,
'frame_count': 0,
'last_render_time': None,
}
})
except Exception as e:
logger.error(f"Error getting starlark app {app_id}: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/upload', methods=['POST'])
def upload_starlark_app():
"""Upload and install a new Starlark app."""
try:
if 'file' not in request.files:
return jsonify({'status': 'error', 'message': 'No file uploaded'}), 400
file = request.files['file']
if not file.filename or not file.filename.endswith('.star'):
return jsonify({'status': 'error', 'message': 'File must have .star extension'}), 400
app_name = request.form.get('name')
app_id_input = request.form.get('app_id')
filename_base = file.filename.replace('.star', '') if file.filename else None
app_id, app_id_error = _validate_and_sanitize_app_id(app_id_input, fallback_source=filename_base)
if app_id_error:
return jsonify({'status': 'error', 'message': f'Invalid app_id: {app_id_error}'}), 400
render_interval_input = request.form.get('render_interval')
render_interval = 300
if render_interval_input is not None:
render_interval, err = _validate_timing_value(render_interval_input, 'render_interval')
if err:
return jsonify({'status': 'error', 'message': err}), 400
render_interval = render_interval or 300
display_duration_input = request.form.get('display_duration')
display_duration = 15
if display_duration_input is not None:
display_duration, err = _validate_timing_value(display_duration_input, 'display_duration')
if err:
return jsonify({'status': 'error', 'message': err}), 400
display_duration = display_duration or 15
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix='.star') as tmp:
file.save(tmp.name)
temp_path = tmp.name
try:
metadata = {'name': app_name or app_id, 'render_interval': render_interval, 'display_duration': display_duration}
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
success = starlark_plugin.install_app(app_id, temp_path, metadata)
else:
success = _install_star_file(app_id, temp_path, metadata)
if success:
return jsonify({'status': 'success', 'message': f'App installed: {app_id}', 'app_id': app_id})
else:
return jsonify({'status': 'error', 'message': 'Failed to install app'}), 500
finally:
try:
os.unlink(temp_path)
except OSError:
pass
except Exception as e:
logger.error(f"Error uploading starlark app: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/apps/<app_id>', methods=['DELETE'])
def uninstall_starlark_app(app_id):
"""Uninstall a Starlark app."""
try:
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
success = starlark_plugin.uninstall_app(app_id)
else:
# Standalone: remove app dir and manifest entry
import shutil
app_dir = _STARLARK_APPS_DIR / app_id
if app_dir.exists():
shutil.rmtree(app_dir)
manifest = _read_starlark_manifest()
manifest.get('apps', {}).pop(app_id, None)
success = _write_starlark_manifest(manifest)
if success:
return jsonify({'status': 'success', 'message': f'App uninstalled: {app_id}'})
else:
return jsonify({'status': 'error', 'message': 'Failed to uninstall app'}), 500
except Exception as e:
logger.error(f"Error uninstalling starlark app {app_id}: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/apps/<app_id>/config', methods=['GET'])
def get_starlark_app_config(app_id):
"""Get configuration for a Starlark app."""
try:
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
app = starlark_plugin.apps.get(app_id)
if not app:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
return jsonify({'status': 'success', 'config': app.config, 'schema': app.schema})
# Standalone: read from manifest
manifest = _read_starlark_manifest()
app_data = manifest.get('apps', {}).get(app_id)
if not app_data:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
return jsonify({'status': 'success', 'config': app_data.get('config', {}), 'schema': None})
except Exception as e:
logger.error(f"Error getting config for {app_id}: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/apps/<app_id>/config', methods=['PUT'])
def update_starlark_app_config(app_id):
"""Update configuration for a Starlark app."""
try:
data = request.get_json()
if not data:
return jsonify({'status': 'error', 'message': 'No configuration provided'}), 400
if 'render_interval' in data:
val, err = _validate_timing_value(data['render_interval'], 'render_interval')
if err:
return jsonify({'status': 'error', 'message': err}), 400
data['render_interval'] = val
if 'display_duration' in data:
val, err = _validate_timing_value(data['display_duration'], 'display_duration')
if err:
return jsonify({'status': 'error', 'message': err}), 400
data['display_duration'] = val
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
app = starlark_plugin.apps.get(app_id)
if not app:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
# Extract timing keys from data before updating config (they belong in manifest, not config)
render_interval = data.pop('render_interval', None)
display_duration = data.pop('display_duration', None)
# Update config with non-timing fields only
app.config.update(data)
# Update manifest with timing fields
timing_changed = False
if render_interval is not None:
app.manifest['render_interval'] = render_interval
timing_changed = True
if display_duration is not None:
app.manifest['display_duration'] = display_duration
timing_changed = True
if app.save_config():
# Persist manifest if timing changed (same pattern as toggle endpoint)
if timing_changed:
try:
# Use safe manifest update to prevent race conditions
timing_updates = {}
if render_interval is not None:
timing_updates['render_interval'] = render_interval
if display_duration is not None:
timing_updates['display_duration'] = display_duration
def update_fn(manifest):
manifest['apps'][app_id].update(timing_updates)
starlark_plugin._update_manifest_safe(update_fn)
except Exception as e:
logger.warning(f"Failed to persist timing to manifest for {app_id}: {e}")
starlark_plugin._render_app(app, force=True)
return jsonify({'status': 'success', 'message': 'Configuration updated', 'config': app.config})
else:
return jsonify({'status': 'error', 'message': 'Failed to save configuration'}), 500
# Standalone: update manifest directly
manifest = _read_starlark_manifest()
app_data = manifest.get('apps', {}).get(app_id)
if not app_data:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
app_data.setdefault('config', {}).update(data)
if 'render_interval' in data:
app_data['render_interval'] = data['render_interval']
if 'display_duration' in data:
app_data['display_duration'] = data['display_duration']
if _write_starlark_manifest(manifest):
return jsonify({'status': 'success', 'message': 'Configuration updated', 'config': app_data.get('config', {})})
else:
return jsonify({'status': 'error', 'message': 'Failed to save configuration'}), 500
except Exception as e:
logger.error(f"Error updating config for {app_id}: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/apps/<app_id>/toggle', methods=['POST'])
def toggle_starlark_app(app_id):
"""Enable or disable a Starlark app."""
try:
data = request.get_json() or {}
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
app = starlark_plugin.apps.get(app_id)
if not app:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
enabled = data.get('enabled')
if enabled is None:
enabled = not app.is_enabled()
app.manifest['enabled'] = enabled
# Use safe manifest update to prevent race conditions
def update_fn(manifest):
manifest['apps'][app_id]['enabled'] = enabled
starlark_plugin._update_manifest_safe(update_fn)
return jsonify({'status': 'success', 'message': f"App {'enabled' if enabled else 'disabled'}", 'enabled': enabled})
# Standalone: update manifest directly
manifest = _read_starlark_manifest()
app_data = manifest.get('apps', {}).get(app_id)
if not app_data:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
enabled = data.get('enabled')
if enabled is None:
enabled = not app_data.get('enabled', True)
app_data['enabled'] = enabled
if _write_starlark_manifest(manifest):
return jsonify({'status': 'success', 'message': f"App {'enabled' if enabled else 'disabled'}", 'enabled': enabled})
else:
return jsonify({'status': 'error', 'message': 'Failed to save'}), 500
except Exception as e:
logger.error(f"Error toggling app {app_id}: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/apps/<app_id>/render', methods=['POST'])
def render_starlark_app(app_id):
"""Force render a Starlark app."""
try:
starlark_plugin = _get_starlark_plugin()
if not starlark_plugin:
return jsonify({'status': 'error', 'message': 'Rendering requires the main LEDMatrix service (plugin not loaded in web service)'}), 503
app = starlark_plugin.apps.get(app_id)
if not app:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
success = starlark_plugin._render_app(app, force=True)
if success:
return jsonify({'status': 'success', 'message': 'App rendered', 'frame_count': len(app.frames) if app.frames else 0})
else:
return jsonify({'status': 'error', 'message': 'Failed to render app'}), 500
except Exception as e:
logger.error(f"Error rendering app {app_id}: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/repository/browse', methods=['GET'])
def browse_tronbyte_repository():
"""Browse all apps in the Tronbyte repository (bulk cached fetch).
Returns ALL apps with metadata, categories, and authors.
Filtering/sorting/pagination is handled client-side.
Results are cached server-side for 2 hours.
"""
try:
TronbyteRepository = _get_tronbyte_repository_class()
config = api_v3.config_manager.load_config() if api_v3.config_manager else {}
github_token = config.get('github_token')
repo = TronbyteRepository(github_token=github_token)
result = repo.list_all_apps_cached()
rate_limit = repo.get_rate_limit_info()
return jsonify({
'status': 'success',
'apps': result['apps'],
'categories': result['categories'],
'authors': result['authors'],
'count': result['count'],
'cached': result['cached'],
'rate_limit': rate_limit,
})
except Exception as e:
logger.error(f"Error browsing repository: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/repository/install', methods=['POST'])
def install_from_tronbyte_repository():
"""Install an app from the Tronbyte repository."""
try:
data = request.get_json()
if not data or 'app_id' not in data:
return jsonify({'status': 'error', 'message': 'app_id is required'}), 400
app_id, app_id_error = _validate_and_sanitize_app_id(data['app_id'])
if app_id_error:
return jsonify({'status': 'error', 'message': f'Invalid app_id: {app_id_error}'}), 400
TronbyteRepository = _get_tronbyte_repository_class()
import tempfile
config = api_v3.config_manager.load_config() if api_v3.config_manager else {}
github_token = config.get('github_token')
repo = TronbyteRepository(github_token=github_token)
success, metadata, error = repo.get_app_metadata(data['app_id'])
if not success:
return jsonify({'status': 'error', 'message': f'Failed to fetch app metadata: {error}'}), 404
with tempfile.NamedTemporaryFile(delete=False, suffix='.star') as tmp:
temp_path = tmp.name
try:
# Pass filename from metadata (e.g., "analog_clock.star" for analogclock app)
# Note: manifest uses 'fileName' (camelCase), not 'filename'
filename = metadata.get('fileName') if metadata else None
success, error = repo.download_star_file(data['app_id'], Path(temp_path), filename=filename)
if not success:
return jsonify({'status': 'error', 'message': f'Failed to download app: {error}'}), 500
render_interval = data.get('render_interval', 300)
ri, err = _validate_timing_value(render_interval, 'render_interval')
if err:
return jsonify({'status': 'error', 'message': err}), 400
render_interval = ri or 300
display_duration = data.get('display_duration', 15)
dd, err = _validate_timing_value(display_duration, 'display_duration')
if err:
return jsonify({'status': 'error', 'message': err}), 400
display_duration = dd or 15
install_metadata = {
'name': metadata.get('name', app_id) if metadata else app_id,
'render_interval': render_interval,
'display_duration': display_duration
}
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
success = starlark_plugin.install_app(data['app_id'], temp_path, install_metadata)
else:
success = _install_star_file(data['app_id'], temp_path, install_metadata)
if success:
return jsonify({'status': 'success', 'message': f'App installed: {metadata.get("name", app_id) if metadata else app_id}', 'app_id': app_id})
else:
return jsonify({'status': 'error', 'message': 'Failed to install app'}), 500
finally:
try:
os.unlink(temp_path)
except OSError:
pass
except Exception as e:
logger.error(f"Error installing from repository: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/repository/categories', methods=['GET'])
def get_tronbyte_categories():
"""Get list of available app categories (uses bulk cache)."""
try:
TronbyteRepository = _get_tronbyte_repository_class()
config = api_v3.config_manager.load_config() if api_v3.config_manager else {}
repo = TronbyteRepository(github_token=config.get('github_token'))
result = repo.list_all_apps_cached()
return jsonify({'status': 'success', 'categories': result['categories']})
except Exception as e:
logger.error(f"Error fetching categories: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500
@api_v3.route('/starlark/install-pixlet', methods=['POST'])
def install_pixlet():
"""Download and install Pixlet binary."""
try:
script_path = PROJECT_ROOT / 'scripts' / 'download_pixlet.sh'
if not script_path.exists():
return jsonify({'status': 'error', 'message': 'Installation script not found'}), 404
os.chmod(script_path, 0o755)
result = subprocess.run(
[str(script_path)],
cwd=str(PROJECT_ROOT),
capture_output=True,
text=True,
timeout=300
)
if result.returncode == 0:
logger.info("Pixlet downloaded successfully")
return jsonify({'status': 'success', 'message': 'Pixlet installed successfully!', 'output': result.stdout})
else:
return jsonify({'status': 'error', 'message': f'Failed to download Pixlet: {result.stderr}'}), 500
except subprocess.TimeoutExpired:
return jsonify({'status': 'error', 'message': 'Download timed out'}), 500
except Exception as e:
logger.error(f"Error installing Pixlet: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 500

View File

@@ -322,7 +322,11 @@ def _load_plugin_config_partial(plugin_id):
try:
if not pages_v3.plugin_manager:
return '<div class="text-red-500 p-4">Plugin manager not available</div>', 500
# Handle starlark app config (starlark:<app_id>)
if plugin_id.startswith('starlark:'):
return _load_starlark_config_partial(plugin_id[len('starlark:'):])
# Try to get plugin info first
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
@@ -429,3 +433,58 @@ def _load_plugin_config_partial(plugin_id):
import traceback
traceback.print_exc()
return f'<div class="text-red-500 p-4">Error loading plugin config: {str(e)}</div>', 500
def _load_starlark_config_partial(app_id):
"""Load configuration partial for a Starlark app."""
try:
starlark_plugin = pages_v3.plugin_manager.get_plugin('starlark-apps') if pages_v3.plugin_manager else None
if starlark_plugin and hasattr(starlark_plugin, 'apps'):
app = starlark_plugin.apps.get(app_id)
if not app:
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
return render_template(
'v3/partials/starlark_config.html',
app_id=app_id,
app_name=app.manifest.get('name', app_id),
app_enabled=app.is_enabled(),
render_interval=app.get_render_interval(),
display_duration=app.get_display_duration(),
config=app.config,
schema=app.schema,
has_frames=app.frames is not None,
frame_count=len(app.frames) if app.frames else 0,
last_render_time=app.last_render_time,
)
# Standalone: read from manifest file
manifest_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / 'manifest.json'
if not manifest_file.exists():
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
with open(manifest_file, 'r') as f:
manifest = json.load(f)
app_data = manifest.get('apps', {}).get(app_id)
if not app_data:
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
return render_template(
'v3/partials/starlark_config.html',
app_id=app_id,
app_name=app_data.get('name', app_id),
app_enabled=app_data.get('enabled', True),
render_interval=app_data.get('render_interval', 300),
display_duration=app_data.get('display_duration', 15),
config=app_data.get('config', {}),
schema=None,
has_frames=False,
frame_count=0,
last_render_time=None,
)
except Exception as e:
import traceback
traceback.print_exc()
return f'<div class="text-red-500 p-4">Error loading starlark config: {str(e)}</div>', 500

File diff suppressed because it is too large Load Diff

View File

@@ -147,24 +147,61 @@
</div>
</div>
</div>
<div class="mb-6">
<div class="flex gap-3">
<input type="text" id="plugin-search" placeholder="Search plugins by name, description, or tags..." class="form-control text-sm flex-[3] min-w-0 px-4 py-2.5 border border-gray-300 rounded-lg shadow-sm focus:shadow-md transition-shadow">
<select id="plugin-category" class="form-control text-sm flex-1 px-3 py-2.5 border border-gray-300 rounded-lg shadow-sm focus:shadow-md transition-shadow">
<option value="">All Categories</option>
<option value="sports">Sports</option>
<option value="content">Content</option>
<option value="time">Time</option>
<option value="weather">Weather</option>
<option value="financial">Financial</option>
<option value="media">Media</option>
<option value="demo">Demo</option>
<!-- Search Row -->
<div class="flex gap-3 mb-4">
<input type="text" id="plugin-search" placeholder="Search plugins by name, description, or tags..." class="form-control text-sm flex-[3] min-w-0 px-4 py-2.5 border border-gray-300 rounded-lg shadow-sm focus:shadow-md transition-shadow">
<select id="plugin-category" class="form-control text-sm flex-1 px-3 py-2.5 border border-gray-300 rounded-lg shadow-sm focus:shadow-md transition-shadow">
<option value="">All Categories</option>
<option value="sports">Sports</option>
<option value="content">Content</option>
<option value="time">Time</option>
<option value="weather">Weather</option>
<option value="financial">Financial</option>
<option value="media">Media</option>
<option value="demo">Demo</option>
</select>
</div>
<!-- Sort & Filter Bar -->
<div id="store-filter-bar" class="flex flex-wrap items-center gap-3 mb-4 p-3 bg-gray-50 rounded-lg border border-gray-200">
<!-- Sort -->
<select id="store-sort" class="text-sm px-3 py-1.5 border border-gray-300 rounded-md bg-white">
<option value="a-z">A → Z</option>
<option value="z-a">Z → A</option>
<option value="category">Category</option>
<option value="author">Author</option>
<option value="newest">Newest</option>
</select>
<div class="w-px h-6 bg-gray-300"></div>
<!-- Installed filter toggle -->
<button id="store-filter-installed" class="text-sm px-3 py-1.5 rounded-md border border-gray-300 bg-white hover:bg-gray-100 transition-colors" title="Cycle: All → Installed → Not Installed">
<i class="fas fa-filter mr-1 text-gray-400"></i>All
</button>
<div class="flex-1"></div>
<!-- Active filter count + clear -->
<span id="store-active-filters" class="hidden text-xs text-blue-600 font-medium"></span>
<button id="store-clear-filters" class="hidden text-sm px-3 py-1.5 rounded-md border border-red-300 bg-white text-red-600 hover:bg-red-50 transition-colors">
<i class="fas fa-times mr-1"></i>Clear Filters
</button>
</div>
<!-- Results Bar (top pagination) -->
<div class="flex flex-wrap items-center justify-between gap-3 mb-4">
<span id="store-results-info" class="text-sm text-gray-600"></span>
<div class="flex items-center gap-3">
<select id="store-per-page" class="text-sm px-2 py-1 border border-gray-300 rounded-md bg-white">
<option value="12">12 per page</option>
<option value="24">24 per page</option>
<option value="48">48 per page</option>
</select>
<button id="search-plugins-btn" class="btn bg-blue-600 hover:bg-blue-700 text-white px-5 py-2.5 rounded-lg whitespace-nowrap font-semibold shadow-sm">
<i class="fas fa-search mr-2"></i>Search
</button>
<div id="store-pagination-top" class="flex items-center gap-1"></div>
</div>
</div>
<div id="plugin-store-grid" class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 xl:grid-cols-4 2xl:grid-cols-5 gap-6">
<!-- Loading skeleton -->
<div class="store-loading col-span-full">
@@ -174,11 +211,106 @@
<div class="bg-gray-200 rounded-lg p-4 h-48 animate-pulse"></div>
<div class="bg-gray-200 rounded-lg p-4 h-48 animate-pulse"></div>
<div class="bg-gray-200 rounded-lg p-4 h-48 animate-pulse"></div>
</div>
</div>
</div>
<!-- Bottom Pagination -->
<div class="flex flex-wrap items-center justify-between gap-3 mt-4">
<span id="store-results-info-bottom" class="text-sm text-gray-600"></span>
<div id="store-pagination-bottom" class="flex items-center gap-1"></div>
</div>
</div>
<!-- Starlark Apps Section (Tronbyte Community Apps) -->
<div id="starlark-apps-section" class="border-t border-gray-200 pt-8 mt-8">
<div class="flex items-center justify-between mb-5 pb-3 border-b border-gray-200">
<div class="flex items-center gap-3">
<h3 class="text-lg font-bold text-gray-900"><i class="fas fa-star text-yellow-500 mr-2"></i>Starlark Apps</h3>
<span id="starlark-apps-count" class="text-sm text-gray-500 font-medium"></span>
</div>
<button id="toggle-starlark-section" class="text-sm text-blue-600 hover:text-blue-800 flex items-center font-medium transition-colors">
<i class="fas fa-chevron-down mr-1" id="starlark-section-icon"></i>
<span>Show</span>
</button>
</div>
<div id="starlark-section-content" class="hidden">
<p class="text-sm text-gray-600 mb-4">Browse and install Starlark apps from the <a href="https://github.com/tronbyt/apps" target="_blank" class="text-blue-600 hover:text-blue-800 underline">Tronbyte community repository</a>. Requires <strong>Pixlet</strong> binary.</p>
<!-- Pixlet Status Banner -->
<div id="starlark-pixlet-status" class="mb-4"></div>
<!-- Search Row -->
<div class="flex gap-3 mb-4">
<input type="text" id="starlark-search" placeholder="Search by name, description, author..." class="form-control text-sm flex-[3] min-w-0 px-4 py-2.5 border border-gray-300 rounded-lg shadow-sm focus:shadow-md transition-shadow">
<select id="starlark-category" class="form-control text-sm flex-1 px-3 py-2.5 border border-gray-300 rounded-lg shadow-sm focus:shadow-md transition-shadow">
<option value="">All Categories</option>
</select>
</div>
<!-- Sort & Filter Bar -->
<div id="starlark-filter-bar" class="flex flex-wrap items-center gap-3 mb-4 p-3 bg-gray-50 rounded-lg border border-gray-200">
<!-- Sort -->
<select id="starlark-sort" class="text-sm px-3 py-1.5 border border-gray-300 rounded-md bg-white">
<option value="a-z">A → Z</option>
<option value="z-a">Z → A</option>
<option value="category">Category</option>
<option value="author">Author</option>
</select>
<div class="w-px h-6 bg-gray-300"></div>
<!-- Installed filter toggle -->
<button id="starlark-filter-installed" class="text-sm px-3 py-1.5 rounded-md border border-gray-300 bg-white hover:bg-gray-100 transition-colors" title="Cycle: All → Installed → Not Installed">
<i class="fas fa-filter mr-1 text-gray-400"></i>All
</button>
<!-- Author filter -->
<select id="starlark-filter-author" class="text-sm px-3 py-1.5 border border-gray-300 rounded-md bg-white">
<option value="">All Authors</option>
</select>
<div class="flex-1"></div>
<!-- Active filter count + clear -->
<span id="starlark-active-filters" class="hidden text-xs text-blue-600 font-medium"></span>
<button id="starlark-clear-filters" class="hidden text-sm px-3 py-1.5 rounded-md border border-red-300 bg-white text-red-600 hover:bg-red-50 transition-colors">
<i class="fas fa-times mr-1"></i>Clear Filters
</button>
</div>
<!-- Results Bar (top pagination) -->
<div class="flex flex-wrap items-center justify-between gap-3 mb-4">
<span id="starlark-results-info" class="text-sm text-gray-600"></span>
<div class="flex items-center gap-3">
<select id="starlark-per-page" class="text-sm px-2 py-1 border border-gray-300 rounded-md bg-white">
<option value="24">24 per page</option>
<option value="48">48 per page</option>
<option value="96">96 per page</option>
</select>
<div id="starlark-pagination-top" class="flex items-center gap-1"></div>
</div>
</div>
<!-- Starlark Apps Grid -->
<div id="starlark-apps-grid" class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 xl:grid-cols-4 2xl:grid-cols-5 gap-6">
</div>
<!-- Bottom Pagination -->
<div class="flex flex-wrap items-center justify-between gap-3 mt-4">
<span id="starlark-results-info-bottom" class="text-sm text-gray-600"></span>
<div id="starlark-pagination-bottom" class="flex items-center gap-1"></div>
</div>
<!-- Upload .star file -->
<div class="flex gap-3 mt-6 pt-4 border-t border-gray-200">
<button id="starlark-upload-btn" class="btn bg-green-600 hover:bg-green-700 text-white px-4 py-2 rounded-md text-sm">
<i class="fas fa-upload mr-2"></i>Upload .star File
</button>
</div>
</div>
</div>
<!-- Install from GitHub URL Section (Separate section, always visible) -->
<div class="border-t border-gray-200 pt-8 mt-8">
<div class="flex items-center justify-between mb-5 pb-3 border-b border-gray-200">

View File

@@ -0,0 +1,449 @@
<div class="space-y-6">
<!-- Header -->
<div class="flex items-center justify-between pb-4 border-b border-gray-200">
<div>
<h3 class="text-lg font-bold text-gray-900">
<i class="fas fa-star text-yellow-500 mr-2"></i>{{ app_name }}
</h3>
<p class="text-sm text-gray-500 mt-1">Starlark App &mdash; ID: {{ app_id }}</p>
</div>
<div class="flex items-center gap-3">
<span class="badge badge-warning"><i class="fas fa-star mr-1"></i>Starlark</span>
{% if app_enabled %}
<span class="badge badge-success">Enabled</span>
{% else %}
<span class="badge badge-error">Disabled</span>
{% endif %}
</div>
</div>
<!-- Status -->
<div class="bg-gray-50 rounded-lg p-4 border border-gray-200">
<h4 class="text-sm font-semibold text-gray-700 mb-3">Status</h4>
<div class="grid grid-cols-2 md:grid-cols-4 gap-4 text-sm">
<div>
<span class="text-gray-500">Frames:</span>
<span class="font-medium ml-1">{{ frame_count if has_frames else 'Not rendered' }}</span>
</div>
<div>
<span class="text-gray-500">Render Interval:</span>
<span class="font-medium ml-1">{{ render_interval }}s</span>
</div>
<div>
<span class="text-gray-500">Display Duration:</span>
<span class="font-medium ml-1">{{ display_duration }}s</span>
</div>
<div>
<span class="text-gray-500">Last Render:</span>
<span class="font-medium ml-1" id="starlark-last-render">{{ last_render_time }}</span>
</div>
</div>
</div>
<!-- Actions -->
<div class="flex gap-3">
<button onclick="forceRenderStarlarkApp('{{ app_id }}')"
class="btn bg-blue-600 hover:bg-blue-700 text-white px-4 py-2 rounded-md text-sm font-semibold">
<i class="fas fa-sync mr-2"></i>Force Render
</button>
<button onclick="toggleStarlarkApp('{{ app_id }}', {{ 'false' if app_enabled else 'true' }})"
class="btn {{ 'bg-red-600 hover:bg-red-700' if app_enabled else 'bg-green-600 hover:bg-green-700' }} text-white px-4 py-2 rounded-md text-sm font-semibold">
<i class="fas {{ 'fa-toggle-off' if app_enabled else 'fa-toggle-on' }} mr-2"></i>
{{ 'Disable' if app_enabled else 'Enable' }}
</button>
</div>
<!-- Configuration -->
<div class="bg-white rounded-lg p-4 border border-gray-200">
<h4 class="text-sm font-semibold text-gray-700 mb-3">Timing Settings</h4>
<div id="starlark-config-form" class="space-y-4">
<div class="grid grid-cols-1 md:grid-cols-2 gap-4">
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">Render Interval (seconds)</label>
<input type="number" min="10" max="86400" step="1"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
value="{{ render_interval }}"
data-starlark-config="render_interval">
<p class="text-xs text-gray-400 mt-1">How often the app re-renders (fetches new data)</p>
</div>
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">Display Duration (seconds)</label>
<input type="number" min="1" max="3600" step="1"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
value="{{ display_duration }}"
data-starlark-config="display_duration">
<p class="text-xs text-gray-400 mt-1">How long the app displays before rotating</p>
</div>
</div>
{# ── Schema-driven App Settings ── #}
{% set fields = [] %}
{% if schema %}
{% if schema.fields is defined %}
{% set fields = schema.fields %}
{% elif schema.schema is defined and schema.schema is iterable and schema.schema is not string %}
{% set fields = schema.schema %}
{% endif %}
{% endif %}
{% if fields %}
<hr class="border-gray-200 my-2">
<h4 class="text-sm font-semibold text-gray-700 mb-2">App Settings</h4>
{% for field in fields %}
{% if field.typeOf is defined and field.id is defined %}
{% set field_id = field.id %}
{% set field_type = field.typeOf %}
{% set field_name = field.name or field_id %}
{% set field_desc = field.desc or '' %}
{% set field_default = field.default if field.default is defined else '' %}
{% set current_val = config.get(field_id, field_default) if config else field_default %}
{# ── text ── #}
{% if field_type == 'text' %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<input type="text"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
value="{{ current_val }}"
placeholder="{{ field_desc }}"
data-starlark-config="{{ field_id }}">
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1">{{ field_desc }}</p>
{% endif %}
</div>
{# ── dropdown ── #}
{% elif field_type == 'dropdown' %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<select class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm bg-white"
data-starlark-config="{{ field_id }}">
{% for opt in (field.options or []) %}
<option value="{{ opt.value }}" {{ 'selected' if current_val|string == opt.value|string else '' }}>
{{ opt.display }}
</option>
{% endfor %}
</select>
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1">{{ field_desc }}</p>
{% endif %}
</div>
{# ── toggle ── #}
{% elif field_type == 'toggle' %}
<div class="form-group">
<label class="flex items-center gap-2 text-sm font-medium text-gray-700 cursor-pointer">
<input type="checkbox"
class="w-4 h-4 rounded border-gray-300 text-blue-600 focus:ring-blue-500"
data-starlark-config="{{ field_id }}"
data-starlark-type="toggle"
{{ 'checked' if (current_val is sameas true or current_val|string|lower in ('true', '1', 'yes')) else '' }}>
{{ field_name }}
</label>
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1 ml-6">{{ field_desc }}</p>
{% endif %}
</div>
{# ── color ── #}
{% elif field_type == 'color' %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<div class="flex items-center gap-2">
<input type="color"
class="w-10 h-10 rounded border border-gray-300 cursor-pointer p-0.5"
value="{{ current_val or '#FFFFFF' }}"
data-starlark-color-picker="{{ field_id }}"
oninput="document.querySelector('[data-starlark-config={{ field_id }}]').value = this.value">
<input type="text"
class="form-control flex-1 px-3 py-2 border border-gray-300 rounded-md text-sm font-mono"
value="{{ current_val or '#FFFFFF' }}"
placeholder="#RRGGBB"
data-starlark-config="{{ field_id }}"
oninput="var cp = document.querySelector('[data-starlark-color-picker={{ field_id }}]'); if(this.value.match(/^#[0-9a-fA-F]{6}$/)) cp.value = this.value;">
</div>
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1">{{ field_desc }}</p>
{% endif %}
</div>
{# ── datetime ── #}
{% elif field_type == 'datetime' %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<input type="datetime-local"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
value="{{ current_val }}"
data-starlark-config="{{ field_id }}">
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1">{{ field_desc }}</p>
{% endif %}
</div>
{# ── location (mini-form) ── #}
{% elif field_type == 'location' %}
<div class="form-group" data-starlark-location-group="{{ field_id }}" data-starlark-location-value="{{ current_val }}">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<div class="grid grid-cols-1 md:grid-cols-3 gap-2">
<div>
<input type="number" step="any" min="-90" max="90"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
placeholder="Latitude"
data-starlark-location-field="{{ field_id }}"
data-starlark-location-key="lat">
</div>
<div>
<input type="number" step="any" min="-180" max="180"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
placeholder="Longitude"
data-starlark-location-field="{{ field_id }}"
data-starlark-location-key="lng">
</div>
<div>
<input type="text"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
placeholder="Timezone (e.g. America/New_York)"
data-starlark-location-field="{{ field_id }}"
data-starlark-location-key="timezone">
</div>
</div>
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1">{{ field_desc }}</p>
{% endif %}
</div>
{# ── oauth2 (unsupported) ── #}
{% elif field_type == 'oauth2' %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<div class="bg-yellow-50 border border-yellow-200 rounded-md p-3 text-sm text-yellow-800" data-starlark-unsupported>
<i class="fas fa-exclamation-triangle mr-1"></i>
This app requires OAuth2 authentication, which is not supported in standalone mode.
</div>
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1">{{ field_desc }}</p>
{% endif %}
</div>
{# ── photo_select (unsupported) ── #}
{% elif field_type == 'photo_select' %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<div class="bg-yellow-50 border border-yellow-200 rounded-md p-3 text-sm text-yellow-800" data-starlark-unsupported>
<i class="fas fa-exclamation-triangle mr-1"></i>
Photo upload is not supported in this interface.
</div>
</div>
{# ── generated (hidden meta-field, skip) ── #}
{% elif field_type == 'generated' %}
{# Invisible — generated fields are handled server-side by Pixlet #}
{# ── typeahead / location_based (text fallback with note) ── #}
{% elif field_type in ('typeahead', 'location_based') %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<input type="text"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
value="{{ current_val }}"
placeholder="{{ field_desc }}"
data-starlark-config="{{ field_id }}">
<p class="text-xs text-yellow-600 mt-1">
<i class="fas fa-info-circle mr-1"></i>
This field normally uses autocomplete which requires a Pixlet server. Enter the value manually.
</p>
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1">{{ field_desc }}</p>
{% endif %}
</div>
{# ── unknown type (text fallback) ── #}
{% else %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ field_name }}</label>
<input type="text"
class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
value="{{ current_val }}"
placeholder="{{ field_desc }}"
data-starlark-config="{{ field_id }}">
{% if field_desc %}
<p class="text-xs text-gray-400 mt-1">{{ field_desc }}</p>
{% endif %}
</div>
{% endif %}
{% endif %}{# end field.typeOf and field.id check #}
{% endfor %}
{# Also show any config keys NOT in the schema (user-added or legacy) #}
{% if config %}
{% set schema_ids = [] %}
{% for f in fields %}
{% if f.id is defined %}
{% if schema_ids.append(f.id) %}{% endif %}
{% endif %}
{% endfor %}
{% for key, value in config.items() %}
{% if key not in ('render_interval', 'display_duration') and key not in schema_ids %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ key }} <span class="text-xs text-gray-400">(custom)</span></label>
<input type="text" class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
value="{{ value }}"
data-starlark-config="{{ key }}">
</div>
{% endif %}
{% endfor %}
{% endif %}
{# ── No schema: fall back to raw config key/value pairs ── #}
{% elif config %}
<hr class="border-gray-200 my-2">
<h4 class="text-sm font-semibold text-gray-700 mb-2">App Settings</h4>
{% for key, value in config.items() %}
{% if key not in ('render_interval', 'display_duration') %}
<div class="form-group">
<label class="block text-sm font-medium text-gray-700 mb-1">{{ key }}</label>
<input type="text" class="form-control w-full px-3 py-2 border border-gray-300 rounded-md text-sm"
value="{{ value }}"
data-starlark-config="{{ key }}">
</div>
{% endif %}
{% endfor %}
{% endif %}
<button onclick="saveStarlarkConfig('{{ app_id }}')"
class="btn bg-blue-600 hover:bg-blue-700 text-white px-4 py-2 rounded-md text-sm font-semibold">
<i class="fas fa-save mr-2"></i>Save Configuration
</button>
</div>
</div>
</div>
<script>
function forceRenderStarlarkApp(appId) {
fetch('/api/v3/starlark/apps/' + encodeURIComponent(appId) + '/render', {method: 'POST'})
.then(function(r) { return r.json(); })
.then(function(data) {
if (data.status === 'success') {
if (typeof showNotification === 'function') {
showNotification('Rendered successfully! ' + (data.frame_count || 0) + ' frame(s)', 'success');
} else {
alert('Rendered successfully! ' + (data.frame_count || 0) + ' frame(s)');
}
} else {
var msg = 'Render failed: ' + (data.message || 'Unknown error');
if (typeof showNotification === 'function') showNotification(msg, 'error');
else alert(msg);
}
})
.catch(function(err) {
var msg = 'Render failed: ' + err.message;
if (typeof showNotification === 'function') showNotification(msg, 'error');
else alert(msg);
});
}
function toggleStarlarkApp(appId, enabled) {
fetch('/api/v3/starlark/apps/' + encodeURIComponent(appId) + '/toggle', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({enabled: enabled})
})
.then(function(r) { return r.json(); })
.then(function(data) {
if (data.status === 'success') {
if (typeof loadInstalledPlugins === 'function') loadInstalledPlugins();
else if (typeof window.loadInstalledPlugins === 'function') window.loadInstalledPlugins();
var container = document.getElementById('plugin-config-starlark:' + appId);
if (container && window.htmx) {
htmx.ajax('GET', '/v3/partials/plugin-config/starlark:' + encodeURIComponent(appId), {target: container, swap: 'innerHTML'});
}
} else {
var msg = 'Toggle failed: ' + (data.message || 'Unknown error');
if (typeof showNotification === 'function') showNotification(msg, 'error');
else alert(msg);
}
})
.catch(function(err) {
var msg = 'Toggle failed: ' + err.message;
if (typeof showNotification === 'function') showNotification(msg, 'error');
else alert(msg);
});
}
function saveStarlarkConfig(appId) {
var config = {};
// Collect standard inputs (text, number, select, datetime, color text companion)
document.querySelectorAll('[data-starlark-config]').forEach(function(input) {
var key = input.getAttribute('data-starlark-config');
var type = input.getAttribute('data-starlark-type');
if (key === 'render_interval' || key === 'display_duration') {
config[key] = parseInt(input.value, 10) || 0;
} else if (type === 'toggle') {
config[key] = input.checked ? 'true' : 'false';
} else {
config[key] = input.value;
}
});
// Collect location mini-form groups
document.querySelectorAll('[data-starlark-location-group]').forEach(function(group) {
var fieldId = group.getAttribute('data-starlark-location-group');
var loc = {};
group.querySelectorAll('[data-starlark-location-field="' + fieldId + '"]').forEach(function(sub) {
var locKey = sub.getAttribute('data-starlark-location-key');
if (sub.value) loc[locKey] = sub.value;
});
if (Object.keys(loc).length > 0) {
config[fieldId] = JSON.stringify(loc);
}
});
fetch('/api/v3/starlark/apps/' + encodeURIComponent(appId) + '/config', {
method: 'PUT',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify(config)
})
.then(function(r) { return r.json(); })
.then(function(data) {
if (data.status === 'success') {
if (typeof showNotification === 'function') showNotification('Configuration saved!', 'success');
else alert('Configuration saved!');
// Reload partial to reflect updated status
var container = document.getElementById('plugin-config-starlark:' + appId);
if (container && window.htmx) {
htmx.ajax('GET', '/v3/partials/plugin-config/starlark:' + encodeURIComponent(appId), {target: container, swap: 'innerHTML'});
}
} else {
var msg = 'Save failed: ' + (data.message || 'Unknown error');
if (typeof showNotification === 'function') showNotification(msg, 'error');
else alert(msg);
}
})
.catch(function(err) {
var msg = 'Save failed: ' + err.message;
if (typeof showNotification === 'function') showNotification(msg, 'error');
else alert(msg);
});
}
// Pre-fill location fields from stored JSON config values
(function() {
document.querySelectorAll('[data-starlark-location-group]').forEach(function(group) {
var fieldId = group.getAttribute('data-starlark-location-group');
// Find the hidden or stored value — look for a data attribute with the raw JSON
var rawVal = group.getAttribute('data-starlark-location-value');
if (!rawVal) return;
try {
var loc = JSON.parse(rawVal);
group.querySelectorAll('[data-starlark-location-field="' + fieldId + '"]').forEach(function(sub) {
var locKey = sub.getAttribute('data-starlark-location-key');
if (loc[locKey] !== undefined) sub.value = loc[locKey];
});
} catch(e) { /* not valid JSON, ignore */ }
});
})();
</script>