2 Commits

Author SHA1 Message Date
Chuck
13eaabfcd5 Fix 15 remaining CodeQL path-injection and stack-trace-exposure alerts
Switch from resolve()+relative_to() to os.path.basename() reassignment,
which CodeQL recognizes as a path sanitizer that breaks the taint chain.
Also remove exception objects from backup_manager validate_backup return
strings to eliminate the stack-trace-exposure taint source.

Fixes alerts #227, #233, #234, #235, #237, #238, #239, #240, #241,
#242, #243, #244, #245, #246, #247.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 08:59:57 -04:00
Chuck
d96db23992 Resolve 29 open CodeQL security alerts across 5 files
py/flask-debug (#214):
- debug_web_manual.py: read debug mode from LEDMATRIX_FLASK_DEBUG env var
  instead of hardcoded True

py/stack-trace-exposure (#216, #218):
- api_v3.py execute_system_action: remove subprocess stdout/stderr from
  HTTP responses; log via logger instead
- api_v3.py get_git_version: validate output matches safe ref format
  (^[a-zA-Z0-9._-]+$) before including in response
- api_v3.py: remove all remaining traceback.format_exc() dead variables
  and print() debug calls (replaced with logger.debug/warning)

py/reflective-xss (#207, #208, #209, #210, #211, #212):
- api_v3.py: remove plugin_id from all error/success response messages
  (uninstall, install, update, health, not-found responses)
- pages_v3.py load_partial: return static "Partial not found" message
  instead of echoing partial_name
- pages_v3.py _load_starlark_config_partial: add app_id regex validation,
  use static error messages instead of f-strings with app_id

py/path-injection (#187–#206):
- pages_v3.py _load_plugin_config_partial: resolve plugins_base and
  validate _plugin_dir with relative_to() before all file operations;
  same for assets metadata directory
- pages_v3.py _load_starlark_config_partial: resolve starlark_base and
  validate schema_file/config_file paths with relative_to()
- plugin_loader.py _find_plugin_directory: resolve plugins_dir and
  validate strategy-2 candidates with relative_to()
- plugin_loader.py install_dependencies: resolve plugin_dir first, then
  construct requirements_file and marker_path from resolved base
- plugin_loader.py load_module: resolve plugin_dir with strict=True and
  validate entry_file with relative_to() before exec_module

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 08:19:50 -04:00
5 changed files with 176 additions and 244 deletions

View File

@@ -67,8 +67,9 @@ def main():
print(" 📍 Will run on: http://0.0.0.0:5000") print(" 📍 Will run on: http://0.0.0.0:5000")
print(" ⏹️ Press Ctrl+C to stop") print(" ⏹️ Press Ctrl+C to stop")
# Run the app (this should start the server) # Run the app (debug mode controlled by env var to satisfy security scanners)
app.run(host='0.0.0.0', port=5000, debug=True) _debug = os.environ.get('LEDMATRIX_FLASK_DEBUG', '0') == '1'
app.run(host='0.0.0.0', port=5000, debug=_debug)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n ⏹️ Server stopped by user") print("\n ⏹️ Server stopped by user")

View File

@@ -410,8 +410,8 @@ def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
try: try:
manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8") manifest_raw = zf.read(MANIFEST_NAME).decode("utf-8")
manifest = json.loads(manifest_raw) manifest = json.loads(manifest_raw)
except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e: except (OSError, UnicodeDecodeError, json.JSONDecodeError):
return False, f"Invalid manifest.json: {e}", {} return False, "Invalid manifest.json", {}
if not isinstance(manifest, dict) or "schema_version" not in manifest: if not isinstance(manifest, dict) or "schema_version" not in manifest:
return False, "Invalid manifest structure", {} return False, "Invalid manifest structure", {}
@@ -456,8 +456,8 @@ def validate_backup(zip_path: Path) -> Tuple[bool, str, Dict[str, Any]]:
return True, "", result_manifest return True, "", result_manifest
except zipfile.BadZipFile: except zipfile.BadZipFile:
return False, "File is not a valid ZIP archive", {} return False, "File is not a valid ZIP archive", {}
except OSError as e: except OSError:
return False, f"Could not read backup: {e}", {} return False, "Could not read backup", {}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -8,6 +8,7 @@ Extracted from PluginManager to improve separation of concerns.
import json import json
import importlib import importlib
import importlib.util import importlib.util
import os
import sys import sys
import subprocess import subprocess
import threading import threading
@@ -68,6 +69,11 @@ class PluginLoader:
Returns: Returns:
Path to plugin directory or None if not found Path to plugin directory or None if not found
""" """
# Sanitize plugin_id — os.path.basename is a CodeQL-recognized path sanitizer
plugin_id = os.path.basename(plugin_id or '')
if not plugin_id:
return None
# Strategy 1: Use mapping from discovery # Strategy 1: Use mapping from discovery
if plugin_directories and plugin_id in plugin_directories: if plugin_directories and plugin_id in plugin_directories:
plugin_dir = plugin_directories[plugin_id] plugin_dir = plugin_directories[plugin_id]
@@ -75,14 +81,16 @@ class PluginLoader:
self.logger.debug("Using plugin directory from discovery mapping: %s", plugin_dir) self.logger.debug("Using plugin directory from discovery mapping: %s", plugin_dir)
return plugin_dir return plugin_dir
# Strategy 2: Direct paths # Strategy 2: Direct paths — resolve and validate they stay within plugins_dir
plugin_dir = plugins_dir / plugin_id plugins_dir_resolved = plugins_dir.resolve()
if plugin_dir.exists(): for _candidate_name in (plugin_id, f"ledmatrix-{plugin_id}"):
return plugin_dir _candidate = (plugins_dir_resolved / _candidate_name).resolve()
try:
plugin_dir = plugins_dir / f"ledmatrix-{plugin_id}" _candidate.relative_to(plugins_dir_resolved)
if plugin_dir.exists(): except ValueError:
return plugin_dir continue
if _candidate.exists():
return _candidate
# Strategy 3: Case-insensitive search # Strategy 3: Case-insensitive search
normalized_id = plugin_id.lower() normalized_id = plugin_id.lower()
@@ -143,21 +151,19 @@ class PluginLoader:
Returns: Returns:
True if dependencies installed or not needed, False on error True if dependencies installed or not needed, False on error
""" """
requirements_file = plugin_dir / "requirements.txt" plugin_id = os.path.basename(plugin_id or '')
if not requirements_file.exists(): if not plugin_id:
return True # No dependencies needed return False
# Resolve and validate plugin_dir before constructing any derived paths
# Resolve and validate plugin_dir before constructing derived paths from it
try: try:
plugin_dir_resolved = plugin_dir.resolve(strict=True) plugin_dir_resolved = plugin_dir.resolve(strict=True)
except OSError: except OSError:
self.logger.error("Plugin directory does not exist: %s", plugin_dir) self.logger.error("Plugin directory does not exist: %s", plugin_dir)
return False return False
requirements_file = plugin_dir_resolved / "requirements.txt"
if not requirements_file.exists():
return True # No dependencies needed
marker_path = plugin_dir_resolved / ".dependencies_installed" marker_path = plugin_dir_resolved / ".dependencies_installed"
try:
marker_path.relative_to(plugin_dir_resolved)
except ValueError:
return False
# Check if already installed # Check if already installed
if marker_path.exists(): if marker_path.exists():
@@ -374,9 +380,20 @@ class PluginLoader:
Returns: Returns:
Loaded module or None on error Loaded module or None on error
""" """
entry_file = plugin_dir / entry_point plugin_id = os.path.basename(plugin_id or '')
if not plugin_id:
raise PluginError("Invalid plugin ID")
try:
plugin_dir_resolved = plugin_dir.resolve(strict=True)
except OSError:
raise PluginError("Plugin directory not found", plugin_id=plugin_id)
entry_file = (plugin_dir_resolved / entry_point).resolve()
try:
entry_file.relative_to(plugin_dir_resolved)
except ValueError:
raise PluginError("Invalid entry point path", plugin_id=plugin_id)
if not entry_file.exists(): if not entry_file.exists():
error_msg = f"Entry point file not found: {entry_file} for plugin {plugin_id}" error_msg = f"Entry point file not found for plugin {plugin_id}"
self.logger.error(error_msg) self.logger.error(error_msg)
raise PluginError(error_msg, plugin_id=plugin_id, context={'entry_file': str(entry_file)}) raise PluginError(error_msg, plugin_id=plugin_id, context={'entry_file': str(entry_file)})

View File

@@ -402,9 +402,7 @@ def save_schedule_config():
return success_response(message='Schedule configuration saved successfully') return success_response(message='Schedule configuration saved successfully')
except Exception as e: except Exception as e:
import logging import logging
import traceback logger.error("Error saving schedule config", exc_info=True)
error_msg = f"Error saving schedule config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
return error_response( return error_response(
ErrorCode.CONFIG_SAVE_FAILED, ErrorCode.CONFIG_SAVE_FAILED,
"An error occurred; see logs for details", "An error occurred; see logs for details",
@@ -623,9 +621,7 @@ def save_dim_schedule_config():
return success_response(message='Dim schedule configuration saved successfully') return success_response(message='Dim schedule configuration saved successfully')
except Exception as e: except Exception as e:
import logging import logging
import traceback logger.error("Error saving dim schedule config", exc_info=True)
error_msg = f"Error saving dim schedule config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
return error_response( return error_response(
ErrorCode.CONFIG_SAVE_FAILED, ErrorCode.CONFIG_SAVE_FAILED,
"An error occurred; see logs for details", "An error occurred; see logs for details",
@@ -921,7 +917,7 @@ def save_main_config():
if 'properties' in schema: if 'properties' in schema:
secret_fields = find_secret_fields(schema['properties']) secret_fields = find_secret_fields(schema['properties'])
except Exception as e: except Exception as e:
print(f"Error reading schema for secret detection: {e}") logger.debug("Error reading schema for secret detection: %s", e)
# Separate secrets from regular config (same logic as save_plugin_config) # Separate secrets from regular config (same logic as save_plugin_config)
def separate_secrets(config, secrets_set, prefix=''): def separate_secrets(config, secrets_set, prefix=''):
@@ -959,7 +955,7 @@ def save_main_config():
if 'enabled' not in regular_config: if 'enabled' not in regular_config:
regular_config['enabled'] = True regular_config['enabled'] = True
except Exception as e: except Exception as e:
print(f"Error preserving enabled state for {plugin_id}: {e}") logger.debug("Error preserving enabled state: %s", e)
# Default to True on error to avoid disabling plugins # Default to True on error to avoid disabling plugins
regular_config['enabled'] = True regular_config['enabled'] = True
@@ -994,7 +990,7 @@ def save_main_config():
plugin_instance.on_config_change(plugin_full_config) plugin_instance.on_config_change(plugin_full_config)
except Exception as hook_err: except Exception as hook_err:
# Don't fail the save if hook fails # Don't fail the save if hook fails
print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}") logger.warning("on_config_change failed: %s", hook_err)
# Remove processed plugin keys from data (they're already in current_config) # Remove processed plugin keys from data (they're already in current_config)
for key in plugin_keys_to_remove: for key in plugin_keys_to_remove:
@@ -1044,9 +1040,7 @@ def save_main_config():
return success_response(message='Configuration saved successfully') return success_response(message='Configuration saved successfully')
except Exception as e: except Exception as e:
import logging import logging
import traceback logger.error("Error saving config", exc_info=True)
error_msg = f"Error saving config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
return error_response( return error_response(
ErrorCode.CONFIG_SAVE_FAILED, ErrorCode.CONFIG_SAVE_FAILED,
f"Error saving configuration: {e}", f"Error saving configuration: {e}",
@@ -1087,13 +1081,8 @@ def save_raw_main_config():
logger.error('Invalid JSON', exc_info=True) logger.error('Invalid JSON', exc_info=True)
return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400 return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400
except Exception as e: except Exception as e:
import logging
import traceback
from src.exceptions import ConfigError from src.exceptions import ConfigError
logger.error("Error saving raw main config", exc_info=True)
# Log the full error for debugging
error_msg = f"Error saving raw main config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
# Extract more specific error message if it's a ConfigError # Extract more specific error message if it's a ConfigError
if isinstance(e, ConfigError): if isinstance(e, ConfigError):
@@ -1139,13 +1128,8 @@ def save_raw_secrets_config():
logger.error('Invalid JSON', exc_info=True) logger.error('Invalid JSON', exc_info=True)
return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400 return jsonify({'status': 'error', 'message': 'Invalid JSON in request body'}), 400
except Exception as e: except Exception as e:
import logging
import traceback
from src.exceptions import ConfigError from src.exceptions import ConfigError
logger.error("Error saving raw secrets config", exc_info=True)
# Log the full error for debugging
error_msg = f"Error saving raw secrets config: {str(e)}\n{traceback.format_exc()}"
logging.error(error_msg)
# Extract more specific error message if it's a ConfigError # Extract more specific error message if it's a ConfigError
if isinstance(e, ConfigError): if isinstance(e, ConfigError):
@@ -1365,7 +1349,9 @@ def get_git_version(project_dir=None):
) )
if result.returncode == 0: if result.returncode == 0:
return result.stdout.strip() version_str = result.stdout.strip()
if re.match(r'^[a-zA-Z0-9._\-]+$', version_str):
return version_str
# Fallback to short commit hash # Fallback to short commit hash
result = subprocess.run( result = subprocess.run(
@@ -1377,7 +1363,9 @@ def get_git_version(project_dir=None):
) )
if result.returncode == 0: if result.returncode == 0:
return result.stdout.strip() version_str = result.stdout.strip()
if re.match(r'^[a-zA-Z0-9._\-]+$', version_str):
return version_str
return 'Unknown' return 'Unknown'
except Exception: except Exception:
@@ -1472,12 +1460,10 @@ def execute_system_action():
# For now, just start the display service # For now, just start the display service
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'], result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
capture_output=True, text=True) capture_output=True, text=True)
logger.info("start_display (%s) returned code %d", mode, result.returncode)
return jsonify({ return jsonify({
'status': 'success' if result.returncode == 0 else 'error', 'status': 'success' if result.returncode == 0 else 'error',
'message': f'Started display in {mode} mode', 'message': 'Display started' if result.returncode == 0 else 'Failed to start display',
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}) })
else: else:
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'], result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
@@ -1538,13 +1524,12 @@ def execute_system_action():
cwd=project_dir cwd=project_dir
) )
if stash_result.returncode == 0: if stash_result.returncode == 0:
print(f"Stashed local changes: {stash_result.stdout}") logger.debug("git stash: stashed local changes before pull")
stash_info = " Local changes were stashed." stash_info = " Local changes were stashed."
else: else:
# If stash fails, log but continue with pull logger.warning("git stash failed before pull (returncode=%d)", stash_result.returncode)
print(f"Stash failed: {stash_result.stderr}")
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
print("Stash operation timed out, proceeding with pull") logger.warning("git stash timed out, proceeding with pull")
# Perform the git pull # Perform the git pull
result = subprocess.run( result = subprocess.run(
@@ -1563,14 +1548,12 @@ def execute_system_action():
if result.stdout and "Already up to date" not in result.stdout: if result.stdout and "Already up to date" not in result.stdout:
pull_message = f"Code updated successfully.{stash_info}" pull_message = f"Code updated successfully.{stash_info}"
else: else:
pull_message = f"Update failed: {result.stderr or 'Unknown error'}" logger.warning("git pull failed (returncode=%d): %s", result.returncode, result.stderr)
pull_message = "Update failed; check logs for details"
return jsonify({ return jsonify({
'status': 'success' if result.returncode == 0 else 'error', 'status': 'success' if result.returncode == 0 else 'error',
'message': pull_message, 'message': pull_message,
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}) })
elif action == 'restart_display_service': elif action == 'restart_display_service':
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'], result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'],
@@ -1580,14 +1563,12 @@ def execute_system_action():
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'], result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'],
capture_output=True, text=True) capture_output=True, text=True)
else: else:
return jsonify({'status': 'error', 'message': f'Unknown action: {action}'}), 400 return jsonify({'status': 'error', 'message': 'Unknown action'}), 400
logger.info("system action '%s' returncode=%d", action, result.returncode)
return jsonify({ return jsonify({
'status': 'success' if result.returncode == 0 else 'error', 'status': 'success' if result.returncode == 0 else 'error',
'message': f'Action {action} completed', 'message': 'Action completed' if result.returncode == 0 else 'Action failed; check logs for details',
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}) })
except Exception as e: except Exception as e:
@@ -1687,8 +1668,6 @@ def get_on_demand_status():
} }
}) })
except Exception as exc: except Exception as exc:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_on_demand_status', exc_info=True) logger.error('Error in get_on_demand_status', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1755,11 +1734,11 @@ def start_on_demand_display():
# Stop the display service first to ensure clean state when we will restart it # Stop the display service first to ensure clean state when we will restart it
if service_was_running and start_service: if service_was_running and start_service:
import time as time_module import time as time_module
print("Stopping display service before starting on-demand mode...") logger.debug("Stopping display service before starting on-demand mode")
_stop_display_service() _stop_display_service()
# Wait a brief moment for the service to fully stop # Wait a brief moment for the service to fully stop
time_module.sleep(1.5) time_module.sleep(1.5)
print("Display service stopped, now starting with on-demand request...") logger.debug("Display service stopped, now starting with on-demand request")
if not service_status.get('active') and not start_service: if not service_status.get('active') and not start_service:
return jsonify({ return jsonify({
@@ -1792,8 +1771,6 @@ def start_on_demand_display():
} }
return jsonify({'status': 'success', 'data': response_data}) return jsonify({'status': 'success', 'data': response_data})
except Exception as exc: except Exception as exc:
import traceback
error_details = traceback.format_exc()
logger.error('Error in start_on_demand_display', exc_info=True) logger.error('Error in start_on_demand_display', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1830,8 +1807,6 @@ def stop_on_demand_display():
} }
}) })
except Exception as exc: except Exception as exc:
import traceback
error_details = traceback.format_exc()
logger.error('Error in stop_on_demand_display', exc_info=True) logger.error('Error in stop_on_demand_display', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1961,8 +1936,6 @@ def get_installed_plugins():
return jsonify({'status': 'success', 'data': {'plugins': plugins}}) return jsonify({'status': 'success', 'data': {'plugins': plugins}})
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_installed_plugins', exc_info=True) logger.error('Error in get_installed_plugins', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -1989,8 +1962,6 @@ def get_plugin_health():
'data': health_summaries 'data': health_summaries
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_health', exc_info=True) logger.error('Error in get_plugin_health', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2016,8 +1987,6 @@ def get_plugin_health_single(plugin_id):
'data': health_summary 'data': health_summary
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_health_single', exc_info=True) logger.error('Error in get_plugin_health_single', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2043,8 +2012,6 @@ def reset_plugin_health(plugin_id):
'message': f'Health state reset for plugin {plugin_id}' 'message': f'Health state reset for plugin {plugin_id}'
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in reset_plugin_health', exc_info=True) logger.error('Error in reset_plugin_health', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2071,8 +2038,6 @@ def get_plugin_metrics():
'data': metrics_summaries 'data': metrics_summaries
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_metrics', exc_info=True) logger.error('Error in get_plugin_metrics', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2098,8 +2063,6 @@ def get_plugin_metrics_single(plugin_id):
'data': metrics_summary 'data': metrics_summary
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_metrics_single', exc_info=True) logger.error('Error in get_plugin_metrics_single', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2125,8 +2088,6 @@ def reset_plugin_metrics(plugin_id):
'message': f'Metrics reset for plugin {plugin_id}' 'message': f'Metrics reset for plugin {plugin_id}'
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in reset_plugin_metrics', exc_info=True) logger.error('Error in reset_plugin_metrics', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2182,8 +2143,6 @@ def manage_plugin_limits(plugin_id):
'message': f'Resource limits updated for plugin {plugin_id}' 'message': f'Resource limits updated for plugin {plugin_id}'
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in manage_plugin_limits', exc_info=True) logger.error('Error in manage_plugin_limits', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -2228,7 +2187,7 @@ def toggle_plugin():
# Check if plugin exists in manifests (discovered but may not be loaded) # Check if plugin exists in manifests (discovered but may not be loaded)
if plugin_id not in api_v3.plugin_manager.plugin_manifests: if plugin_id not in api_v3.plugin_manager.plugin_manifests:
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404 return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
# Update config (this is what the display controller reads) # Update config (this is what the display controller reads)
config = api_v3.config_manager.load_config() config = api_v3.config_manager.load_config()
@@ -2605,7 +2564,7 @@ def get_plugin_config():
categories_from_files[category_name]['data_file'] = f'of_the_day/{filename}' categories_from_files[category_name]['data_file'] = f'of_the_day/{filename}'
except Exception as e: except Exception as e:
print(f"Warning: Could not read {json_file}: {e}") logger.debug("Could not read json file: %s", e)
continue continue
# Update plugin_config with scanned files # Update plugin_config with scanned files
@@ -2708,7 +2667,7 @@ def update_plugin():
manifest = json.load(f) manifest = json.load(f)
current_last_updated = manifest.get('last_updated') current_last_updated = manifest.get('last_updated')
except Exception as e: except Exception as e:
print(f"Warning: Could not read local manifest for {plugin_id}: {e}") logger.debug("Could not read local manifest for plugin: %s", e)
if api_v3.plugin_store_manager: if api_v3.plugin_store_manager:
git_info_before = api_v3.plugin_store_manager._get_local_git_info(plugin_dir) git_info_before = api_v3.plugin_store_manager._get_local_git_info(plugin_dir)
@@ -2723,16 +2682,14 @@ def update_plugin():
git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir) git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir)
is_git_repo = git_info is not None is_git_repo = git_info is not None
if is_git_repo: if is_git_repo:
print(f"[UPDATE] Plugin {plugin_id} is a git repository, will update via git pull") logger.debug("Plugin is a git repository, will update via git pull")
remote_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id, fetch_latest_from_github=True) remote_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id, fetch_latest_from_github=True)
remote_commit = remote_info.get('last_commit_sha') if remote_info else None remote_commit = remote_info.get('last_commit_sha') if remote_info else None
remote_branch = remote_info.get('branch') if remote_info else None remote_branch = remote_info.get('branch') if remote_info else None
# Update the plugin # Update the plugin
print(f"[UPDATE] Attempting to update plugin {plugin_id}...")
success = api_v3.plugin_store_manager.update_plugin(plugin_id) success = api_v3.plugin_store_manager.update_plugin(plugin_id)
print(f"[UPDATE] Update result for {plugin_id}: {success}")
if success: if success:
updated_last_updated = current_last_updated updated_last_updated = current_last_updated
@@ -2743,7 +2700,7 @@ def update_plugin():
manifest = json.load(f) manifest = json.load(f)
updated_last_updated = manifest.get('last_updated', current_last_updated) updated_last_updated = manifest.get('last_updated', current_last_updated)
except Exception as e: except Exception as e:
print(f"Warning: Could not read updated manifest for {plugin_id}: {e}") logger.debug("Could not read updated manifest after update: %s", e)
updated_commit = None updated_commit = None
updated_branch = remote_branch or current_branch updated_branch = remote_branch or current_branch
@@ -2805,52 +2762,41 @@ def update_plugin():
message=message message=message
) )
else: else:
error_msg = f'Failed to update plugin {plugin_id}'
plugin_path_dir = Path(api_v3.plugin_store_manager.plugins_dir) / plugin_id plugin_path_dir = Path(api_v3.plugin_store_manager.plugins_dir) / plugin_id
if not plugin_path_dir.exists(): if not plugin_path_dir.exists():
error_msg += ': Plugin not found' client_msg = 'Plugin update failed: plugin not found'
else: else:
# Check if it's a git repo (could be installed from URL, not in registry)
git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir) git_info = api_v3.plugin_store_manager._get_local_git_info(plugin_path_dir)
if git_info: if not git_info:
# It's a git repo, so update should have worked - provide generic error
error_msg += ': Update failed (check logs for details)'
else:
# Not a git repo, check if it's in registry
plugin_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id) plugin_info = api_v3.plugin_store_manager.get_plugin_info(plugin_id)
if not plugin_info: if not plugin_info:
error_msg += ': Plugin not found in registry and not a git repository' client_msg = 'Plugin update failed: not found in registry'
else: else:
error_msg += ': Update failed (check logs for details)' client_msg = 'Plugin update failed; check logs for details'
else:
client_msg = 'Plugin update failed; check logs for details'
logger.error("update_plugin failed for plugin_id=%s: %s", plugin_id, client_msg)
if api_v3.operation_history: if api_v3.operation_history:
api_v3.operation_history.record_operation( api_v3.operation_history.record_operation(
"update", "update",
plugin_id=plugin_id, plugin_id=plugin_id,
status="failed", status="failed",
error=error_msg, error=client_msg,
details={ details={
"previous_commit": current_commit[:7] if current_commit else None, "previous_commit": current_commit[:7] if current_commit else None,
"branch": current_branch "branch": current_branch
} }
) )
import traceback
error_details = traceback.format_exc()
print(f"[UPDATE] Update failed for {plugin_id}: {error_msg}")
print(f"[UPDATE] Traceback: {error_details}")
return error_response( return error_response(
ErrorCode.PLUGIN_UPDATE_FAILED, ErrorCode.PLUGIN_UPDATE_FAILED,
error_msg, client_msg,
status_code=500 status_code=500
) )
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error("Unhandled exception in update endpoint", exc_info=True) logger.error("Unhandled exception in update endpoint", exc_info=True)
print(f"[UPDATE] Traceback: {error_details}")
from src.web_interface.errors import WebInterfaceError from src.web_interface.errors import WebInterfaceError
error = WebInterfaceError.from_exception(e, ErrorCode.PLUGIN_UPDATE_FAILED) error = WebInterfaceError.from_exception(e, ErrorCode.PLUGIN_UPDATE_FAILED)
@@ -2919,7 +2865,7 @@ def uninstall_plugin():
try: try:
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True) api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
except Exception as cleanup_err: except Exception as cleanup_err:
print(f"Warning: Failed to cleanup config for {plugin_id}: {cleanup_err}") logger.warning("Failed to cleanup config after uninstall: %s", cleanup_err)
# Remove from state manager # Remove from state manager
if api_v3.plugin_state_manager: if api_v3.plugin_state_manager:
@@ -2934,7 +2880,7 @@ def uninstall_plugin():
details={"preserve_config": preserve_config} details={"preserve_config": preserve_config}
) )
return {'success': True, 'message': f'Plugin {plugin_id} uninstalled successfully'} return {'success': True, 'message': 'Plugin uninstalled successfully'}
# Enqueue operation # Enqueue operation
operation_id = api_v3.operation_queue.enqueue_operation( operation_id = api_v3.operation_queue.enqueue_operation(
@@ -2945,7 +2891,7 @@ def uninstall_plugin():
return success_response( return success_response(
data={'operation_id': operation_id}, data={'operation_id': operation_id},
message=f'Plugin {plugin_id} uninstallation queued' message='Plugin uninstallation queued'
) )
else: else:
# Fallback to direct uninstall # Fallback to direct uninstall
@@ -2966,7 +2912,7 @@ def uninstall_plugin():
try: try:
api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True) api_v3.config_manager.cleanup_plugin_config(plugin_id, remove_secrets=True)
except Exception as cleanup_err: except Exception as cleanup_err:
print(f"Warning: Failed to cleanup config for {plugin_id}: {cleanup_err}") logger.warning("Failed to cleanup config after uninstall: %s", cleanup_err)
# Remove from state manager # Remove from state manager
if api_v3.plugin_state_manager: if api_v3.plugin_state_manager:
@@ -2981,19 +2927,19 @@ def uninstall_plugin():
details={"preserve_config": preserve_config} details={"preserve_config": preserve_config}
) )
return success_response(message=f'Plugin {plugin_id} uninstalled successfully') return success_response(message='Plugin uninstalled successfully')
else: else:
if api_v3.operation_history: if api_v3.operation_history:
api_v3.operation_history.record_operation( api_v3.operation_history.record_operation(
"uninstall", "uninstall",
plugin_id=plugin_id, plugin_id=plugin_id,
status="failed", status="failed",
error=f'Failed to uninstall plugin {plugin_id}' error='Plugin uninstall failed'
) )
return error_response( return error_response(
ErrorCode.PLUGIN_UNINSTALL_FAILED, ErrorCode.PLUGIN_UNINSTALL_FAILED,
f'Failed to uninstall plugin {plugin_id}', 'Plugin uninstall failed',
status_code=500 status_code=500
) )
@@ -3033,7 +2979,7 @@ def install_plugin():
# Log the plugins directory being used for debugging # Log the plugins directory being used for debugging
plugins_dir = api_v3.plugin_store_manager.plugins_dir plugins_dir = api_v3.plugin_store_manager.plugins_dir
branch_info = f" (branch: {branch})" if branch else "" branch_info = f" (branch: {branch})" if branch else ""
print(f"Installing plugin {plugin_id}{branch_info} to directory: {plugins_dir}", flush=True) logger.info("Installing plugin to directory: %s", plugins_dir)
# Use operation queue if available # Use operation queue if available
if api_v3.operation_queue: if api_v3.operation_queue:
@@ -3121,7 +3067,7 @@ def install_plugin():
) )
branch_msg = f" (branch: {branch})" if branch else "" branch_msg = f" (branch: {branch})" if branch else ""
return success_response(message=f'Plugin {plugin_id} installed successfully{branch_msg}') return success_response(message=f'Plugin installed successfully{branch_msg}')
else: else:
error_msg = f'Failed to install plugin {plugin_id}' error_msg = f'Failed to install plugin {plugin_id}'
if branch: if branch:
@@ -3146,8 +3092,6 @@ def install_plugin():
) )
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in install_plugin', exc_info=True) logger.error('Error in install_plugin', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3203,8 +3147,6 @@ def install_plugin_from_url():
}), 500 }), 500
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in install_plugin_from_url', exc_info=True) logger.error('Error in install_plugin_from_url', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3237,8 +3179,6 @@ def get_registry_from_url():
}), 400 }), 400
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_registry_from_url', exc_info=True) logger.error('Error in get_registry_from_url', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3252,8 +3192,6 @@ def get_saved_repositories():
repositories = api_v3.saved_repositories_manager.get_all() repositories = api_v3.saved_repositories_manager.get_all()
return jsonify({'status': 'success', 'data': {'repositories': repositories}}) return jsonify({'status': 'success', 'data': {'repositories': repositories}})
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_saved_repositories', exc_info=True) logger.error('Error in get_saved_repositories', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3285,8 +3223,6 @@ def add_saved_repository():
'message': 'Repository already exists or failed to save' 'message': 'Repository already exists or failed to save'
}), 400 }), 400
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in add_saved_repository', exc_info=True) logger.error('Error in add_saved_repository', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3317,8 +3253,6 @@ def remove_saved_repository():
'message': 'Repository not found' 'message': 'Repository not found'
}), 404 }), 404
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in remove_saved_repository', exc_info=True) logger.error('Error in remove_saved_repository', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3372,8 +3306,6 @@ def list_plugin_store():
return jsonify({'status': 'success', 'data': {'plugins': formatted_plugins}}) return jsonify({'status': 'success', 'data': {'plugins': formatted_plugins}})
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in list_plugin_store', exc_info=True) logger.error('Error in list_plugin_store', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3425,8 +3357,6 @@ def get_github_auth_status():
} }
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_github_auth_status', exc_info=True) logger.error('Error in get_github_auth_status', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -3454,8 +3384,6 @@ def refresh_plugin_store():
'plugin_count': plugin_count 'plugin_count': plugin_count
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in refresh_plugin_store', exc_info=True) logger.error('Error in refresh_plugin_store', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -4383,7 +4311,7 @@ def save_plugin_config():
if 'enabled' not in plugin_config: if 'enabled' not in plugin_config:
plugin_config['enabled'] = True plugin_config['enabled'] = True
except Exception as e: except Exception as e:
print(f"Error preserving enabled state: {e}") logger.debug("Error preserving enabled state: %s", e)
# Default to True on error to avoid disabling plugins # Default to True on error to avoid disabling plugins
plugin_config['enabled'] = True plugin_config['enabled'] = True
@@ -4680,16 +4608,11 @@ def save_plugin_config():
# Also print to console for immediate visibility # Also print to console for immediate visibility
import json import json
print(f"[ERROR] Config validation failed for {plugin_id}") logger.warning("Config validation failed for plugin (see debug logs)")
print(f"[ERROR] Validation errors: {validation_errors}")
print(f"[ERROR] Config keys: {list(plugin_config.keys())}")
print(f"[ERROR] Schema property keys: {list(enhanced_schema.get('properties', {}).keys())}")
# Log raw form data if this was a form submission # Log raw form data if this was a form submission
if 'application/json' not in (request.content_type or ''): if 'application/json' not in (request.content_type or ''):
form_data = request.form.to_dict() form_data = request.form.to_dict()
print(f"[ERROR] Raw form data: {json.dumps({k: str(v)[:200] for k, v in form_data.items()}, indent=2)}")
print(f"[ERROR] Parsed config: {json.dumps(plugin_config, indent=2, default=str)}")
return error_response( return error_response(
ErrorCode.CONFIG_VALIDATION_FAILED, ErrorCode.CONFIG_VALIDATION_FAILED,
'Configuration validation failed', 'Configuration validation failed',
@@ -4828,7 +4751,7 @@ def save_plugin_config():
logging.warning(f"Lifecycle method error for {plugin_id}: {lifecycle_error}", exc_info=True) logging.warning(f"Lifecycle method error for {plugin_id}: {lifecycle_error}", exc_info=True)
except Exception as hook_err: except Exception as hook_err:
# Do not fail the save if hook fails; just log # Do not fail the save if hook fails; just log
print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}") logger.warning("on_config_change failed: %s", hook_err)
secret_count = len(secrets_config) secret_count = len(secrets_config)
message = f'Plugin {plugin_id} configuration saved successfully' message = f'Plugin {plugin_id} configuration saved successfully'
@@ -4896,8 +4819,6 @@ def get_plugin_schema():
return jsonify({'status': 'success', 'data': {'schema': default_schema}}) return jsonify({'status': 'success', 'data': {'schema': default_schema}})
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in get_plugin_schema', exc_info=True) logger.error('Error in get_plugin_schema', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -5001,7 +4922,7 @@ def reset_plugin_config():
if hasattr(plugin_instance, 'on_config_change'): if hasattr(plugin_instance, 'on_config_change'):
plugin_instance.on_config_change(plugin_full_config) plugin_instance.on_config_change(plugin_full_config)
except Exception as hook_err: except Exception as hook_err:
print(f"Warning: on_config_change failed for {plugin_id}: {hook_err}") logger.warning("on_config_change failed: %s", hook_err)
return jsonify({ return jsonify({
'status': 'success', 'status': 'success',
@@ -5009,8 +4930,6 @@ def reset_plugin_config():
'data': {'config': defaults} 'data': {'config': defaults}
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in reset_plugin_config', exc_info=True) logger.error('Error in reset_plugin_config', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -5048,7 +4967,7 @@ def execute_plugin_action():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404 return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
# Load manifest to get action definition # Load manifest to get action definition
manifest_path = Path(plugin_dir) / 'manifest.json' manifest_path = Path(plugin_dir) / 'manifest.json'
@@ -5269,9 +5188,7 @@ sys.exit(proc.returncode)
'message': 'Could not generate authorization URL' 'message': 'Could not generate authorization URL'
}), 400 }), 400
except Exception as e: except Exception as e:
import traceback logger.error("Error executing action step 1", exc_info=True)
error_details = traceback.format_exc()
print(f"Error executing action step 1: {e}")
return jsonify({ return jsonify({
'status': 'error', 'status': 'error',
'message': 'An error occurred; see logs for details' 'message': 'An error occurred; see logs for details'
@@ -5323,8 +5240,6 @@ sys.exit(proc.returncode)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
return jsonify({'status': 'error', 'message': 'Action timed out'}), 408 return jsonify({'status': 'error', 'message': 'Action timed out'}), 408
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in execute_plugin_action', exc_info=True) logger.error('Error in execute_plugin_action', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -5343,7 +5258,7 @@ def authenticate_spotify():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404 return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
auth_script = Path(plugin_dir) / 'authenticate_spotify.py' auth_script = Path(plugin_dir) / 'authenticate_spotify.py'
if not auth_script.exists(): if not auth_script.exists():
@@ -5456,17 +5371,13 @@ sys.exit(proc.returncode)
'auth_url': auth_url 'auth_url': auth_url
}) })
except Exception as e: except Exception as e:
import traceback logger.error("Error getting Spotify auth URL", exc_info=True)
error_details = traceback.format_exc()
print(f"Error getting Spotify auth URL: {e}")
return jsonify({ return jsonify({
'status': 'error', 'status': 'error',
'message': 'An error occurred; see logs for details' 'message': 'An error occurred; see logs for details'
}), 500 }), 500
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in authenticate_spotify', exc_info=True) logger.error('Error in authenticate_spotify', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -5482,7 +5393,7 @@ def authenticate_ytm():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404 return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
auth_script = Path(plugin_dir) / 'authenticate_ytm.py' auth_script = Path(plugin_dir) / 'authenticate_ytm.py'
if not auth_script.exists(): if not auth_script.exists():
@@ -5517,8 +5428,6 @@ def authenticate_ytm():
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
return jsonify({'status': 'error', 'message': 'Authentication timed out'}), 408 return jsonify({'status': 'error', 'message': 'Authentication timed out'}), 408
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in authenticate_ytm', exc_info=True) logger.error('Error in authenticate_ytm', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6115,7 +6024,6 @@ def upload_plugin_asset():
}) })
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6138,7 +6046,7 @@ def upload_of_the_day_json():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404 return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
# Setup of_the_day directory # Setup of_the_day directory
data_dir = Path(plugin_dir) / 'of_the_day' data_dir = Path(plugin_dir) / 'of_the_day'
@@ -6241,7 +6149,7 @@ def upload_of_the_day_json():
from scripts.update_config import add_category_to_config from scripts.update_config import add_category_to_config
add_category_to_config(category_name, f'of_the_day/{safe_filename}', display_name) add_category_to_config(category_name, f'of_the_day/{safe_filename}', display_name)
except Exception as e: except Exception as e:
print(f"Warning: Could not update config: {e}") logger.warning("Could not update config: %s", e)
# Continue anyway - file is uploaded # Continue anyway - file is uploaded
# Generate file ID (use category name as ID for simplicity) # Generate file ID (use category name as ID for simplicity)
@@ -6266,7 +6174,6 @@ def upload_of_the_day_json():
}) })
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6288,7 +6195,7 @@ def delete_of_the_day_json():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404 return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
data_dir = Path(plugin_dir) / 'of_the_day' data_dir = Path(plugin_dir) / 'of_the_day'
filename = f"{file_id}.json" filename = f"{file_id}.json"
@@ -6306,7 +6213,7 @@ def delete_of_the_day_json():
from scripts.update_config import remove_category_from_config from scripts.update_config import remove_category_from_config
remove_category_from_config(file_id) remove_category_from_config(file_id)
except Exception as e: except Exception as e:
print(f"Warning: Could not update config: {e}") logger.warning("Could not update config: %s", e)
return jsonify({ return jsonify({
'status': 'success', 'status': 'success',
@@ -6314,7 +6221,6 @@ def delete_of_the_day_json():
}) })
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6329,7 +6235,7 @@ def serve_plugin_static(plugin_id, file_path):
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404 return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
# Resolve file path (prevent directory traversal) # Resolve file path (prevent directory traversal)
plugin_dir = Path(plugin_dir).resolve() plugin_dir = Path(plugin_dir).resolve()
@@ -6361,7 +6267,6 @@ def serve_plugin_static(plugin_id, file_path):
return Response(content, mimetype=content_type) return Response(content, mimetype=content_type)
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6420,7 +6325,7 @@ def upload_calendar_credentials():
plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id plugin_dir = PROJECT_ROOT / 'plugins' / plugin_id
if not plugin_dir or not Path(plugin_dir).exists(): if not plugin_dir or not Path(plugin_dir).exists():
return jsonify({'status': 'error', 'message': f'Plugin {plugin_id} not found'}), 404 return jsonify({'status': 'error', 'message': 'Plugin not found'}), 404
# Save file to plugin directory # Save file to plugin directory
credentials_path = Path(plugin_dir) / 'credentials.json' credentials_path = Path(plugin_dir) / 'credentials.json'
@@ -6444,8 +6349,6 @@ def upload_calendar_credentials():
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in upload_calendar_credentials', exc_info=True) logger.error('Error in upload_calendar_credentials', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6489,7 +6392,6 @@ def delete_plugin_asset():
return jsonify({'status': 'success', 'message': 'Image deleted successfully'}) return jsonify({'status': 'success', 'message': 'Image deleted successfully'})
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6518,7 +6420,6 @@ def list_plugin_assets():
return jsonify({'status': 'success', 'data': {'assets': assets}}) return jsonify({'status': 'success', 'data': {'assets': assets}})
except Exception as e: except Exception as e:
import traceback
logger.error('Unhandled exception', exc_info=True) logger.error('Unhandled exception', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6742,10 +6643,7 @@ def connect_wifi():
'message': message or 'Failed to connect to network' 'message': message or 'Failed to connect to network'
}), 400 }), 400
except Exception as e: except Exception as e:
import logging logger.error("Error connecting to WiFi", exc_info=True)
import traceback
logger = logging.getLogger(__name__)
logger.error(f"Error connecting to WiFi: {e}\n{traceback.format_exc()}")
return jsonify({ return jsonify({
'status': 'error', 'status': 'error',
'message': 'An error occurred; see logs for details' 'message': 'An error occurred; see logs for details'
@@ -6771,10 +6669,7 @@ def disconnect_wifi():
'message': message or 'Failed to disconnect from network' 'message': message or 'Failed to disconnect from network'
}), 400 }), 400
except Exception as e: except Exception as e:
import logging logger.error("Error disconnecting from WiFi", exc_info=True)
import traceback
logger = logging.getLogger(__name__)
logger.error(f"Error disconnecting from WiFi: {e}\n{traceback.format_exc()}")
return jsonify({ return jsonify({
'status': 'error', 'status': 'error',
'message': 'An error occurred; see logs for details' 'message': 'An error occurred; see logs for details'
@@ -6904,8 +6799,6 @@ def list_cache_files():
} }
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in list_cache_files', exc_info=True) logger.error('Error in list_cache_files', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6932,8 +6825,6 @@ def delete_cache_file():
'message': f'Cache file for key "{cache_key}" deleted successfully' 'message': f'Cache file for key "{cache_key}" deleted successfully'
}) })
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
logger.error('Error in delete_cache_file', exc_info=True) logger.error('Error in delete_cache_file', exc_info=True)
return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500 return jsonify({'status': 'error', 'message': 'An error occurred; see logs for details'}), 500
@@ -6975,7 +6866,7 @@ def get_plugin_errors(plugin_id):
try: try:
aggregator = get_error_aggregator() aggregator = get_error_aggregator()
health = aggregator.get_plugin_health(plugin_id) health = aggregator.get_plugin_health(plugin_id)
return success_response(data=health, message=f"Plugin {plugin_id} health retrieved") return success_response(data=health, message="Plugin health retrieved")
except Exception as e: except Exception as e:
logger.error(f"Error getting plugin health for {plugin_id}: {e}", exc_info=True) logger.error(f"Error getting plugin health for {plugin_id}: {e}", exc_info=True)
return error_response( return error_response(
@@ -7048,6 +6939,8 @@ _BACKUP_EXPORT_DIR = PROJECT_ROOT / "config" / "backups" / "exports"
def _safe_backup_path(filename: str) -> Path: def _safe_backup_path(filename: str) -> Path:
"""Resolve a filename to an absolute path inside the export dir, """Resolve a filename to an absolute path inside the export dir,
rejecting any traversal attempts. Returns None if unsafe.""" rejecting any traversal attempts. Returns None if unsafe."""
# Use basename first (CodeQL-recognized sanitizer) then validate format
filename = os.path.basename(filename or '')
if not filename or not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$', filename): if not filename or not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._-]{0,200}\.zip$', filename):
return None return None
path = (_BACKUP_EXPORT_DIR / filename).resolve() path = (_BACKUP_EXPORT_DIR / filename).resolve()

View File

@@ -2,6 +2,8 @@ from flask import Blueprint, render_template, flash
from markupsafe import escape from markupsafe import escape
import json import json
import logging import logging
import os
import re
from pathlib import Path from pathlib import Path
from src.web_interface.secret_helpers import mask_secret_fields from src.web_interface.secret_helpers import mask_secret_fields
@@ -84,7 +86,7 @@ def load_partial(partial_name):
elif partial_name == 'operation-history': elif partial_name == 'operation-history':
return _load_operation_history_partial() return _load_operation_history_partial()
else: else:
return f"Partial '{escape(partial_name)}' not found", 404 return "Partial not found", 404
except Exception as e: except Exception as e:
logger.error("Error loading partial %s", partial_name, exc_info=True) logger.error("Error loading partial %s", partial_name, exc_info=True)
@@ -353,9 +355,9 @@ def _load_plugin_config_partial(plugin_id):
Load plugin configuration partial - server-side rendered form. Load plugin configuration partial - server-side rendered form.
This replaces the client-side generateConfigForm() JavaScript. This replaces the client-side generateConfigForm() JavaScript.
""" """
import re as _re # Sanitize with basename (CodeQL-recognized sanitizer) then regex-validate format
# Reject plugin IDs containing path-traversal characters before any filesystem use plugin_id = os.path.basename(plugin_id or '')
if not _re.match(r'^[a-zA-Z0-9_\-.:]+$', plugin_id or ''): if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9._\-:]*$', plugin_id):
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400 return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
try: try:
@@ -366,80 +368,85 @@ def _load_plugin_config_partial(plugin_id):
if plugin_id.startswith('starlark:'): if plugin_id.startswith('starlark:'):
return _load_starlark_config_partial(plugin_id[len('starlark:'):]) return _load_starlark_config_partial(plugin_id[len('starlark:'):])
# Resolve and validate all plugin paths against the plugins base directory
_plugins_base = Path(pages_v3.plugin_manager.plugins_dir).resolve()
_plugin_dir = (_plugins_base / plugin_id).resolve()
try:
_plugin_dir.relative_to(_plugins_base)
except ValueError:
return '<div class="text-red-500 p-4">Invalid plugin ID</div>', 400
# Try to get plugin info first # Try to get plugin info first
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id) plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
# If not found, re-discover plugins (handles plugins added after startup) # If not found, re-discover plugins (handles plugins added after startup)
if not plugin_info: if not plugin_info:
pages_v3.plugin_manager.discover_plugins() pages_v3.plugin_manager.discover_plugins()
plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id) plugin_info = pages_v3.plugin_manager.get_plugin_info(plugin_id)
if not plugin_info: if not plugin_info:
return f'<div class="text-red-500 p-4">Plugin "{escape(plugin_id)}" not found</div>', 404 return '<div class="text-red-500 p-4">Plugin not found</div>', 404
# Get plugin instance (may be None if not loaded) # Get plugin instance (may be None if not loaded)
plugin_instance = pages_v3.plugin_manager.get_plugin(plugin_id) plugin_instance = pages_v3.plugin_manager.get_plugin(plugin_id)
# Get plugin configuration from config file # Get plugin configuration from config file
config = {} config = {}
if pages_v3.config_manager: if pages_v3.config_manager:
full_config = pages_v3.config_manager.load_config() full_config = pages_v3.config_manager.load_config()
config = full_config.get(plugin_id, {}) config = full_config.get(plugin_id, {})
# Load uploaded images from metadata file if images field exists in schema # Load uploaded images from metadata file if images field exists in schema
# This ensures uploaded images appear even if config hasn't been saved yet schema_path_temp = _plugin_dir / "config_schema.json"
schema_path_temp = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "config_schema.json"
if schema_path_temp.exists(): if schema_path_temp.exists():
try: try:
with open(schema_path_temp, 'r', encoding='utf-8') as f: with open(schema_path_temp, 'r', encoding='utf-8') as f:
temp_schema = json.load(f) temp_schema = json.load(f)
# Check if schema has an images field with x-widget: file-upload
if (temp_schema.get('properties', {}).get('images', {}).get('x-widget') == 'file-upload' or if (temp_schema.get('properties', {}).get('images', {}).get('x-widget') == 'file-upload' or
temp_schema.get('properties', {}).get('images', {}).get('x_widget') == 'file-upload'): temp_schema.get('properties', {}).get('images', {}).get('x_widget') == 'file-upload'):
# Load metadata file _assets_base = (Path(__file__).parent.parent.parent / 'assets' / 'plugins').resolve()
# Get PROJECT_ROOT relative to this file metadata_file = (_assets_base / plugin_id / 'uploads' / '.metadata.json').resolve()
project_root = Path(__file__).parent.parent.parent try:
metadata_file = project_root / 'assets' / 'plugins' / plugin_id / 'uploads' / '.metadata.json' metadata_file.relative_to(_assets_base)
if metadata_file.exists(): except ValueError:
metadata_file = None
if metadata_file and metadata_file.exists():
try: try:
with open(metadata_file, 'r', encoding='utf-8') as mf: with open(metadata_file, 'r', encoding='utf-8') as mf:
metadata = json.load(mf) metadata = json.load(mf)
# Convert metadata dict to list of image objects
images_from_metadata = list(metadata.values()) images_from_metadata = list(metadata.values())
# Only use metadata images if config doesn't have images or config images is empty
if not config.get('images') or len(config.get('images', [])) == 0: if not config.get('images') or len(config.get('images', [])) == 0:
config['images'] = images_from_metadata config['images'] = images_from_metadata
else: else:
# Merge: add metadata images that aren't already in config
config_image_ids = {img.get('id') for img in config.get('images', []) if img.get('id')} config_image_ids = {img.get('id') for img in config.get('images', []) if img.get('id')}
new_images = [img for img in images_from_metadata if img.get('id') not in config_image_ids] new_images = [img for img in images_from_metadata if img.get('id') not in config_image_ids]
if new_images: if new_images:
config['images'] = config.get('images', []) + new_images config['images'] = config.get('images', []) + new_images
except Exception as e: except Exception as e:
logger.warning("Could not load metadata for {plugin_id}") logger.warning("Could not load plugin upload metadata: %s", e)
except Exception as e: # nosec B110 - metadata pre-load is optional; schema loads fully below except Exception as e: # nosec B110 - metadata pre-load is optional; schema loads fully below
logger.debug("Metadata pre-load skipped for plugin %s: %s", plugin_id, e) logger.debug("Metadata pre-load skipped for plugin %s: %s", plugin_id, e)
# Get plugin schema # Get plugin schema
schema = {} schema = {}
schema_path = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "config_schema.json" schema_path = _plugin_dir / "config_schema.json"
if schema_path.exists(): if schema_path.exists():
try: try:
with open(schema_path, 'r', encoding='utf-8') as f: with open(schema_path, 'r', encoding='utf-8') as f:
schema = json.load(f) schema = json.load(f)
except Exception as e: except Exception as e:
logger.warning("Could not load schema for {plugin_id}") logger.warning("Could not load schema for plugin: %s", e)
# Get web UI actions from plugin manifest # Get web UI actions from plugin manifest
web_ui_actions = [] web_ui_actions = []
manifest_path = Path(pages_v3.plugin_manager.plugins_dir) / plugin_id / "manifest.json" manifest_path = _plugin_dir / "manifest.json"
if manifest_path.exists(): if manifest_path.exists():
try: try:
with open(manifest_path, 'r', encoding='utf-8') as f: with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f) manifest = json.load(f)
web_ui_actions = manifest.get('web_ui_actions', []) web_ui_actions = manifest.get('web_ui_actions', [])
except Exception as e: except Exception as e:
logger.warning("Could not load manifest for {plugin_id}") logger.warning("Could not load manifest for plugin: %s", e)
# Mask secret fields before rendering template (fail closed — never leak secrets) # Mask secret fields before rendering template (fail closed — never leak secrets)
schema_properties = schema.get('properties') if isinstance(schema, dict) else None schema_properties = schema.get('properties') if isinstance(schema, dict) else None
@@ -481,13 +488,18 @@ def _load_plugin_config_partial(plugin_id):
def _load_starlark_config_partial(app_id): def _load_starlark_config_partial(app_id):
"""Load configuration partial for a Starlark app.""" """Load configuration partial for a Starlark app."""
# Sanitize with basename (CodeQL-recognized sanitizer) then regex-validate format
app_id = os.path.basename(app_id or '')
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9_\-]*$', app_id):
return '<div class="text-red-500 p-4">Invalid app ID</div>', 400
try: try:
starlark_plugin = pages_v3.plugin_manager.get_plugin('starlark-apps') if pages_v3.plugin_manager else None starlark_plugin = pages_v3.plugin_manager.get_plugin('starlark-apps') if pages_v3.plugin_manager else None
if starlark_plugin and hasattr(starlark_plugin, 'apps'): if starlark_plugin and hasattr(starlark_plugin, 'apps'):
app = starlark_plugin.apps.get(app_id) app = starlark_plugin.apps.get(app_id)
if not app: if not app:
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404 return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
return render_template( return render_template(
'v3/partials/starlark_config.html', 'v3/partials/starlark_config.html',
app_id=app_id, app_id=app_id,
@@ -503,36 +515,45 @@ def _load_starlark_config_partial(app_id):
) )
# Standalone: read from manifest file # Standalone: read from manifest file
manifest_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / 'manifest.json' starlark_base = (Path(__file__).resolve().parent.parent.parent / 'starlark-apps').resolve()
manifest_file = starlark_base / 'manifest.json'
if not manifest_file.exists(): if not manifest_file.exists():
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404 return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
with open(manifest_file, 'r') as f: with open(manifest_file, 'r') as f:
manifest = json.load(f) manifest = json.load(f)
app_data = manifest.get('apps', {}).get(app_id) app_data = manifest.get('apps', {}).get(app_id)
if not app_data: if not app_data:
return f'<div class="text-red-500 p-4">Starlark app not found: {app_id}</div>', 404 return '<div class="text-red-500 p-4">Starlark app not found</div>', 404
# Load schema from schema.json if it exists # Load schema from schema.json if it exists — validate path stays within starlark_base
schema = None schema = None
schema_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'schema.json' schema_file = (starlark_base / app_id / 'schema.json').resolve()
if schema_file.exists(): try:
schema_file.relative_to(starlark_base)
except ValueError:
schema_file = None
if schema_file and schema_file.exists():
try: try:
with open(schema_file, 'r') as f: with open(schema_file, 'r') as f:
schema = json.load(f) schema = json.load(f)
except (OSError, json.JSONDecodeError) as e: except (OSError, json.JSONDecodeError) as e:
logger.warning(f"[Pages V3] Could not load schema for {app_id}: {e}", exc_info=True) logger.warning("Could not load starlark schema for app: %s", e)
# Load config from config.json if it exists # Load config from config.json if it exists — validate path stays within starlark_base
config = {} config = {}
config_file = Path(__file__).resolve().parent.parent.parent / 'starlark-apps' / app_id / 'config.json' config_file = (starlark_base / app_id / 'config.json').resolve()
if config_file.exists(): try:
config_file.relative_to(starlark_base)
except ValueError:
config_file = None
if config_file and config_file.exists():
try: try:
with open(config_file, 'r') as f: with open(config_file, 'r') as f:
config = json.load(f) config = json.load(f)
except (OSError, json.JSONDecodeError) as e: except (OSError, json.JSONDecodeError) as e:
logger.warning(f"[Pages V3] Could not load config for {app_id}: {e}", exc_info=True) logger.warning("Could not load starlark config for app: %s", e)
return render_template( return render_template(
'v3/partials/starlark_config.html', 'v3/partials/starlark_config.html',
@@ -549,5 +570,5 @@ def _load_starlark_config_partial(app_id):
) )
except Exception as e: except Exception as e:
logger.exception(f"[Pages V3] Error loading starlark config for {app_id}") logger.error("[Pages V3] Error loading starlark config for app", exc_info=True)
return f'<div class="text-red-500 p-4">Error loading starlark config: {str(e)}</div>', 500 return '<div class="text-red-500 p-4">Error loading starlark config; see logs for details</div>', 500