LEDMatrix/src/common/permission_utils.py
Chuck 158e07c82b fix(plugins): prevent root-owned files from blocking plugin updates (#242)
* fix(web): unify operation history tracking for monorepo plugin operations

The operation history UI was reading from the wrong data source
(operation_queue instead of operation_history), install/update records
lacked version details, toggle operations used a type name that didn't
match UI filters, and the Clear History button was non-functional.

- Switch GET /plugins/operation/history to read from OperationHistory
  audit log with return type hint and targeted exception handling
- Add DELETE /plugins/operation/history endpoint; wire up Clear button
- Add _get_plugin_version helper with specific exception handling
  (FileNotFoundError, PermissionError, json.JSONDecodeError) and
  structured logging with plugin_id/path context
- Record plugin version, branch, and commit details on install/update
- Record install failures in the direct (non-queue) code path
- Replace "toggle" operation type with "enable"/"disable"
- Add normalizeStatus() in JS to map completed→success, error→failed
  so status filter works regardless of server-side convention
- Truncate commit SHAs to 7 chars in details display
- Fix HTML filter options, operation type colors, duplicate JS init

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(plugins): prevent root-owned files from blocking plugin updates

The root ledmatrix service creates __pycache__ and data cache files
owned by root inside plugin directories. The web service (non-root)
cannot delete these when updating or uninstalling plugins, causing
operations to fail with "Permission denied".

Defense in depth with three layers:
- Prevent: PYTHONDONTWRITEBYTECODE=1 in systemd service + run.py
- Fallback: sudoers rules for rm on plugin directories
- Code: _safe_remove_directory() now uses sudo as last resort,
  and all bare shutil.rmtree() calls routed through it

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
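
The "Code" layer above can be pictured as a staged removal. The sketch below is illustrative only: the real `_safe_remove_directory()` lives in store_manager.py and is not shown here, so the function name, the `sudo_fallback` parameter, and the exact stage order are assumptions.

```python
import os
import shutil
import stat
from pathlib import Path
from typing import Callable, Optional


def safe_remove_directory_sketch(
    path: Path,
    sudo_fallback: Optional[Callable[[Path], bool]] = None,
) -> bool:
    """Hypothetical staged removal; returns True once the path is gone."""
    if not path.exists():
        return True

    # Stage 1: plain removal.
    try:
        shutil.rmtree(path)
        return True
    except OSError:
        pass

    # Stage 2: give the owner rwx on everything we are allowed to chmod,
    # then retry. Entries owned by another user (e.g. root) are skipped.
    try:
        os.chmod(path, stat.S_IRWXU)
    except OSError:
        pass
    for root, dirs, files in os.walk(path):
        for name in dirs + files:
            try:
                os.chmod(os.path.join(root, name), stat.S_IRWXU)
            except OSError:
                continue
    try:
        shutil.rmtree(path)
        return True
    except OSError:
        pass

    # Stage 3: last resort, delegate to a privileged helper if one is given.
    if sudo_fallback is not None:
        return bool(sudo_fallback(path)) and not path.exists()
    return False
```

In this sketch a caller would pass something like `sudo_remove_directory` as `sudo_fallback`, so the privileged path is reached only after both unprivileged stages fail.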

* fix(security): harden sudo removal with path-validated helper script

Address code review findings:

- Replace raw rm/find sudoers wildcards with a vetted helper script
  (safe_plugin_rm.sh) that resolves symlinks and validates the target
  is a strict child of plugin-repos/ or plugins/ before deletion
- Add allow-list validation in sudo_remove_directory() that checks
  resolved paths against allowed bases before invoking sudo
- Check _safe_remove_directory() return value before shutil.move()
  in the manifest ID rename path
- Move stat import to module level in store_manager.py
- Use stat.S_IRWXU instead of 0o777 in chmod fallback stage
- Add ignore_errors=True to temp dir cleanup in finally block
- Use command -v instead of which in configure_web_sudo.sh

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
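
Why the return value matters before `shutil.move()`: if the destination directory still exists, `shutil.move()` places the source inside it instead of replacing it. A minimal sketch of the guard, using a hypothetical `replace_directory()` helper and an injected `remove` callable (neither is from store_manager.py):

```python
import shutil
from pathlib import Path
from typing import Callable


def replace_directory(src: Path, dst: Path, remove: Callable[[Path], bool]) -> bool:
    """Move src to dst only after any existing dst is confirmed removed."""
    if dst.exists() and not remove(dst):
        # Removal failed: moving now would nest src inside the stale dst.
        return False
    shutil.move(str(src), str(dst))
    return True
```

The caller decides what to do on `False`, for example recording the failed operation rather than leaving a half-replaced plugin directory.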

* fix(security): address code review round 2 — harden paths and error handling

- safe_plugin_rm.sh: use realpath --canonicalize-missing for ALLOWED_BASES
  so the script doesn't fail under set -e when dirs don't exist yet
- safe_plugin_rm.sh: add -- before path in rm -rf to prevent flag injection
- permission_utils.py: use shutil.which('bash') instead of hardcoded /bin/bash
  to match whatever path the sudoers BASH_PATH resolves to
- store_manager.py: check _safe_remove_directory() return before shutil.move()
  in _install_from_monorepo_zip to prevent moving into a non-removed target
- store_manager.py: catch OSError instead of PermissionError in Stage 1 removal
  to handle both EACCES and EPERM error codes
- store_manager.py: hoist sudo_remove_directory import to module level
- configure_web_sudo.sh: harden safe_plugin_rm.sh to root-owned 755 so
  the web user cannot modify the vetted helper script

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(security): validate command paths in sudoers config and use resolved paths

- configure_web_sudo.sh: validate that required commands (systemctl, bash,
  python3) resolve to non-empty paths before generating sudoers entries;
  abort with clear error if any are missing; skip optional commands
  (reboot, poweroff, journalctl) with a warning instead of emitting
  malformed NOPASSWD lines; validate helper script exists on disk
- permission_utils.py: pass the already-resolved path to the subprocess
  call and use it for the post-removal exists() check, eliminating a
  TOCTOU window between Python-side validation and shell-side execution

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
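
The idea behind the permission_utils.py change is that the value that passed validation is the same value handed to the helper. A minimal sketch under that assumption follows; `resolve_if_allowed()` and `build_helper_command()` are hypothetical names, and `Path.is_relative_to()` requires Python 3.9 or newer.

```python
from pathlib import Path
from typing import Iterable, List, Optional


def resolve_if_allowed(path: Path, allowed_bases: Iterable[Path]) -> Optional[Path]:
    """Return the resolved path if it is a strict child of an allowed base."""
    resolved = path.resolve()  # follows symlinks and collapses ".."
    for base in allowed_bases:
        base_resolved = base.resolve()
        if resolved != base_resolved and resolved.is_relative_to(base_resolved):
            return resolved
    return None


def build_helper_command(
    path: Path, allowed_bases: Iterable[Path], helper: Path
) -> Optional[List[str]]:
    """Build the sudo command from the validated, already-resolved path."""
    resolved = resolve_if_allowed(path, allowed_bases)
    if resolved is None:
        return None
    # Pass `resolved`, not `path`: the helper sees exactly what was validated.
    return ["sudo", "-n", "bash", str(helper), str(resolved)]
```

A symlink inside plugins/ that points elsewhere resolves to its outside target and is rejected, as is the base directory itself.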

---------

Co-authored-by: Chuck <chuck@example.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 19:28:05 -05:00

290 lines
9.2 KiB
Python

"""
Permission Utilities

Centralized utility functions for managing file and directory permissions
across the LEDMatrix codebase. Ensures consistent permission handling for
files that need to be accessible by both root service and web user.
"""

import os
import logging
import shutil as _shutil
import subprocess
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# System directories that should never have their permissions modified
# These directories have special system-level permissions that must be preserved
PROTECTED_SYSTEM_DIRECTORIES = {
    '/tmp',
    '/var/tmp',
    '/dev',
    '/proc',
    '/sys',
    '/run',
    '/var/run',
    '/etc',
    '/boot',
    '/var',
    '/usr',
    '/lib',
    '/lib64',
    '/bin',
    '/sbin',
}


def ensure_directory_permissions(path: Path, mode: int = 0o775) -> None:
    """
    Create directory and set permissions.

    If the directory already exists and we cannot change its permissions,
    we check if it's usable (readable/writable). If so, we continue without
    raising an exception. This allows the system to work even when running
    as a non-root user who cannot change permissions on existing directories.

    Protected system directories (like /tmp, /etc, /var) are never modified
    to prevent breaking system functionality.

    Args:
        path: Directory path to create/ensure
        mode: Permission mode (default: 0o775 for group-writable directories)

    Raises:
        OSError: If directory creation fails or directory exists but is not usable
    """
    try:
        # Never modify permissions on system directories
        path_str = str(path.resolve() if path.is_absolute() else path)
        if path_str in PROTECTED_SYSTEM_DIRECTORIES:
            logger.debug(f"Skipping permission modification on protected system directory: {path_str}")
            # Verify the directory is usable
            if path.exists() and os.access(path, os.R_OK | os.W_OK):
                return
            elif path.exists():
                logger.warning(f"Protected system directory {path_str} exists but is not writable")
                return
            else:
                raise OSError(f"Protected system directory {path_str} does not exist")

        # Create directory if it doesn't exist
        path.mkdir(parents=True, exist_ok=True)

        # Try to set permissions
        try:
            os.chmod(path, mode)
            logger.debug(f"Set directory permissions {oct(mode)} on {path}")
        except (OSError, PermissionError) as perm_error:
            # If we can't set permissions, check if directory is usable
            if path.exists():
                # Check if directory is readable and writable
                if os.access(path, os.R_OK | os.W_OK):
                    logger.warning(
                        f"Could not set permissions on {path} (may be owned by different user), "
                        f"but directory is usable (readable/writable). Continuing."
                    )
                    return
                else:
                    # Directory exists but is not usable
                    logger.error(
                        f"Directory {path} exists but is not readable/writable. "
                        f"Permission change failed: {perm_error}"
                    )
                    raise OSError(
                        f"Directory {path} exists but is not usable: {perm_error}"
                    ) from perm_error
            else:
                # Directory doesn't exist and we couldn't create it
                raise
    except OSError as e:
        logger.error(f"Failed to ensure directory {path}: {e}")
        raise


def ensure_file_permissions(path: Path, mode: int = 0o644) -> None:
    """
    Set file permissions after creation.

    Args:
        path: File path to set permissions on
        mode: Permission mode (default: 0o644 for readable files)

    Raises:
        OSError: If permission setting fails
    """
    try:
        if path.exists():
            os.chmod(path, mode)
            logger.debug(f"Set file permissions {oct(mode)} on {path}")
        else:
            logger.warning(f"File does not exist, cannot set permissions: {path}")
    except OSError as e:
        logger.error(f"Failed to set file permissions on {path}: {e}")
        raise


def get_config_file_mode(file_path: Path) -> int:
    """
    Return appropriate permission mode for config files.

    Args:
        file_path: Path to config file

    Returns:
        Permission mode: 0o640 for secrets files, 0o644 for regular config
    """
    if 'secrets' in str(file_path):
        return 0o640  # rw-r-----
    else:
        return 0o644  # rw-r--r--


def get_assets_file_mode() -> int:
    """
    Return permission mode for asset files (logos, images, etc.).

    Returns:
        Permission mode: 0o664 (rw-rw-r--) for group-writable assets
    """
    return 0o664  # rw-rw-r--


def get_assets_dir_mode() -> int:
    """
    Return permission mode for asset directories.

    Returns:
        Permission mode: 0o2775 (rwxrwsr-x, setgid bit set) for group-writable directories
    """
    return 0o2775  # rwxrwsr-x (setgid + group writable)


def get_config_dir_mode() -> int:
    """
    Return permission mode for config directory.

    Returns:
        Permission mode: 0o2775 (rwxrwsr-x, setgid bit set) for group-writable directories
    """
    return 0o2775  # rwxrwsr-x (setgid + group writable)


def get_plugin_file_mode() -> int:
    """
    Return permission mode for plugin files.

    Returns:
        Permission mode: 0o664 (rw-rw-r--) for group-writable plugin files
    """
    return 0o664  # rw-rw-r--


def get_plugin_dir_mode() -> int:
    """
    Return permission mode for plugin directories.

    Returns:
        Permission mode: 0o2775 (rwxrwsr-x, setgid bit set) for group-writable directories
    """
    return 0o2775  # rwxrwsr-x (setgid + group writable)


def get_cache_dir_mode() -> int:
    """
    Return permission mode for cache directories.

    Returns:
        Permission mode: 0o2775 (rwxrwsr-x, setgid bit set) for group-writable cache directories
    """
    return 0o2775  # rwxrwsr-x (setgid + group writable)


def sudo_remove_directory(path: Path, allowed_bases: Optional[list] = None) -> bool:
    """
    Remove a directory using sudo as a last resort.

    Used when normal removal fails due to root-owned files (e.g., __pycache__
    directories created by the root ledmatrix service). Delegates to the
    safe_plugin_rm.sh helper which validates the path is inside allowed
    plugin directories.

    Before invoking sudo, this function also validates that the resolved
    path is a descendant of at least one allowed base directory.

    Args:
        path: Directory path to remove
        allowed_bases: List of allowed parent directories. If None, defaults
            to plugin-repos/ and plugins/ under the project root.

    Returns:
        True if removal succeeded, False otherwise
    """
    # Determine project root (permission_utils.py is at src/common/)
    project_root = Path(__file__).resolve().parent.parent.parent

    if allowed_bases is None:
        allowed_bases = [
            project_root / "plugin-repos",
            project_root / "plugins",
        ]

    # Resolve the target path to prevent symlink/traversal tricks
    try:
        resolved = path.resolve()
    except (OSError, ValueError) as e:
        logger.error(f"Cannot resolve path {path}: {e}")
        return False

    # Validate the resolved path is a strict child of an allowed base
    is_allowed = False
    for base in allowed_bases:
        try:
            base_resolved = base.resolve()
            if resolved != base_resolved and resolved.is_relative_to(base_resolved):
                is_allowed = True
                break
        except (OSError, ValueError):
            continue

    if not is_allowed:
        logger.error(
            f"sudo_remove_directory DENIED: {resolved} is not inside "
            f"allowed bases {[str(b) for b in allowed_bases]}"
        )
        return False

    # Use the safe_plugin_rm.sh helper which does its own validation
    helper_script = project_root / "scripts" / "fix_perms" / "safe_plugin_rm.sh"
    if not helper_script.exists():
        logger.error(f"Safe removal helper not found: {helper_script}")
        return False

    bash_path = _shutil.which('bash') or '/bin/bash'

    try:
        result = subprocess.run(
            ['sudo', '-n', bash_path, str(helper_script), str(resolved)],
            capture_output=True,
            text=True,
            timeout=30
        )
        if result.returncode == 0 and not resolved.exists():
            logger.info(f"Successfully removed {path} via sudo helper")
            return True
        else:
            stderr = result.stderr.strip()
            logger.error(f"sudo helper failed for {path}: {stderr}")
            return False
    except subprocess.TimeoutExpired:
        logger.error(f"sudo helper timed out for {path}")
        return False
    except FileNotFoundError:
        logger.error("sudo command not found on system")
        return False
    except Exception as e:
        logger.error(f"Unexpected error during sudo helper for {path}: {e}")
        return False
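
A note on the directory modes returned by the helpers above: the leading 2 in 0o2775 is the setgid bit, which makes new entries inherit the directory's group; the sticky bit would be a leading 1. The short check below uses only the standard `stat` module; `describe_dir_mode()` is a hypothetical helper, not part of permission_utils.py.

```python
import stat


def describe_dir_mode(mode: int) -> str:
    """Render a directory permission mode the way `ls -l` would."""
    return stat.filemode(stat.S_IFDIR | mode)


print(describe_dir_mode(0o2775))    # drwxrwsr-x  (setgid: 's' in the group slot)
print(describe_dir_mode(0o1775))    # drwxrwxr-t  (sticky: 't' in the other slot)
print(bool(0o2775 & stat.S_ISGID))  # True
print(bool(0o2775 & stat.S_ISVTX))  # False
```

With setgid on the plugin, cache, config, and asset directories, files created by either the root service or the web user end up in the same group, which is what makes the group-writable modes useful.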