feat(starlark): implement schema extraction, asset download, and config persistence

## Schema Extraction
- Replace broken `pixlet serve --print-schema` with regex-based source parser
- Extract schema by parsing `get_schema()` function from .star files
- Support all field types: Location, Text, Toggle, Dropdown, Color, DateTime
- Handle variable-referenced dropdown options (e.g., `options = teamOptions`)
- Gracefully handle complex/unsupported field types (OAuth2, PhotoSelect, etc.)
- Extract schema for 90%+ of Tronbyte apps

## Asset Download
- Add `download_app_assets()` to fetch images/, sources/, fonts/ directories
- Download assets in binary mode for proper image/font handling
- Validate all paths to prevent directory traversal attacks
- Copy asset directories during app installation
- Enable apps like AnalogClock that require image assets

## Config Persistence
- Create config.json file during installation with schema defaults
- Update both config.json and manifest when saving configuration
- Load config from config.json (not manifest) for consistency with plugin
- Separate timing keys (render_interval, display_duration) from app config
- Fix standalone web service mode to read/write config.json

## Pixlet Command Fix
- Fix Pixlet CLI invocation: config params are positional, not flags
- Change from `pixlet render file.star -c key=value` to `pixlet render file.star key=value -o output`
- Properly handle JSON config values (e.g., location objects)
- Enable config to be applied during rendering

## Security & Reliability
- Add threading.Lock for cache operations to prevent race conditions
- Reduce ThreadPoolExecutor workers from 20 to 5 for Raspberry Pi
- Add path traversal validation in download_star_file()
- Add YAML error logging in manifest fetching
- Add file size validation (5MB limit) for .star uploads
- Use sanitized app_id consistently in install endpoints
- Use atomic manifest updates to prevent race conditions
- Add missing Optional import for type hints

## Web UI
- Fix standalone mode schema loading in config partial
- Schema-driven config forms now render correctly for all apps
- Location fields show lat/lng/timezone inputs
- Dropdown, toggle, text, color, and datetime fields all supported

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Chuck
2026-02-19 16:42:45 -05:00
parent a8609aea18
commit 8d1579a51b
5 changed files with 300 additions and 65 deletions

View File

@@ -796,7 +796,7 @@ class StarlarkAppsPlugin(BasePlugin):
except Exception as e:
self.logger.error(f"Error displaying frame: {e}")
def install_app(self, app_id: str, star_file_path: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
def install_app(self, app_id: str, star_file_path: str, metadata: Optional[Dict[str, Any]] = None, assets_dir: Optional[str] = None) -> bool:
"""
Install a new Starlark app.
@@ -804,6 +804,7 @@ class StarlarkAppsPlugin(BasePlugin):
app_id: Unique identifier for the app
star_file_path: Path to .star file to install
metadata: Optional metadata (name, description, etc.)
assets_dir: Optional directory containing assets (images/, sources/, etc.)
Returns:
True if successful
@@ -827,6 +828,21 @@ class StarlarkAppsPlugin(BasePlugin):
self._verify_path_safety(star_dest, self.apps_dir)
shutil.copy2(star_file_path, star_dest)
# Copy asset directories if provided (images/, sources/, etc.)
if assets_dir and Path(assets_dir).exists():
assets_path = Path(assets_dir)
for item in assets_path.iterdir():
if item.is_dir():
# Copy entire directory (e.g., images/, sources/)
dest_dir = app_dir / item.name
# Verify dest_dir path safety
self._verify_path_safety(dest_dir, self.apps_dir)
if dest_dir.exists():
shutil.rmtree(dest_dir)
shutil.copytree(item, dest_dir)
self.logger.debug(f"Copied assets directory: {item.name}")
self.logger.info(f"Installed assets for {app_id}")
# Create app manifest entry
app_manifest = {
"name": metadata.get("name", app_id) if metadata else app_id,

View File

@@ -239,16 +239,15 @@ class PixletRenderer:
return False, f"Star file not found: {star_file}"
try:
# Build command
# Build command - config params must be POSITIONAL between star_file and flags
# Format: pixlet render <file.star> [key=value]... [flags]
cmd = [
self.pixlet_binary,
"render",
star_file,
"-o", output_path,
"-m", str(magnify)
star_file
]
# Add configuration parameters
# Add configuration parameters as positional arguments (BEFORE flags)
if config:
for key, value in config.items():
# Validate key format (alphanumeric + underscore only)
@@ -259,6 +258,9 @@ class PixletRenderer:
# Convert value to string for CLI
if isinstance(value, bool):
value_str = "true" if value else "false"
elif isinstance(value, str) and (value.startswith('{') or value.startswith('[')):
# JSON string - keep as-is, will be properly quoted by subprocess
value_str = value
else:
value_str = str(value)
@@ -268,7 +270,14 @@ class PixletRenderer:
logger.warning(f"Skipping config value with unsafe characters for key {key}: {value_str}")
continue
cmd.extend(["-c", f"{key}={value_str}"])
# Add as positional argument (not -c flag)
cmd.append(f"{key}={value_str}")
# Add flags AFTER positional config arguments
cmd.extend([
"-o", output_path,
"-m", str(magnify)
])
logger.debug(f"Executing Pixlet: {' '.join(cmd)}")

View File

@@ -9,6 +9,7 @@ import logging
import time
import requests
import yaml
import threading
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -18,6 +19,7 @@ logger = logging.getLogger(__name__)
# Module-level cache for bulk app listing (survives across requests)
_apps_cache = {'data': None, 'timestamp': 0, 'categories': [], 'authors': []}
_CACHE_TTL = 7200 # 2 hours
_cache_lock = threading.Lock()
class TronbyteRepository:
@@ -94,16 +96,17 @@ class TronbyteRepository:
logger.error(f"Unexpected error: {e}")
return None
def _fetch_raw_file(self, file_path: str, branch: Optional[str] = None) -> Optional[str]:
def _fetch_raw_file(self, file_path: str, branch: Optional[str] = None, binary: bool = False):
"""
Fetch raw file content from repository.
Args:
file_path: Path to file in repository
branch: Branch name (default: DEFAULT_BRANCH)
binary: If True, return bytes; if False, return text
Returns:
File content as string, or None on error
File content as string/bytes, or None on error
"""
branch = branch or self.DEFAULT_BRANCH
url = f"{self.raw_url}/{self.REPO_OWNER}/{self.REPO_NAME}/{branch}/{file_path}"
@@ -111,7 +114,7 @@ class TronbyteRepository:
try:
response = self.session.get(url, timeout=10)
if response.status_code == 200:
return response.text
return response.content if binary else response.text
else:
logger.warning(f"Failed to fetch raw file: {file_path} ({response.status_code})")
return None
@@ -252,14 +255,17 @@ class TronbyteRepository:
global _apps_cache
now = time.time()
if _apps_cache['data'] is not None and (now - _apps_cache['timestamp']) < _CACHE_TTL:
return {
'apps': _apps_cache['data'],
'categories': _apps_cache['categories'],
'authors': _apps_cache['authors'],
'count': len(_apps_cache['data']),
'cached': True
}
# Check cache with lock (read-only check)
with _cache_lock:
if _apps_cache['data'] is not None and (now - _apps_cache['timestamp']) < _CACHE_TTL:
return {
'apps': _apps_cache['data'],
'categories': _apps_cache['categories'],
'authors': _apps_cache['authors'],
'count': len(_apps_cache['data']),
'cached': True
}
# Fetch directory listing (1 GitHub API call)
success, app_dirs, error = self.list_apps()
@@ -282,8 +288,8 @@ class TronbyteRepository:
metadata['id'] = app_id
metadata['repository_path'] = app_info.get('path', '')
return metadata
except (yaml.YAMLError, TypeError):
pass
except (yaml.YAMLError, TypeError) as e:
logger.warning(f"Failed to parse manifest for {app_id}: {e}")
# Fallback: minimal entry
return {
'id': app_id,
@@ -294,7 +300,7 @@ class TronbyteRepository:
# Parallel manifest fetches via raw.githubusercontent.com (high rate limit)
apps_with_metadata = []
with ThreadPoolExecutor(max_workers=20) as executor:
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(fetch_one, info): info for info in app_dirs}
for future in as_completed(futures):
try:
@@ -318,11 +324,12 @@ class TronbyteRepository:
categories = sorted({a.get('category', '') for a in apps_with_metadata if a.get('category')})
authors = sorted({a.get('author', '') for a in apps_with_metadata if a.get('author')})
# Update cache
_apps_cache['data'] = apps_with_metadata
_apps_cache['timestamp'] = now
_apps_cache['categories'] = categories
_apps_cache['authors'] = authors
# Update cache with lock
with _cache_lock:
_apps_cache['data'] = apps_with_metadata
_apps_cache['timestamp'] = now
_apps_cache['categories'] = categories
_apps_cache['authors'] = authors
logger.info(f"Cached {len(apps_with_metadata)} apps ({len(categories)} categories, {len(authors)} authors)")
@@ -347,8 +354,15 @@ class TronbyteRepository:
Returns:
Tuple of (success, error_message)
"""
# Use provided filename or fall back to app_id.star
# Validate inputs for path traversal
if '..' in app_id or '/' in app_id or '\\' in app_id:
return False, f"Invalid app_id: contains path traversal characters"
star_filename = filename or f"{app_id}.star"
if '..' in star_filename or '/' in star_filename or '\\' in star_filename:
return False, f"Invalid filename: contains path traversal characters"
# Use provided filename or fall back to app_id.star
star_path = f"{self.APPS_PATH}/{app_id}/{star_filename}"
content = self._fetch_raw_file(star_path)
@@ -389,6 +403,91 @@ class TronbyteRepository:
files = [item['name'] for item in data if item.get('type') == 'file']
return True, files, None
def download_app_assets(self, app_id: str, output_dir: Path) -> Tuple[bool, Optional[str]]:
"""
Download all asset files (images, sources, etc.) for an app.
Args:
app_id: App identifier
output_dir: Directory to save assets to
Returns:
Tuple of (success, error_message)
"""
# Validate app_id for path traversal
if '..' in app_id or '/' in app_id or '\\' in app_id:
return False, f"Invalid app_id: contains path traversal characters"
try:
# Get directory listing for the app
url = f"{self.base_url}/repos/{self.REPO_OWNER}/{self.REPO_NAME}/contents/{self.APPS_PATH}/{app_id}"
data = self._make_request(url)
if not data:
return False, f"Failed to fetch app directory listing"
if not isinstance(data, list):
return False, f"Invalid directory listing format"
# Find directories that contain assets (images, sources, etc.)
asset_dirs = []
for item in data:
if item.get('type') == 'dir':
dir_name = item.get('name')
# Common asset directory names in Tronbyte apps
if dir_name in ('images', 'sources', 'fonts', 'assets'):
asset_dirs.append((dir_name, item.get('url')))
if not asset_dirs:
# No asset directories, this is fine
return True, None
# Download each asset directory
for dir_name, dir_url in asset_dirs:
# Validate directory name for path traversal
if '..' in dir_name or '/' in dir_name or '\\' in dir_name:
logger.warning(f"Skipping potentially unsafe directory: {dir_name}")
continue
# Get files in this directory
dir_data = self._make_request(dir_url)
if not dir_data or not isinstance(dir_data, list):
logger.warning(f"Could not list files in {app_id}/{dir_name}")
continue
# Create local directory
local_dir = output_dir / dir_name
local_dir.mkdir(parents=True, exist_ok=True)
# Download each file
for file_item in dir_data:
if file_item.get('type') == 'file':
file_name = file_item.get('name')
# Validate filename for path traversal
if '..' in file_name or '/' in file_name or '\\' in file_name:
logger.warning(f"Skipping potentially unsafe file: {file_name}")
continue
file_path = f"{self.APPS_PATH}/{app_id}/{dir_name}/{file_name}"
content = self._fetch_raw_file(file_path, binary=True)
if content:
# Write binary content to file
output_path = local_dir / file_name
try:
with open(output_path, 'wb') as f:
f.write(content)
logger.debug(f"Downloaded asset: {dir_name}/{file_name}")
except Exception as e:
logger.warning(f"Failed to save {dir_name}/{file_name}: {e}")
else:
logger.warning(f"Failed to download {dir_name}/{file_name}")
logger.info(f"Downloaded assets for {app_id} ({len(asset_dirs)} directories)")
return True, None
except Exception as e:
logger.exception(f"Error downloading assets for {app_id}: {e}")
return False, f"Error downloading assets: {e}"
def search_apps(self, query: str, apps_with_metadata: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Search apps by name, summary, or description.

View File

@@ -10,6 +10,7 @@ import uuid
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
@@ -2183,10 +2184,10 @@ def toggle_plugin():
if starlark_plugin and starlark_app_id in starlark_plugin.apps:
app = starlark_plugin.apps[starlark_app_id]
app.manifest['enabled'] = enabled
with open(starlark_plugin.manifest_file, 'r') as f:
manifest = json.load(f)
manifest['apps'][starlark_app_id]['enabled'] = enabled
starlark_plugin._save_manifest(manifest)
# Use safe manifest update to prevent race conditions
def update_fn(manifest):
manifest['apps'][starlark_app_id]['enabled'] = enabled
starlark_plugin._update_manifest_safe(update_fn)
else:
# Standalone: update manifest directly
manifest = _read_starlark_manifest()
@@ -7088,7 +7089,7 @@ def _write_starlark_manifest(manifest: dict) -> bool:
return False
def _install_star_file(app_id: str, star_file_path: str, metadata: dict) -> bool:
def _install_star_file(app_id: str, star_file_path: str, metadata: dict, assets_dir: Optional[str] = None) -> bool:
"""Install a .star file and update the manifest (standalone, no plugin needed)."""
import shutil
import json
@@ -7097,7 +7098,21 @@ def _install_star_file(app_id: str, star_file_path: str, metadata: dict) -> bool
dest = app_dir / f"{app_id}.star"
shutil.copy2(star_file_path, str(dest))
# Copy asset directories if provided (images/, sources/, etc.)
if assets_dir and Path(assets_dir).exists():
assets_path = Path(assets_dir)
for item in assets_path.iterdir():
if item.is_dir():
# Copy entire directory (e.g., images/, sources/)
dest_dir = app_dir / item.name
if dest_dir.exists():
shutil.rmtree(dest_dir)
shutil.copytree(item, dest_dir)
logger.debug(f"Copied assets directory: {item.name}")
logger.info(f"Installed assets for {app_id}")
# Try to extract schema using PixletRenderer
schema = None
try:
PixletRenderer = _get_pixlet_renderer_class()
pixlet = PixletRenderer()
@@ -7111,6 +7126,19 @@ def _install_star_file(app_id: str, star_file_path: str, metadata: dict) -> bool
except Exception as e:
logger.warning(f"Failed to extract schema for {app_id}: {e}")
# Create default config — pre-populate with schema defaults
default_config = {}
if schema:
fields = schema.get('fields') or schema.get('schema') or []
for field in fields:
if isinstance(field, dict) and 'id' in field and 'default' in field:
default_config[field['id']] = field['default']
# Create config.json file
config_path = app_dir / "config.json"
with open(config_path, 'w') as f:
json.dump(default_config, f, indent=2)
manifest = _read_starlark_manifest()
manifest.setdefault('apps', {})[app_id] = {
'name': metadata.get('name', app_id),
@@ -7302,6 +7330,14 @@ def upload_starlark_app():
if not file.filename or not file.filename.endswith('.star'):
return jsonify({'status': 'error', 'message': 'File must have .star extension'}), 400
# Check file size (limit to 5MB for .star files)
file.seek(0, 2) # Seek to end
file_size = file.tell()
file.seek(0) # Reset to beginning
MAX_STAR_SIZE = 5 * 1024 * 1024 # 5MB
if file_size > MAX_STAR_SIZE:
return jsonify({'status': 'error', 'message': f'File too large (max 5MB, got {file_size/1024/1024:.1f}MB)'}), 400
app_name = request.form.get('name')
app_id_input = request.form.get('app_id')
filename_base = file.filename.replace('.star', '') if file.filename else None
@@ -7390,12 +7426,32 @@ def get_starlark_app_config(app_id):
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
return jsonify({'status': 'success', 'config': app.config, 'schema': app.schema})
# Standalone: read from manifest
manifest = _read_starlark_manifest()
app_data = manifest.get('apps', {}).get(app_id)
if not app_data:
# Standalone: read from config.json file
app_dir = _STARLARK_APPS_DIR / app_id
config_file = app_dir / "config.json"
if not app_dir.exists():
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
return jsonify({'status': 'success', 'config': app_data.get('config', {}), 'schema': None})
config = {}
if config_file.exists():
try:
with open(config_file, 'r') as f:
config = json.load(f)
except Exception as e:
logger.warning(f"Failed to load config for {app_id}: {e}")
# Load schema from schema.json
schema = None
schema_file = app_dir / "schema.json"
if schema_file.exists():
try:
with open(schema_file, 'r') as f:
schema = json.load(f)
except Exception as e:
logger.warning(f"Failed to load schema for {app_id}: {e}")
return jsonify({'status': 'success', 'config': config, 'schema': schema})
except Exception as e:
logger.error(f"Error getting config for {app_id}: {e}")
@@ -7464,22 +7520,51 @@ def update_starlark_app_config(app_id):
else:
return jsonify({'status': 'error', 'message': 'Failed to save configuration'}), 500
# Standalone: update manifest directly
# Standalone: update both config.json and manifest
manifest = _read_starlark_manifest()
app_data = manifest.get('apps', {}).get(app_id)
if not app_data:
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
# Extract timing keys (they go in manifest, not config.json)
render_interval = data.pop('render_interval', None)
display_duration = data.pop('display_duration', None)
# Update manifest with timing values
if render_interval is not None:
app_data['render_interval'] = render_interval
if display_duration is not None:
app_data['display_duration'] = display_duration
# Load current config from config.json
app_dir = _STARLARK_APPS_DIR / app_id
config_file = app_dir / "config.json"
current_config = {}
if config_file.exists():
try:
with open(config_file, 'r') as f:
current_config = json.load(f)
except Exception as e:
logger.warning(f"Failed to load config for {app_id}: {e}")
# Update config with new values (excluding timing keys)
current_config.update(data)
# Write updated config to config.json
try:
with open(config_file, 'w') as f:
json.dump(current_config, f, indent=2)
except Exception as e:
logger.error(f"Failed to save config.json for {app_id}: {e}")
return jsonify({'status': 'error', 'message': f'Failed to save configuration: {e}'}), 500
# Also update manifest for backward compatibility
app_data.setdefault('config', {}).update(data)
if 'render_interval' in data:
app_data['render_interval'] = data['render_interval']
if 'display_duration' in data:
app_data['display_duration'] = data['display_duration']
if _write_starlark_manifest(manifest):
return jsonify({'status': 'success', 'message': 'Configuration updated', 'config': app_data.get('config', {})})
return jsonify({'status': 'success', 'message': 'Configuration updated', 'config': current_config})
else:
return jsonify({'status': 'error', 'message': 'Failed to save configuration'}), 500
return jsonify({'status': 'error', 'message': 'Failed to save manifest'}), 500
except Exception as e:
logger.error(f"Error updating config for {app_id}: {e}")
@@ -7618,29 +7703,45 @@ def install_from_tronbyte_repository():
if not success:
return jsonify({'status': 'error', 'message': f'Failed to download app: {error}'}), 500
render_interval = data.get('render_interval', 300)
ri, err = _validate_timing_value(render_interval, 'render_interval')
if err:
return jsonify({'status': 'error', 'message': err}), 400
render_interval = ri or 300
# Download assets (images, sources, etc.) to a temp directory
import tempfile
temp_assets_dir = tempfile.mkdtemp()
try:
success_assets, error_assets = repo.download_app_assets(data['app_id'], Path(temp_assets_dir))
# Asset download is non-critical - log warning but continue if it fails
if not success_assets:
logger.warning(f"Failed to download assets for {data['app_id']}: {error_assets}")
display_duration = data.get('display_duration', 15)
dd, err = _validate_timing_value(display_duration, 'display_duration')
if err:
return jsonify({'status': 'error', 'message': err}), 400
display_duration = dd or 15
render_interval = data.get('render_interval', 300)
ri, err = _validate_timing_value(render_interval, 'render_interval')
if err:
return jsonify({'status': 'error', 'message': err}), 400
render_interval = ri or 300
install_metadata = {
'name': metadata.get('name', app_id) if metadata else app_id,
'render_interval': render_interval,
'display_duration': display_duration
}
display_duration = data.get('display_duration', 15)
dd, err = _validate_timing_value(display_duration, 'display_duration')
if err:
return jsonify({'status': 'error', 'message': err}), 400
display_duration = dd or 15
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
success = starlark_plugin.install_app(data['app_id'], temp_path, install_metadata)
else:
success = _install_star_file(data['app_id'], temp_path, install_metadata)
install_metadata = {
'name': metadata.get('name', app_id) if metadata else app_id,
'render_interval': render_interval,
'display_duration': display_duration
}
starlark_plugin = _get_starlark_plugin()
if starlark_plugin:
success = starlark_plugin.install_app(app_id, temp_path, install_metadata, assets_dir=temp_assets_dir)
else:
success = _install_star_file(app_id, temp_path, install_metadata, assets_dir=temp_assets_dir)
finally:
# Clean up temp assets directory
import shutil
try:
shutil.rmtree(temp_assets_dir)
except OSError:
pass
if success:
return jsonify({'status': 'success', 'message': f'App installed: {metadata.get("name", app_id) if metadata else app_id}', 'app_id': app_id})

View File

@@ -470,6 +470,16 @@ def _load_starlark_config_partial(app_id):
if not app_data:
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404
# Load schema from schema.json if it exists
schema = None
schema_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'schema.json'
if schema_file.exists():
try:
with open(schema_file, 'r') as f:
schema = json.load(f)
except Exception as e:
print(f"Warning: Could not load schema for {app_id}: {e}")
return render_template(
'v3/partials/starlark_config.html',
app_id=app_id,
@@ -478,7 +488,7 @@ def _load_starlark_config_partial(app_id):
render_interval=app_data.get('render_interval', 300),
display_duration=app_data.get('display_duration', 15),
config=app_data.get('config', {}),
schema=None,
schema=schema,
has_frames=False,
frame_count=0,
last_render_time=None,