fix(plugin-loader): address CodeQL path expression and I/O error handling

- Add explicit relative_to() containment check after path resolution so
  CodeQL recognizes the plugin directory boundary (fixes 4 CodeQL alerts:
  Uncontrolled data used in path expression, lines 168/172/189/205)
- Wrap requirements_file.read_bytes() in try/except OSError — on Raspberry
  Pi with flaky SD card storage this can fail; returns False with a clear log
- Wrap marker_path.read_text() in try/except OSError — a corrupted marker
  falls through to a clean reinstall instead of crashing
- Wrap both marker_path.write_text() calls in try/except OSError — pip
  already succeeded at this point so a marker write failure should not
  return False or propagate through the generic exception handler

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Chuck
2026-05-29 15:42:03 -04:00
parent 6c4700583b
commit b44ff079c9

View File

@@ -165,16 +165,36 @@ class PluginLoader:
if not requirements_file.exists(): if not requirements_file.exists():
return True # No dependencies needed return True # No dependencies needed
marker_path = plugin_dir_resolved / ".dependencies_installed" marker_path = plugin_dir_resolved / ".dependencies_installed"
current_hash = hashlib.sha256(requirements_file.read_bytes()).hexdigest()
# Validate both paths stay within the plugin directory (path containment check)
try:
requirements_file.relative_to(plugin_dir_resolved)
marker_path.relative_to(plugin_dir_resolved)
except ValueError:
self.logger.error("Dependency paths outside plugin directory for %s", plugin_id)
return False
try:
current_hash = hashlib.sha256(requirements_file.read_bytes()).hexdigest()
except OSError as e:
self.logger.error("Failed to read requirements.txt for %s: %s", plugin_id, e)
return False
# Skip if requirements.txt hasn't changed since last install # Skip if requirements.txt hasn't changed since last install
if marker_path.exists(): if marker_path.exists():
stored_hash = marker_path.read_text(encoding='utf-8').strip() try:
if stored_hash == current_hash: stored_hash = marker_path.read_text(encoding='utf-8').strip()
self.logger.debug("Dependencies already installed for %s (requirements unchanged)", plugin_id) except OSError as e:
return True self.logger.warning(
self.logger.info("Requirements changed for %s, reinstalling dependencies", plugin_id) "Could not read dependency marker for %s (%s), will reinstall dependencies",
plugin_id, e
)
else:
if stored_hash == current_hash:
self.logger.debug("Dependencies already installed for %s (requirements unchanged)", plugin_id)
return True
self.logger.info("Requirements changed for %s, reinstalling dependencies", plugin_id)
try: try:
self.logger.info("Installing dependencies for plugin %s...", plugin_id) self.logger.info("Installing dependencies for plugin %s...", plugin_id)
result = subprocess.run( result = subprocess.run(
@@ -184,10 +204,13 @@ class PluginLoader:
timeout=timeout, timeout=timeout,
check=False check=False
) )
if result.returncode == 0: if result.returncode == 0:
marker_path.write_text(current_hash, encoding='utf-8') try:
ensure_file_permissions(marker_path, get_plugin_file_mode()) marker_path.write_text(current_hash, encoding='utf-8')
ensure_file_permissions(marker_path, get_plugin_file_mode())
except OSError as marker_err:
self.logger.debug("Could not write dependency marker for %s: %s", plugin_id, marker_err)
self.logger.info("Dependencies installed successfully for %s", plugin_id) self.logger.info("Dependencies installed successfully for %s", plugin_id)
return True return True
else: else:
@@ -202,8 +225,11 @@ class PluginLoader:
"Assuming they are satisfied: %s", "Assuming they are satisfied: %s",
plugin_id, stderr.strip() plugin_id, stderr.strip()
) )
marker_path.write_text(current_hash, encoding='utf-8') try:
ensure_file_permissions(marker_path, get_plugin_file_mode()) marker_path.write_text(current_hash, encoding='utf-8')
ensure_file_permissions(marker_path, get_plugin_file_mode())
except OSError as marker_err:
self.logger.debug("Could not write dependency marker for %s: %s", plugin_id, marker_err)
return True return True
self.logger.warning( self.logger.warning(
"Dependency installation returned non-zero exit code for %s: %s", "Dependency installation returned non-zero exit code for %s: %s",