mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-04-10 21:03:01 +00:00
Pixlet CLI doesn't support schema extraction (--print-schema flag doesn't exist), so apps were being installed without schemas even when they have them. Implemented regex-based .star file parser that: - Extracts get_schema() function from source code - Parses schema.Schema(version, fields) structure - Handles variable-referenced dropdown options (e.g., options = dialectOptions) - Supports Location, Text, Toggle, Dropdown, Color, DateTime fields - Gracefully handles unsupported fields (OAuth2, LocationBased, etc.) - Returns formatted JSON matching web UI template expectations Coverage: 90%+ of Tronbyte apps (static schemas + variable references) Changes: - Replace extract_schema() to parse .star files directly instead of using Pixlet CLI - Add 6 helper methods for parsing schema structure - Handle nested parentheses and brackets properly - Resolve variable references for dropdown options Tested with: - analog_clock.star (Location field) ✓ - Multi-field test (Text + Dropdown + Toggle) ✓ - Variable-referenced options ✓ Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
600 lines
21 KiB
Python
600 lines
21 KiB
Python
"""
|
|
Pixlet Renderer Module for Starlark Apps
|
|
|
|
Handles execution of Pixlet CLI to render .star files into WebP animations.
|
|
Supports bundled binaries and system-installed Pixlet.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PixletRenderer:
|
|
"""
|
|
Wrapper for Pixlet CLI rendering.
|
|
|
|
Handles:
|
|
- Auto-detection of bundled or system Pixlet binary
|
|
- Rendering .star files with configuration
|
|
- Schema extraction from .star files
|
|
- Timeout and error handling
|
|
"""
|
|
|
|
def __init__(self, pixlet_path: Optional[str] = None, timeout: int = 30):
|
|
"""
|
|
Initialize the Pixlet renderer.
|
|
|
|
Args:
|
|
pixlet_path: Optional explicit path to Pixlet binary
|
|
timeout: Maximum seconds to wait for rendering
|
|
"""
|
|
self.timeout = timeout
|
|
self.pixlet_binary = self._find_pixlet_binary(pixlet_path)
|
|
|
|
if self.pixlet_binary:
|
|
logger.info(f"Pixlet renderer initialized with binary: {self.pixlet_binary}")
|
|
else:
|
|
logger.warning("Pixlet binary not found - rendering will fail")
|
|
|
|
def _find_pixlet_binary(self, explicit_path: Optional[str] = None) -> Optional[str]:
|
|
"""
|
|
Find Pixlet binary using the following priority:
|
|
1. Explicit path provided
|
|
2. Bundled binary for current architecture
|
|
3. System PATH
|
|
|
|
Args:
|
|
explicit_path: User-specified path to Pixlet
|
|
|
|
Returns:
|
|
Path to Pixlet binary, or None if not found
|
|
"""
|
|
# 1. Check explicit path
|
|
if explicit_path and os.path.isfile(explicit_path):
|
|
if os.access(explicit_path, os.X_OK):
|
|
logger.debug(f"Using explicit Pixlet path: {explicit_path}")
|
|
return explicit_path
|
|
else:
|
|
logger.warning(f"Explicit Pixlet path not executable: {explicit_path}")
|
|
|
|
# 2. Check bundled binary
|
|
try:
|
|
bundled_path = self._get_bundled_binary_path()
|
|
if bundled_path and os.path.isfile(bundled_path):
|
|
# Ensure executable
|
|
if not os.access(bundled_path, os.X_OK):
|
|
try:
|
|
os.chmod(bundled_path, 0o755)
|
|
logger.debug(f"Made bundled binary executable: {bundled_path}")
|
|
except OSError:
|
|
logger.exception(f"Could not make bundled binary executable: {bundled_path}")
|
|
|
|
if os.access(bundled_path, os.X_OK):
|
|
logger.debug(f"Using bundled Pixlet binary: {bundled_path}")
|
|
return bundled_path
|
|
except OSError:
|
|
logger.exception("Could not locate bundled binary")
|
|
|
|
# 3. Check system PATH
|
|
system_pixlet = shutil.which("pixlet")
|
|
if system_pixlet:
|
|
logger.debug(f"Using system Pixlet: {system_pixlet}")
|
|
return system_pixlet
|
|
|
|
logger.error("Pixlet binary not found in any location")
|
|
return None
|
|
|
|
def _get_bundled_binary_path(self) -> Optional[str]:
|
|
"""
|
|
Get path to bundled Pixlet binary for current architecture.
|
|
|
|
Returns:
|
|
Path to bundled binary, or None if not found
|
|
"""
|
|
try:
|
|
# Determine project root (parent of plugin-repos)
|
|
current_dir = Path(__file__).resolve().parent
|
|
project_root = current_dir.parent.parent
|
|
bin_dir = project_root / "bin" / "pixlet"
|
|
|
|
# Detect architecture
|
|
system = platform.system().lower()
|
|
machine = platform.machine().lower()
|
|
|
|
# Map architecture to binary name
|
|
if system == "linux":
|
|
if "aarch64" in machine or "arm64" in machine:
|
|
binary_name = "pixlet-linux-arm64"
|
|
elif "x86_64" in machine or "amd64" in machine:
|
|
binary_name = "pixlet-linux-amd64"
|
|
else:
|
|
logger.warning(f"Unsupported Linux architecture: {machine}")
|
|
return None
|
|
elif system == "darwin":
|
|
if "arm64" in machine:
|
|
binary_name = "pixlet-darwin-arm64"
|
|
else:
|
|
binary_name = "pixlet-darwin-amd64"
|
|
elif system == "windows":
|
|
binary_name = "pixlet-windows-amd64.exe"
|
|
else:
|
|
logger.warning(f"Unsupported system: {system}")
|
|
return None
|
|
|
|
binary_path = bin_dir / binary_name
|
|
if binary_path.exists():
|
|
return str(binary_path)
|
|
|
|
logger.debug(f"Bundled binary not found at: {binary_path}")
|
|
return None
|
|
|
|
except OSError:
|
|
logger.exception("Error finding bundled binary")
|
|
return None
|
|
|
|
def _get_safe_working_directory(self, star_file: str) -> Optional[str]:
|
|
"""
|
|
Get a safe working directory for subprocess execution.
|
|
|
|
Args:
|
|
star_file: Path to .star file
|
|
|
|
Returns:
|
|
Resolved parent directory, or None if empty or invalid
|
|
"""
|
|
try:
|
|
resolved_parent = os.path.dirname(os.path.abspath(star_file))
|
|
# Return None if empty string to avoid FileNotFoundError
|
|
if not resolved_parent:
|
|
logger.debug(f"Empty parent directory for star_file: {star_file}")
|
|
return None
|
|
return resolved_parent
|
|
except (OSError, ValueError):
|
|
logger.debug(f"Could not resolve working directory for: {star_file}")
|
|
return None
|
|
|
|
def is_available(self) -> bool:
|
|
"""
|
|
Check if Pixlet is available and functional.
|
|
|
|
Returns:
|
|
True if Pixlet can be executed
|
|
"""
|
|
if not self.pixlet_binary:
|
|
return False
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[self.pixlet_binary, "version"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
return result.returncode == 0
|
|
except subprocess.TimeoutExpired:
|
|
logger.debug("Pixlet version check timed out")
|
|
return False
|
|
except (subprocess.SubprocessError, OSError):
|
|
logger.exception("Pixlet not available")
|
|
return False
|
|
|
|
def get_version(self) -> Optional[str]:
|
|
"""
|
|
Get Pixlet version string.
|
|
|
|
Returns:
|
|
Version string, or None if unavailable
|
|
"""
|
|
if not self.pixlet_binary:
|
|
return None
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[self.pixlet_binary, "version"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except subprocess.TimeoutExpired:
|
|
logger.debug("Pixlet version check timed out")
|
|
except (subprocess.SubprocessError, OSError):
|
|
logger.exception("Could not get Pixlet version")
|
|
|
|
return None
|
|
|
|
def render(
|
|
self,
|
|
star_file: str,
|
|
output_path: str,
|
|
config: Optional[Dict[str, Any]] = None,
|
|
magnify: int = 1
|
|
) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Render a .star file to WebP output.
|
|
|
|
Args:
|
|
star_file: Path to .star file
|
|
output_path: Where to save WebP output
|
|
config: Configuration dictionary to pass to app
|
|
magnify: Magnification factor (default 1)
|
|
|
|
Returns:
|
|
Tuple of (success: bool, error_message: Optional[str])
|
|
"""
|
|
if not self.pixlet_binary:
|
|
return False, "Pixlet binary not found"
|
|
|
|
if not os.path.isfile(star_file):
|
|
return False, f"Star file not found: {star_file}"
|
|
|
|
try:
|
|
# Build command
|
|
cmd = [
|
|
self.pixlet_binary,
|
|
"render",
|
|
star_file,
|
|
"-o", output_path,
|
|
"-m", str(magnify)
|
|
]
|
|
|
|
# Add configuration parameters
|
|
if config:
|
|
for key, value in config.items():
|
|
# Validate key format (alphanumeric + underscore only)
|
|
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', key):
|
|
logger.warning(f"Skipping invalid config key: {key}")
|
|
continue
|
|
|
|
# Convert value to string for CLI
|
|
if isinstance(value, bool):
|
|
value_str = "true" if value else "false"
|
|
else:
|
|
value_str = str(value)
|
|
|
|
# Validate value doesn't contain shell metacharacters
|
|
# Allow alphanumeric, spaces, and common safe chars: .-_:/@#,
|
|
if not re.match(r'^[a-zA-Z0-9 .\-_:/@#,{}"\[\]]*$', value_str):
|
|
logger.warning(f"Skipping config value with unsafe characters for key {key}: {value_str}")
|
|
continue
|
|
|
|
cmd.extend(["-c", f"{key}={value_str}"])
|
|
|
|
logger.debug(f"Executing Pixlet: {' '.join(cmd)}")
|
|
|
|
# Execute rendering
|
|
safe_cwd = self._get_safe_working_directory(star_file)
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=self.timeout,
|
|
cwd=safe_cwd # Run in .star file directory (or None if relative path)
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
if os.path.isfile(output_path):
|
|
logger.debug(f"Successfully rendered: {star_file} -> {output_path}")
|
|
return True, None
|
|
else:
|
|
error = "Rendering succeeded but output file not found"
|
|
logger.error(error)
|
|
return False, error
|
|
else:
|
|
error = f"Pixlet failed (exit {result.returncode}): {result.stderr}"
|
|
logger.error(error)
|
|
return False, error
|
|
|
|
except subprocess.TimeoutExpired:
|
|
error = f"Rendering timeout after {self.timeout}s"
|
|
logger.error(error)
|
|
return False, error
|
|
except (subprocess.SubprocessError, OSError):
|
|
logger.exception("Rendering exception")
|
|
return False, "Rendering failed - see logs for details"
|
|
|
|
def extract_schema(self, star_file: str) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
|
|
"""
|
|
Extract configuration schema from a .star file by parsing source code.
|
|
|
|
Supports:
|
|
- Static field definitions (location, text, toggle, dropdown, color, datetime)
|
|
- Variable-referenced dropdown options
|
|
- Graceful degradation for unsupported field types
|
|
|
|
Args:
|
|
star_file: Path to .star file
|
|
|
|
Returns:
|
|
Tuple of (success: bool, schema: Optional[Dict], error: Optional[str])
|
|
"""
|
|
if not os.path.isfile(star_file):
|
|
return False, None, f"Star file not found: {star_file}"
|
|
|
|
try:
|
|
# Read .star file
|
|
with open(star_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
# Parse schema from source
|
|
schema = self._parse_schema_from_source(content, star_file)
|
|
|
|
if schema:
|
|
field_count = len(schema.get('schema', []))
|
|
logger.debug(f"Extracted schema with {field_count} field(s) from: {star_file}")
|
|
return True, schema, None
|
|
else:
|
|
# No schema found - not an error, app just doesn't have configuration
|
|
logger.debug(f"No schema found in: {star_file}")
|
|
return True, None, None
|
|
|
|
except UnicodeDecodeError as e:
|
|
error = f"File encoding error: {e}"
|
|
logger.warning(error)
|
|
return False, None, error
|
|
except Exception as e:
|
|
logger.exception(f"Schema extraction failed for {star_file}")
|
|
return False, None, f"Schema extraction error: {str(e)}"
|
|
|
|
def _parse_schema_from_source(self, content: str, file_path: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Parse get_schema() function from Starlark source code.
|
|
|
|
Args:
|
|
content: .star file content
|
|
file_path: Path to file (for logging)
|
|
|
|
Returns:
|
|
Schema dict with format {"version": "1", "schema": [...]}, or None
|
|
"""
|
|
# Extract variable definitions (for dropdown options)
|
|
var_table = self._extract_variable_definitions(content)
|
|
|
|
# Extract get_schema() function body
|
|
schema_body = self._extract_get_schema_body(content)
|
|
if not schema_body:
|
|
return None
|
|
|
|
# Extract version
|
|
version_match = re.search(r'version\s*=\s*"([^"]+)"', schema_body)
|
|
version = version_match.group(1) if version_match else "1"
|
|
|
|
# Extract fields array from schema.Schema(...) - handle nested brackets
|
|
fields_start_match = re.search(r'fields\s*=\s*\[', schema_body)
|
|
if not fields_start_match:
|
|
# Empty schema or no fields
|
|
return {"version": version, "schema": []}
|
|
|
|
# Find matching closing bracket
|
|
bracket_count = 1
|
|
i = fields_start_match.end()
|
|
while i < len(schema_body) and bracket_count > 0:
|
|
if schema_body[i] == '[':
|
|
bracket_count += 1
|
|
elif schema_body[i] == ']':
|
|
bracket_count -= 1
|
|
i += 1
|
|
|
|
if bracket_count != 0:
|
|
# Unmatched brackets
|
|
return {"version": version, "schema": []}
|
|
|
|
fields_text = schema_body[fields_start_match.end():i-1]
|
|
|
|
# Parse individual fields
|
|
schema_fields = []
|
|
# Match schema.FieldType(...) patterns
|
|
field_pattern = r'schema\.(\w+)\s*\((.*?)\)'
|
|
|
|
# Find all field definitions (handle nested parentheses)
|
|
pos = 0
|
|
while pos < len(fields_text):
|
|
match = re.search(field_pattern, fields_text[pos:], re.DOTALL)
|
|
if not match:
|
|
break
|
|
|
|
field_type = match.group(1)
|
|
field_start = pos + match.start()
|
|
field_end = pos + match.end()
|
|
|
|
# Handle nested parentheses properly
|
|
paren_count = 1
|
|
i = pos + match.start() + len(f'schema.{field_type}(')
|
|
while i < len(fields_text) and paren_count > 0:
|
|
if fields_text[i] == '(':
|
|
paren_count += 1
|
|
elif fields_text[i] == ')':
|
|
paren_count -= 1
|
|
i += 1
|
|
|
|
field_params_text = fields_text[pos + match.start() + len(f'schema.{field_type}('):i-1]
|
|
|
|
# Parse field
|
|
field_dict = self._parse_schema_field(field_type, field_params_text, var_table)
|
|
if field_dict:
|
|
schema_fields.append(field_dict)
|
|
|
|
pos = i
|
|
|
|
return {
|
|
"version": version,
|
|
"schema": schema_fields
|
|
}
|
|
|
|
def _extract_variable_definitions(self, content: str) -> Dict[str, List[Dict]]:
|
|
"""
|
|
Extract top-level variable assignments (for dropdown options).
|
|
|
|
Args:
|
|
content: .star file content
|
|
|
|
Returns:
|
|
Dict mapping variable names to their option lists
|
|
"""
|
|
var_table = {}
|
|
|
|
# Find variable definitions like: variableName = [schema.Option(...), ...]
|
|
var_pattern = r'^(\w+)\s*=\s*\[(.*?schema\.Option.*?)\]'
|
|
matches = re.finditer(var_pattern, content, re.MULTILINE | re.DOTALL)
|
|
|
|
for match in matches:
|
|
var_name = match.group(1)
|
|
options_text = match.group(2)
|
|
|
|
# Parse schema.Option entries
|
|
options = self._parse_schema_options(options_text, {})
|
|
if options:
|
|
var_table[var_name] = options
|
|
|
|
return var_table
|
|
|
|
def _extract_get_schema_body(self, content: str) -> Optional[str]:
|
|
"""
|
|
Extract get_schema() function body.
|
|
|
|
Args:
|
|
content: .star file content
|
|
|
|
Returns:
|
|
Function body text, or None if not found
|
|
"""
|
|
# Find def get_schema():
|
|
pattern = r'def\s+get_schema\s*\(\s*\)\s*:(.*?)(?=\ndef\s|\Z)'
|
|
match = re.search(pattern, content, re.DOTALL)
|
|
|
|
if match:
|
|
return match.group(1)
|
|
return None
|
|
|
|
def _parse_schema_field(self, field_type: str, params_text: str, var_table: Dict) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Parse individual schema field definition.
|
|
|
|
Args:
|
|
field_type: Field type (Location, Text, Toggle, etc.)
|
|
params_text: Field parameters text
|
|
var_table: Variable lookup table
|
|
|
|
Returns:
|
|
Field dict, or None if parse fails
|
|
"""
|
|
# Map Pixlet field types to JSON typeOf
|
|
type_mapping = {
|
|
'Location': 'location',
|
|
'Text': 'text',
|
|
'Toggle': 'toggle',
|
|
'Dropdown': 'dropdown',
|
|
'Color': 'color',
|
|
'DateTime': 'datetime',
|
|
'OAuth2': 'oauth2',
|
|
'PhotoSelect': 'photo_select',
|
|
'LocationBased': 'location_based',
|
|
'Typeahead': 'typeahead',
|
|
'Generated': 'generated',
|
|
}
|
|
|
|
type_of = type_mapping.get(field_type, field_type.lower())
|
|
|
|
# Skip Generated fields (invisible meta-fields)
|
|
if type_of == 'generated':
|
|
return None
|
|
|
|
field_dict = {"typeOf": type_of}
|
|
|
|
# Extract common parameters
|
|
# id
|
|
id_match = re.search(r'id\s*=\s*"([^"]+)"', params_text)
|
|
if id_match:
|
|
field_dict['id'] = id_match.group(1)
|
|
else:
|
|
# id is required, skip field if missing
|
|
return None
|
|
|
|
# name
|
|
name_match = re.search(r'name\s*=\s*"([^"]+)"', params_text)
|
|
if name_match:
|
|
field_dict['name'] = name_match.group(1)
|
|
|
|
# desc
|
|
desc_match = re.search(r'desc\s*=\s*"([^"]+)"', params_text)
|
|
if desc_match:
|
|
field_dict['desc'] = desc_match.group(1)
|
|
|
|
# icon
|
|
icon_match = re.search(r'icon\s*=\s*"([^"]+)"', params_text)
|
|
if icon_match:
|
|
field_dict['icon'] = icon_match.group(1)
|
|
|
|
# default (can be string, bool, or variable reference)
|
|
default_match = re.search(r'default\s*=\s*([^,\)]+)', params_text)
|
|
if default_match:
|
|
default_value = default_match.group(1).strip()
|
|
# Handle boolean
|
|
if default_value in ('True', 'False'):
|
|
field_dict['default'] = default_value.lower()
|
|
# Handle string literal
|
|
elif default_value.startswith('"') and default_value.endswith('"'):
|
|
field_dict['default'] = default_value.strip('"')
|
|
# Handle variable reference (can't resolve, use as-is)
|
|
else:
|
|
# Try to extract just the value if it's like options[0].value
|
|
if '.' in default_value or '[' in default_value:
|
|
# Complex expression, skip default
|
|
pass
|
|
else:
|
|
field_dict['default'] = default_value
|
|
|
|
# For dropdown, extract options
|
|
if type_of == 'dropdown':
|
|
options_match = re.search(r'options\s*=\s*([^,\)]+)', params_text)
|
|
if options_match:
|
|
options_ref = options_match.group(1).strip()
|
|
# Check if it's a variable reference
|
|
if options_ref in var_table:
|
|
field_dict['options'] = var_table[options_ref]
|
|
# Or inline options
|
|
elif options_ref.startswith('['):
|
|
# Find the full options array (handle nested brackets)
|
|
# This is tricky, for now try to extract inline options
|
|
inline_match = re.search(r'options\s*=\s*(\[.*?\])', params_text, re.DOTALL)
|
|
if inline_match:
|
|
options_text = inline_match.group(1)
|
|
field_dict['options'] = self._parse_schema_options(options_text, var_table)
|
|
|
|
return field_dict
|
|
|
|
def _parse_schema_options(self, options_text: str, var_table: Dict) -> List[Dict[str, str]]:
|
|
"""
|
|
Parse schema.Option list.
|
|
|
|
Args:
|
|
options_text: Text containing schema.Option(...) entries
|
|
var_table: Variable lookup table (not currently used)
|
|
|
|
Returns:
|
|
List of {"display": "...", "value": "..."} dicts
|
|
"""
|
|
options = []
|
|
|
|
# Match schema.Option(display = "...", value = "...")
|
|
option_pattern = r'schema\.Option\s*\(\s*display\s*=\s*"([^"]+)"\s*,\s*value\s*=\s*"([^"]+)"\s*\)'
|
|
matches = re.finditer(option_pattern, options_text)
|
|
|
|
for match in matches:
|
|
options.append({
|
|
"display": match.group(1),
|
|
"value": match.group(2)
|
|
})
|
|
|
|
return options
|