From 92fb0989cedb15346286a8948df1856d7e654bd2 Mon Sep 17 00:00:00 2001 From: Chuck Date: Thu, 19 Feb 2026 21:38:37 -0500 Subject: [PATCH] fix(starlark): critical path traversal and exception handling fixes Path traversal security fixes (CRITICAL): - Add _validate_starlark_app_path() helper to check for path traversal attacks - Validate app_id in get_starlark_app(), uninstall_starlark_app(), get_starlark_app_config(), and update_starlark_app_config() - Check for '..' and path separators before any filesystem access - Verify resolved paths are within _STARLARK_APPS_DIR using Path.relative_to() - Prevents unauthorized file access via crafted app_id like '../../../etc/passwd' Exception handling improvements (tronbyte_repository.py): - Replace broad "except Exception" with specific types - _make_request: catch requests.Timeout, requests.RequestException, json.JSONDecodeError - _fetch_raw_file: catch requests.Timeout, requests.RequestException separately - download_app_assets: narrow to OSError, ValueError - Add "[Tronbyte Repo]" context prefix to all log messages - Use exc_info=True for better stack traces API improvements: - Narrow exception catches to OSError, json.JSONDecodeError in config loading - Remove duplicate path traversal checks (now centralized in helper) Co-Authored-By: Claude Sonnet 4.5 --- .../starlark-apps/tronbyte_repository.py | 35 +++++----- web_interface/blueprints/api_v3.py | 69 +++++++++++++++---- 2 files changed, 75 insertions(+), 29 deletions(-) diff --git a/plugin-repos/starlark-apps/tronbyte_repository.py b/plugin-repos/starlark-apps/tronbyte_repository.py index 1c1eaca4..ba647de0 100644 --- a/plugin-repos/starlark-apps/tronbyte_repository.py +++ b/plugin-repos/starlark-apps/tronbyte_repository.py @@ -75,25 +75,25 @@ class TronbyteRepository: if response.status_code == 403: # Rate limit exceeded - logger.warning("GitHub API rate limit exceeded") + logger.warning("[Tronbyte Repo] GitHub API rate limit exceeded") return None elif response.status_code == 404: - logger.warning(f"Resource not found: {url}") + logger.warning(f"[Tronbyte Repo] Resource not found: {url}") return None elif response.status_code != 200: - logger.error(f"GitHub API error: {response.status_code}") + logger.error(f"[Tronbyte Repo] GitHub API error: {response.status_code}") return None return response.json() except requests.Timeout: - logger.error(f"Request timeout: {url}") + logger.error(f"[Tronbyte Repo] Request timeout: {url}") return None except requests.RequestException as e: - logger.error(f"Request error: {e}") + logger.error(f"[Tronbyte Repo] Request error: {e}", exc_info=True) return None - except Exception as e: - logger.error(f"Unexpected error: {e}") + except (json.JSONDecodeError, ValueError) as e: + logger.error(f"[Tronbyte Repo] JSON parse error for {url}: {e}", exc_info=True) return None def _fetch_raw_file(self, file_path: str, branch: Optional[str] = None, binary: bool = False): @@ -116,10 +116,13 @@ class TronbyteRepository: if response.status_code == 200: return response.content if binary else response.text else: - logger.warning(f"Failed to fetch raw file: {file_path} ({response.status_code})") + logger.warning(f"[Tronbyte Repo] Failed to fetch raw file: {file_path} ({response.status_code})") return None - except Exception as e: - logger.error(f"Error fetching raw file {file_path}: {e}") + except requests.Timeout: + logger.error(f"[Tronbyte Repo] Timeout fetching raw file: {file_path}") + return None + except requests.RequestException as e: + logger.error(f"[Tronbyte Repo] Network error fetching raw file {file_path}: {e}", exc_info=True) return None def list_apps(self) -> Tuple[bool, Optional[List[Dict[str, Any]]], Optional[str]]: @@ -502,17 +505,17 @@ class TronbyteRepository: try: with open(output_path, 'wb') as f: f.write(content) - logger.debug(f"Downloaded asset: {dir_name}/{file_name}") - except Exception as e: - logger.warning(f"Failed to save {dir_name}/{file_name}: {e}") + logger.debug(f"[Tronbyte Repo] Downloaded asset: {dir_name}/{file_name}") + except OSError as e: + logger.warning(f"[Tronbyte Repo] Failed to save {dir_name}/{file_name}: {e}", exc_info=True) else: logger.warning(f"Failed to download {dir_name}/{file_name}") - logger.info(f"Downloaded assets for {app_id} ({len(asset_dirs)} directories)") + logger.info(f"[Tronbyte Repo] Downloaded assets for {app_id} ({len(asset_dirs)} directories)") return True, None - except Exception as e: - logger.exception(f"Error downloading assets for {app_id}: {e}") + except (OSError, ValueError) as e: + logger.exception(f"[Tronbyte Repo] Error downloading assets for {app_id}: {e}") return False, f"Error downloading assets: {e}" def search_apps(self, query: str, apps_with_metadata: List[Dict[str, Any]]) -> List[Dict[str, Any]]: diff --git a/web_interface/blueprints/api_v3.py b/web_interface/blueprints/api_v3.py index 76612ca9..7b1ee407 100644 --- a/web_interface/blueprints/api_v3.py +++ b/web_interface/blueprints/api_v3.py @@ -7073,6 +7073,36 @@ def _get_starlark_plugin() -> Optional[Any]: return api_v3.plugin_manager.get_plugin('starlark-apps') +def _validate_starlark_app_path(app_id: str) -> Tuple[bool, Optional[str]]: + """ + Validate app_id for path traversal attacks before filesystem access. + + Args: + app_id: App identifier from user input + + Returns: + Tuple of (is_valid, error_message) + """ + # Check for path traversal characters + if '..' in app_id or '/' in app_id or '\\' in app_id: + return False, f"Invalid app_id: contains path traversal characters" + + # Construct and resolve the path + try: + app_path = (_STARLARK_APPS_DIR / app_id).resolve() + base_path = _STARLARK_APPS_DIR.resolve() + + # Verify the resolved path is within the base directory + try: + app_path.relative_to(base_path) + return True, None + except ValueError: + return False, f"Invalid app_id: path traversal attempt" + except Exception as e: + logger.warning(f"Path validation error for app_id '{app_id}': {e}") + return False, f"Invalid app_id" + + # Starlark standalone helpers for web service (plugin not loaded) _STARLARK_APPS_DIR = PROJECT_ROOT / 'starlark-apps' _STARLARK_MANIFEST_FILE = _STARLARK_APPS_DIR / 'manifest.json' @@ -7288,6 +7318,11 @@ def get_starlark_apps(): def get_starlark_app(app_id): """Get details for a specific Starlark app.""" try: + # Validate app_id before any filesystem access + is_valid, error_msg = _validate_starlark_app_path(app_id) + if not is_valid: + return jsonify({'status': 'error', 'message': error_msg}), 400 + starlark_plugin = _get_starlark_plugin() if starlark_plugin: app = starlark_plugin.apps.get(app_id) @@ -7315,14 +7350,14 @@ def get_starlark_app(app_id): if not app_data: return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404 - # Load schema from schema.json if it exists + # Load schema from schema.json if it exists (path already validated above) schema = None schema_file = _STARLARK_APPS_DIR / app_id / 'schema.json' if schema_file.exists(): try: with open(schema_file, 'r') as f: schema = json.load(f) - except Exception as e: + except (OSError, json.JSONDecodeError) as e: logger.warning(f"Failed to load schema for {app_id}: {e}") return jsonify({ @@ -7419,20 +7454,18 @@ def upload_starlark_app(): def uninstall_starlark_app(app_id): """Uninstall a Starlark app.""" try: + # Validate app_id before any filesystem access + is_valid, error_msg = _validate_starlark_app_path(app_id) + if not is_valid: + return jsonify({'status': 'error', 'message': error_msg}), 400 + starlark_plugin = _get_starlark_plugin() if starlark_plugin: success = starlark_plugin.uninstall_app(app_id) else: - # Standalone: remove app dir and manifest entry + # Standalone: remove app dir and manifest entry (path already validated) import shutil - app_dir = (_STARLARK_APPS_DIR / app_id).resolve() - - # Path traversal check - ensure app_dir is within _STARLARK_APPS_DIR - try: - app_dir.relative_to(_STARLARK_APPS_DIR.resolve()) - except ValueError: - logger.warning(f"Path traversal attempt in uninstall: {app_id}") - return jsonify({'status': 'error', 'message': 'Invalid app_id'}), 400 + app_dir = _STARLARK_APPS_DIR / app_id if app_dir.exists(): shutil.rmtree(app_dir) @@ -7454,6 +7487,11 @@ def uninstall_starlark_app(app_id): def get_starlark_app_config(app_id): """Get configuration for a Starlark app.""" try: + # Validate app_id before any filesystem access + is_valid, error_msg = _validate_starlark_app_path(app_id) + if not is_valid: + return jsonify({'status': 'error', 'message': error_msg}), 400 + starlark_plugin = _get_starlark_plugin() if starlark_plugin: app = starlark_plugin.apps.get(app_id) @@ -7461,7 +7499,7 @@ def get_starlark_app_config(app_id): return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404 return jsonify({'status': 'success', 'config': app.config, 'schema': app.schema}) - # Standalone: read from config.json file + # Standalone: read from config.json file (path already validated) app_dir = _STARLARK_APPS_DIR / app_id config_file = app_dir / "config.json" @@ -7473,7 +7511,7 @@ def get_starlark_app_config(app_id): try: with open(config_file, 'r') as f: config = json.load(f) - except Exception as e: + except (OSError, json.JSONDecodeError) as e: logger.warning(f"Failed to load config for {app_id}: {e}") # Load schema from schema.json @@ -7497,6 +7535,11 @@ def get_starlark_app_config(app_id): def update_starlark_app_config(app_id): """Update configuration for a Starlark app.""" try: + # Validate app_id before any filesystem access + is_valid, error_msg = _validate_starlark_app_path(app_id) + if not is_valid: + return jsonify({'status': 'error', 'message': error_msg}), 400 + data = request.get_json() if not data: return jsonify({'status': 'error', 'message': 'No configuration provided'}), 400