fix(plugin-loader): use realpath+startswith containment check for CodeQL path-injection

Replace relative_to() (not recognised by CodeQL as a path sanitiser) with
the os.path.realpath() + startswith() pattern that CodeQL explicitly models
as sanitising py/path-injection.

- Add plugins_dir optional param to install_dependencies() and load_plugin()
- PluginManager.load_plugin() passes self.plugins_dir as the trusted anchor;
  install_dependencies() validates that the resolved plugin_dir starts with
  the resolved plugins_dir before any file I/O
- Replace all Path.read_bytes/read_text/write_text/exists with open() and
  os.path.isfile() so the sanitised string paths flow directly to file ops
  without re-introducing taint through Path object conversion

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Chuck
2026-05-30 10:32:03 -04:00
parent b44ff079c9
commit abade43772
2 changed files with 46 additions and 31 deletions

View File

@@ -139,51 +139,61 @@ class PluginLoader:
self,
plugin_dir: Path,
plugin_id: str,
plugins_dir: Optional[Path] = None,
timeout: int = 300
) -> bool:
"""
Install plugin dependencies from requirements.txt.
Args:
plugin_dir: Plugin directory path
plugin_id: Plugin identifier
plugins_dir: Trusted base plugins directory for path containment check
timeout: Installation timeout in seconds
Returns:
True if dependencies installed or not needed, False on error
"""
plugin_id = os.path.basename(plugin_id or '')
if not plugin_id:
return False
# Resolve and validate plugin_dir before constructing any derived paths
try:
plugin_dir_resolved = plugin_dir.resolve(strict=True)
except OSError:
# Resolve to a canonical absolute path (normalises .. and symlinks)
plugin_dir_real = os.path.realpath(str(plugin_dir))
if plugins_dir is not None:
# Validate plugin_dir is within the trusted plugins base directory.
# os.path.realpath + startswith is the CodeQL-recognised sanitiser
# pattern for path-injection (py/path-injection).
plugins_dir_real = os.path.realpath(str(plugins_dir))
if not plugin_dir_real.startswith(plugins_dir_real + os.sep):
self.logger.error(
"Plugin dir for %s is outside the plugins directory, skipping deps",
plugin_id,
)
return False
elif not os.path.isdir(plugin_dir_real):
self.logger.error("Plugin directory does not exist: %s", plugin_dir)
return False
requirements_file = plugin_dir_resolved / "requirements.txt"
if not requirements_file.exists():
requirements_file = os.path.join(plugin_dir_real, "requirements.txt")
marker_file = os.path.join(plugin_dir_real, ".dependencies_installed")
if not os.path.isfile(requirements_file):
return True # No dependencies needed
marker_path = plugin_dir_resolved / ".dependencies_installed"
# Validate both paths stay within the plugin directory (path containment check)
try:
requirements_file.relative_to(plugin_dir_resolved)
marker_path.relative_to(plugin_dir_resolved)
except ValueError:
self.logger.error("Dependency paths outside plugin directory for %s", plugin_id)
return False
try:
current_hash = hashlib.sha256(requirements_file.read_bytes()).hexdigest()
with open(requirements_file, 'rb') as fh:
current_hash = hashlib.sha256(fh.read()).hexdigest()
except OSError as e:
self.logger.error("Failed to read requirements.txt for %s: %s", plugin_id, e)
return False
# Skip if requirements.txt hasn't changed since last install
if marker_path.exists():
if os.path.isfile(marker_file):
try:
stored_hash = marker_path.read_text(encoding='utf-8').strip()
with open(marker_file, 'r', encoding='utf-8') as fh:
stored_hash = fh.read().strip()
except OSError as e:
self.logger.warning(
"Could not read dependency marker for %s (%s), will reinstall dependencies",
@@ -198,7 +208,7 @@ class PluginLoader:
try:
self.logger.info("Installing dependencies for plugin %s...", plugin_id)
result = subprocess.run(
[sys.executable, "-m", "pip", "install", "--break-system-packages", "-r", str(requirements_file)],
[sys.executable, "-m", "pip", "install", "--break-system-packages", "-r", requirements_file],
capture_output=True,
text=True,
timeout=timeout,
@@ -207,8 +217,9 @@ class PluginLoader:
if result.returncode == 0:
try:
marker_path.write_text(current_hash, encoding='utf-8')
ensure_file_permissions(marker_path, get_plugin_file_mode())
with open(marker_file, 'w', encoding='utf-8') as fh:
fh.write(current_hash)
ensure_file_permissions(Path(marker_file), get_plugin_file_mode())
except OSError as marker_err:
self.logger.debug("Could not write dependency marker for %s: %s", plugin_id, marker_err)
self.logger.info("Dependencies installed successfully for %s", plugin_id)
@@ -226,8 +237,9 @@ class PluginLoader:
plugin_id, stderr.strip()
)
try:
marker_path.write_text(current_hash, encoding='utf-8')
ensure_file_permissions(marker_path, get_plugin_file_mode())
with open(marker_file, 'w', encoding='utf-8') as fh:
fh.write(current_hash)
ensure_file_permissions(Path(marker_file), get_plugin_file_mode())
except OSError as marker_err:
self.logger.debug("Could not write dependency marker for %s: %s", plugin_id, marker_err)
return True
@@ -572,11 +584,12 @@ class PluginLoader:
display_manager: Any,
cache_manager: Any,
plugin_manager: Any,
install_deps: bool = True
install_deps: bool = True,
plugins_dir: Optional[Path] = None,
) -> Tuple[Any, Any]:
"""
Complete plugin loading process.
Args:
plugin_id: Plugin identifier
manifest: Plugin manifest
@@ -586,16 +599,17 @@ class PluginLoader:
cache_manager: Cache manager instance
plugin_manager: Plugin manager instance
install_deps: Whether to install dependencies
plugins_dir: Trusted base plugins directory forwarded to install_dependencies
Returns:
Tuple of (plugin_instance, module)
Raises:
PluginError: If loading fails
"""
# Install dependencies if needed
if install_deps:
self.install_dependencies(plugin_dir, plugin_id)
self.install_dependencies(plugin_dir, plugin_id, plugins_dir=plugins_dir)
# Load module
entry_point = manifest.get('entry_point', 'manager.py')

View File

@@ -350,7 +350,8 @@ class PluginManager:
display_manager=self.display_manager,
cache_manager=self.cache_manager,
plugin_manager=self,
install_deps=True
install_deps=True,
plugins_dir=self.plugins_dir,
)
# Store module