From 76c5bf5781b4be0c9fcd8ff5ae10f452a664fd54 Mon Sep 17 00:00:00 2001 From: 5ymb01 <5ymb01ixm@gmail.com> Date: Sun, 8 Mar 2026 20:41:18 -0400 Subject: [PATCH] fix(security): mask secret fields in API responses and extract helpers (#276) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(security): mask secret fields in API responses and extract helpers GET /config/secrets returned raw API keys in plaintext to the browser. GET /plugins/config returned merged config including deep-merged secrets. POST /plugins/config could overwrite existing secrets with empty strings when the GET endpoint returned masked values that were sent back unchanged. Changes: - Add src/web_interface/secret_helpers.py with reusable functions: find_secret_fields, separate_secrets, mask_secret_fields, mask_all_secret_values, remove_empty_secrets - GET /config/secrets: mask all values with '••••••••' - GET /plugins/config: mask x-secret fields with '' - POST /plugins/config: filter empty-string secrets before saving - pages_v3: mask secrets before rendering plugin config templates - Remove three duplicated inline find_secret_fields/separate_secrets definitions in api_v3.py (replaced by single imported module) Co-Authored-By: Claude Opus 4.6 * fix(security): harden secret masking against CodeRabbit findings - Fail-closed: return 500 when schema unavailable instead of leaking secrets - Fix falsey masking: use `is not None and != ''` instead of truthiness check so values like 0 or False are still redacted - Add array-item secret support: recurse into `type: array` items schema to detect and mask secrets like accounts[].token - pages_v3: fail-closed when schema properties missing Addresses CodeRabbit findings on PR #276: - Critical: fail-closed bypass when schema_mgr/schema missing - Major: falsey values not masked (0, False leak through) - Major: pages_v3 fail-open when schema absent - Major: array-item secrets unsupported Co-Authored-By: 5ymb01 <5ymb01@users.noreply.github.com> Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: 5ymb01 <5ymb01@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 --- src/web_interface/secret_helpers.py | 191 +++++++++++++++++++++++++++ web_interface/blueprints/api_v3.py | 133 +++++-------------- web_interface/blueprints/pages_v3.py | 9 +- 3 files changed, 232 insertions(+), 101 deletions(-) create mode 100644 src/web_interface/secret_helpers.py diff --git a/src/web_interface/secret_helpers.py b/src/web_interface/secret_helpers.py new file mode 100644 index 00000000..310d9848 --- /dev/null +++ b/src/web_interface/secret_helpers.py @@ -0,0 +1,191 @@ +""" +Secret handling helpers for the web interface. + +Provides functions for identifying, masking, separating, and filtering +secret fields in plugin configurations based on JSON Schema x-secret markers. +""" + +from typing import Any, Dict, Set, Tuple + + +def find_secret_fields(properties: Dict[str, Any], prefix: str = '') -> Set[str]: + """Find all fields marked with ``x-secret: true`` in a JSON Schema properties dict. + + Recurses into nested objects and array items to discover secrets at any + depth (e.g. ``accounts[].token``). + + Args: + properties: The ``properties`` dict from a JSON Schema. + prefix: Dot-separated prefix for nested field paths (used in recursion). + + Returns: + A set of dot-separated field paths (e.g. ``{"api_key", "auth.token"}``). + """ + fields: Set[str] = set() + if not isinstance(properties, dict): + return fields + for field_name, field_props in properties.items(): + if not isinstance(field_props, dict): + continue + full_path = f"{prefix}.{field_name}" if prefix else field_name + if field_props.get('x-secret', False): + fields.add(full_path) + if field_props.get('type') == 'object' and 'properties' in field_props: + fields.update(find_secret_fields(field_props['properties'], full_path)) + # Recurse into array items (e.g. accounts[].token) + if field_props.get('type') == 'array' and isinstance(field_props.get('items'), dict): + items_schema = field_props['items'] + if items_schema.get('x-secret', False): + fields.add(f"{full_path}[]") + if items_schema.get('type') == 'object' and 'properties' in items_schema: + fields.update(find_secret_fields(items_schema['properties'], f"{full_path}[]")) + return fields + + +def separate_secrets( + config: Dict[str, Any], secret_paths: Set[str], prefix: str = '' +) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """Split a config dict into regular and secret portions. + + Uses the set of dot-separated secret paths (from :func:`find_secret_fields`) + to partition values. Empty nested dicts are dropped from the regular + portion to match the original inline behavior. Handles array-item secrets + using ``[]`` notation in paths (e.g. ``accounts[].token``). + + Args: + config: The full plugin config dict. + secret_paths: Set of dot-separated paths identifying secret fields. + prefix: Dot-separated prefix for nested paths (used in recursion). + + Returns: + A ``(regular, secrets)`` tuple of dicts. + """ + regular: Dict[str, Any] = {} + secrets: Dict[str, Any] = {} + for key, value in config.items(): + full_path = f"{prefix}.{key}" if prefix else key + if isinstance(value, dict): + nested_regular, nested_secrets = separate_secrets(value, secret_paths, full_path) + if nested_regular: + regular[key] = nested_regular + if nested_secrets: + secrets[key] = nested_secrets + elif isinstance(value, list): + # Check if array elements themselves are secrets + array_path = f"{full_path}[]" + if array_path in secret_paths: + secrets[key] = value + else: + # Check if array items have nested secret fields + has_nested = any(p.startswith(f"{array_path}.") for p in secret_paths) + if has_nested: + reg_items = [] + sec_items = [] + for item in value: + if isinstance(item, dict): + r, s = separate_secrets(item, secret_paths, array_path) + reg_items.append(r) + sec_items.append(s) + else: + reg_items.append(item) + sec_items.append({}) + regular[key] = reg_items + if any(sec_items): + secrets[key] = sec_items + else: + regular[key] = value + elif full_path in secret_paths: + secrets[key] = value + else: + regular[key] = value + return regular, secrets + + +def mask_secret_fields(config: Dict[str, Any], schema_properties: Dict[str, Any]) -> Dict[str, Any]: + """Mask config values for fields marked ``x-secret: true`` in the schema. + + Replaces each present secret value with an empty string so that API + responses never expose plain-text secrets. Non-secret values are + returned unchanged. Recurses into nested objects and array items. + + Args: + config: The plugin config dict (may contain secret values). + schema_properties: The ``properties`` dict from the plugin's JSON Schema. + + Returns: + A copy of *config* with secret values replaced by ``''``. + Nested dicts containing secrets are also copied (not mutated in place). + """ + result = dict(config) + for fname, fprops in schema_properties.items(): + if not isinstance(fprops, dict): + continue + if fprops.get('x-secret', False): + # Mask any present value — including falsey ones like 0 or False + if fname in result and result[fname] is not None and result[fname] != '': + result[fname] = '' + elif fprops.get('type') == 'object' and 'properties' in fprops: + if fname in result and isinstance(result[fname], dict): + result[fname] = mask_secret_fields(result[fname], fprops['properties']) + elif fprops.get('type') == 'array' and isinstance(fprops.get('items'), dict): + items_schema = fprops['items'] + if fname in result and isinstance(result[fname], list): + if items_schema.get('x-secret', False): + # Entire array elements are secrets — mask each + result[fname] = ['' for _ in result[fname]] + elif items_schema.get('type') == 'object' and 'properties' in items_schema: + # Recurse into each array element's properties + result[fname] = [ + mask_secret_fields(item, items_schema['properties']) + if isinstance(item, dict) else item + for item in result[fname] + ] + return result + + +def mask_all_secret_values(config: Dict[str, Any]) -> Dict[str, Any]: + """Blanket-mask every non-empty value in a secrets config dict. + + Used by the ``GET /config/secrets`` endpoint where all values are secret + by definition. Placeholder strings (``YOUR_*``) and empty/None values are + left as-is so the UI can distinguish "not set" from "set". + + Args: + config: A raw secrets config dict (e.g. from ``config_secrets.json``). + + Returns: + A copy with all real values replaced by ``'••••••••'``. + """ + masked: Dict[str, Any] = {} + for k, v in config.items(): + if isinstance(v, dict): + masked[k] = mask_all_secret_values(v) + elif v not in (None, '') and not (isinstance(v, str) and v.startswith('YOUR_')): + masked[k] = '••••••••' + else: + masked[k] = v + return masked + + +def remove_empty_secrets(secrets: Dict[str, Any]) -> Dict[str, Any]: + """Remove empty / whitespace-only / None values from a secrets dict. + + When the GET endpoint masks secret values to ``''``, a subsequent POST + will send those empty strings back. This filter strips them so that + existing stored secrets are not overwritten with blanks. + + Args: + secrets: A secrets dict that may contain masked empty values. + + Returns: + A copy with empty entries removed. Empty nested dicts are pruned. + """ + result: Dict[str, Any] = {} + for k, v in secrets.items(): + if isinstance(v, dict): + nested = remove_empty_secrets(v) + if nested: + result[k] = nested + elif v is not None and not (isinstance(v, str) and v.strip() == ''): + result[k] = v + return result diff --git a/web_interface/blueprints/api_v3.py b/web_interface/blueprints/api_v3.py index ecd77f38..fec8eedf 100644 --- a/web_interface/blueprints/api_v3.py +++ b/web_interface/blueprints/api_v3.py @@ -24,6 +24,13 @@ from src.web_interface.validators import ( validate_numeric_range, validate_string_length, sanitize_plugin_config ) from src.error_aggregator import get_error_aggregator +from src.web_interface.secret_helpers import ( + find_secret_fields, + mask_all_secret_values, + mask_secret_fields, + remove_empty_secrets, + separate_secrets, +) # Will be initialized when blueprint is registered config_manager = None @@ -866,18 +873,6 @@ def save_main_config(): plugins_dir = PROJECT_ROOT / plugins_dir_name schema_path = plugins_dir / plugin_id / 'config_schema.json' - def find_secret_fields(properties, prefix=''): - """Recursively find fields marked with x-secret: true""" - fields = set() - for field_name, field_props in properties.items(): - full_path = f"{prefix}.{field_name}" if prefix else field_name - if field_props.get('x-secret', False): - fields.add(full_path) - # Check nested objects - if field_props.get('type') == 'object' and 'properties' in field_props: - fields.update(find_secret_fields(field_props['properties'], full_path)) - return fields - if schema_path.exists(): try: with open(schema_path, 'r', encoding='utf-8') as f: @@ -887,25 +882,6 @@ def save_main_config(): except Exception as e: logger.warning(f"Error reading schema for secret detection: {e}") - # Separate secrets from regular config (same logic as save_plugin_config) - def separate_secrets(config, secrets_set, prefix=''): - """Recursively separate secret fields from regular config""" - regular = {} - secrets = {} - for key, value in config.items(): - full_path = f"{prefix}.{key}" if prefix else key - if isinstance(value, dict): - nested_regular, nested_secrets = separate_secrets(value, secrets_set, full_path) - if nested_regular: - regular[key] = nested_regular - if nested_secrets: - secrets[key] = nested_secrets - elif full_path in secrets_set: - secrets[key] = value - else: - regular[key] = value - return regular, secrets - regular_config, secrets_config = separate_secrets(plugin_config, secret_fields) # PRE-PROCESSING: Preserve 'enabled' state if not in regular_config @@ -1021,7 +997,8 @@ def get_secrets_config(): return jsonify({'status': 'error', 'message': 'Config manager not initialized'}), 500 config = api_v3.config_manager.get_raw_file_content('secrets') - return jsonify({'status': 'success', 'data': config}) + masked = mask_all_secret_values(config) + return jsonify({'status': 'success', 'data': masked}) except Exception as e: return jsonify({'status': 'error', 'message': str(e)}), 500 @@ -2597,6 +2574,26 @@ def get_plugin_config(): 'display_duration': 30 } + # Mask secret fields before returning to prevent exposing API keys + # Fail closed — if schema unavailable, refuse to return unmasked config + schema_mgr = api_v3.schema_manager + if not schema_mgr: + return error_response( + ErrorCode.CONFIG_LOAD_FAILED, + f"Cannot safely return config for {plugin_id}: schema manager unavailable", + status_code=500 + ) + + schema_for_mask = schema_mgr.load_schema(plugin_id, use_cache=True) + if not schema_for_mask or 'properties' not in schema_for_mask: + return error_response( + ErrorCode.CONFIG_LOAD_FAILED, + f"Cannot safely return config for {plugin_id}: schema unavailable for secret masking", + status_code=500 + ) + + plugin_config = mask_secret_fields(plugin_config, schema_for_mask['properties']) + return success_response(data=plugin_config) except Exception as e: from src.web_interface.errors import WebInterfaceError @@ -4399,21 +4396,6 @@ def save_plugin_config(): # Find secret fields (supports nested schemas) secret_fields = set() - - def find_secret_fields(properties, prefix=''): - """Recursively find fields marked with x-secret: true""" - fields = set() - if not isinstance(properties, dict): - return fields - for field_name, field_props in properties.items(): - full_path = f"{prefix}.{field_name}" if prefix else field_name - if isinstance(field_props, dict) and field_props.get('x-secret', False): - fields.add(full_path) - # Check nested objects - if isinstance(field_props, dict) and field_props.get('type') == 'object' and 'properties' in field_props: - fields.update(find_secret_fields(field_props['properties'], full_path)) - return fields - if schema and 'properties' in schema: secret_fields = find_secret_fields(schema['properties']) @@ -4719,30 +4701,12 @@ def save_plugin_config(): ) # Separate secrets from regular config (handles nested configs) - def separate_secrets(config, secrets_set, prefix=''): - """Recursively separate secret fields from regular config""" - regular = {} - secrets = {} - - for key, value in config.items(): - full_path = f"{prefix}.{key}" if prefix else key - - if isinstance(value, dict): - # Recursively handle nested dicts - nested_regular, nested_secrets = separate_secrets(value, secrets_set, full_path) - if nested_regular: - regular[key] = nested_regular - if nested_secrets: - secrets[key] = nested_secrets - elif full_path in secrets_set: - secrets[key] = value - else: - regular[key] = value - - return regular, secrets - regular_config, secrets_config = separate_secrets(plugin_config, secret_fields) + # Filter empty-string secret values to avoid overwriting existing secrets + # (GET endpoint masks secrets to '', POST sends them back as '') + secrets_config = remove_empty_secrets(secrets_config) + # Get current configs current_config = api_v3.config_manager.load_config() current_secrets = api_v3.config_manager.get_raw_file_content('secrets') @@ -4952,41 +4916,10 @@ def reset_plugin_config(): schema = schema_mgr.load_schema(plugin_id, use_cache=True) secret_fields = set() - def find_secret_fields(properties, prefix=''): - """Recursively find fields marked with x-secret: true""" - fields = set() - if not isinstance(properties, dict): - return fields - for field_name, field_props in properties.items(): - full_path = f"{prefix}.{field_name}" if prefix else field_name - if isinstance(field_props, dict) and field_props.get('x-secret', False): - fields.add(full_path) - if isinstance(field_props, dict) and field_props.get('type') == 'object' and 'properties' in field_props: - fields.update(find_secret_fields(field_props['properties'], full_path)) - return fields - if schema and 'properties' in schema: secret_fields = find_secret_fields(schema['properties']) # Separate defaults into regular and secret configs - def separate_secrets(config, secrets_set, prefix=''): - """Recursively separate secret fields from regular config""" - regular = {} - secrets = {} - for key, value in config.items(): - full_path = f"{prefix}.{key}" if prefix else key - if isinstance(value, dict): - nested_regular, nested_secrets = separate_secrets(value, secrets_set, full_path) - if nested_regular: - regular[key] = nested_regular - if nested_secrets: - secrets[key] = nested_secrets - elif full_path in secrets_set: - secrets[key] = value - else: - regular[key] = value - return regular, secrets - default_regular, default_secrets = separate_secrets(defaults, secret_fields) # Update main config with defaults diff --git a/web_interface/blueprints/pages_v3.py b/web_interface/blueprints/pages_v3.py index c9783ddb..75ff9f5c 100644 --- a/web_interface/blueprints/pages_v3.py +++ b/web_interface/blueprints/pages_v3.py @@ -2,6 +2,7 @@ from flask import Blueprint, render_template, request, redirect, url_for, flash, import json import logging from pathlib import Path +from src.web_interface.secret_helpers import mask_secret_fields logger = logging.getLogger(__name__) @@ -405,11 +406,17 @@ def _load_plugin_config_partial(plugin_id): except Exception as e: print(f"Warning: Could not load manifest for {plugin_id}: {e}") + # Mask secret fields before rendering template (fail closed — never leak secrets) + schema_properties = schema.get('properties') if isinstance(schema, dict) else None + if not isinstance(schema_properties, dict): + return '
Error loading plugin config securely: schema unavailable.
', 500 + config = mask_secret_fields(config, schema_properties) + # Determine enabled status enabled = config.get('enabled', True) if plugin_instance: enabled = plugin_instance.enabled - + # Build plugin data for template plugin_data = { 'id': plugin_id,