mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-04-30 20:43:00 +00:00
fix(wifi): address Codacy review findings in AP mode implementation
- Validate ap_ssid/ap_channel from config before passing to subprocess (printable ASCII ≤32 chars; channel 1-14) to prevent command injection - Fix INPUT iptables rule: PREROUTING redirects port 80→5000 so the INPUT chain sees dport=5000, not 80. Old INPUT rule on port 80 was a no-op. - Refactor iptables setup/teardown into _setup_iptables_redirect() and _teardown_iptables_redirect() helpers, eliminating duplicate logic in the hostapd and nmcli paths - Save/restore ip_forward state (via /tmp/ledmatrix_ip_forward_saved) instead of forcing it to 0 on cleanup, which could break VPNs or bridges already relying on forwarding - nmcli path skips ip_forward management entirely: NM's ipv4.method=shared already manages it for the duration of the connection - Fix _get_ap_status_nmcli() verification: new 'connection add type wifi' profiles have type '802-11-wireless', not 'hotspot', so verification was always returning False. Now also matches by our known connection name. - Remove SSID-based connection deletion: deleting any profile whose SSID matched the AP SSID could destroy a user's saved home WiFi profile. Now only deletes by our application-managed profile names. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -656,7 +656,121 @@ class WiFiManager:
|
||||
return False
|
||||
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved")
|
||||
|
||||
def _validate_ap_config(self) -> Tuple[str, int]:
|
||||
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
|
||||
import re as _re
|
||||
ssid = str(self.config.get("ap_ssid", DEFAULT_AP_SSID))
|
||||
if not ssid or len(ssid) > 32 or not _re.match(r'^[\x20-\x7E]+$', ssid):
|
||||
logger.warning(f"AP SSID '{ssid}' is invalid, falling back to default")
|
||||
ssid = DEFAULT_AP_SSID
|
||||
try:
|
||||
channel = int(self.config.get("ap_channel", DEFAULT_AP_CHANNEL))
|
||||
if channel < 1 or channel > 14:
|
||||
raise ValueError
|
||||
except (TypeError, ValueError):
|
||||
logger.warning("AP channel out of range, falling back to default")
|
||||
channel = DEFAULT_AP_CHANNEL
|
||||
return ssid, channel
|
||||
|
||||
def _setup_iptables_redirect(self) -> bool:
|
||||
"""
|
||||
Add iptables rules that redirect port 80 → Flask on 5000 for the captive portal.
|
||||
The INPUT rule must accept port 5000 (post-redirect destination), not port 80.
|
||||
ip_forward state is saved to disk before enabling; call _teardown_iptables_redirect
|
||||
to restore it.
|
||||
Returns True if rules were applied.
|
||||
"""
|
||||
try:
|
||||
if subprocess.run(["which", "iptables"], capture_output=True,
|
||||
timeout=2).returncode != 0:
|
||||
logger.debug("iptables unavailable; captive portal requires direct port-5000 access")
|
||||
return False
|
||||
|
||||
# Save current ip_forward state so we can restore it exactly on teardown
|
||||
fwd = subprocess.run(["sysctl", "-n", "net.ipv4.ip_forward"],
|
||||
capture_output=True, text=True, timeout=3)
|
||||
saved = fwd.stdout.strip() if fwd.returncode == 0 else "0"
|
||||
try:
|
||||
self._IP_FORWARD_SAVE_PATH.write_text(saved)
|
||||
except OSError:
|
||||
pass # non-fatal; restore will fall back to "0"
|
||||
|
||||
subprocess.run(["sudo", "sysctl", "-w", "net.ipv4.ip_forward=1"],
|
||||
capture_output=True, timeout=5)
|
||||
|
||||
# PREROUTING: redirect HTTP → Flask
|
||||
if subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-C", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, timeout=5
|
||||
).returncode != 0:
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-A", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
|
||||
# INPUT: accept traffic on port 5000 (the post-redirect destination port)
|
||||
if subprocess.run(
|
||||
["sudo", "iptables", "-C", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
|
||||
"-j", "ACCEPT"],
|
||||
capture_output=True, timeout=5
|
||||
).returncode != 0:
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-A", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
|
||||
"-j", "ACCEPT"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
|
||||
logger.info("iptables: port 80→5000 redirect and INPUT accept-5000 rules added")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not set up iptables redirect: {e}")
|
||||
return False
|
||||
|
||||
def _teardown_iptables_redirect(self) -> None:
|
||||
"""Remove the port 80→5000 iptables rules and restore the saved ip_forward state."""
|
||||
try:
|
||||
if subprocess.run(["which", "iptables"], capture_output=True,
|
||||
timeout=2).returncode != 0:
|
||||
return
|
||||
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-D", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-D", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
|
||||
"-j", "ACCEPT"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
|
||||
# Restore ip_forward to whatever it was before we touched it
|
||||
try:
|
||||
saved = self._IP_FORWARD_SAVE_PATH.read_text().strip()
|
||||
self._IP_FORWARD_SAVE_PATH.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
saved = "0"
|
||||
subprocess.run(["sudo", "sysctl", "-w", f"net.ipv4.ip_forward={saved}"],
|
||||
capture_output=True, timeout=5)
|
||||
logger.info(f"iptables redirect rules removed; ip_forward restored to {saved}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not tear down iptables redirect: {e}")
|
||||
|
||||
def scan_networks(self, allow_cached: bool = True) -> Tuple[List[WiFiNetwork], bool]:
|
||||
"""
|
||||
Scan for available WiFi networks.
|
||||
@@ -1647,60 +1761,8 @@ class WiFiManager:
|
||||
subprocess.run(["sudo", "systemctl", "stop", HOSTAPD_SERVICE], timeout=5)
|
||||
return False, f"Failed to start dnsmasq: {result.stderr}"
|
||||
|
||||
# Set up iptables port forwarding: redirect port 80 to 5000
|
||||
# This makes the captive portal work on standard HTTP port
|
||||
try:
|
||||
# Check if iptables is available
|
||||
iptables_check = subprocess.run(
|
||||
["which", "iptables"],
|
||||
capture_output=True,
|
||||
timeout=2
|
||||
)
|
||||
|
||||
if iptables_check.returncode == 0:
|
||||
# Enable IP forwarding (needed for NAT)
|
||||
subprocess.run(
|
||||
["sudo", "sysctl", "-w", "net.ipv4.ip_forward=1"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
# Add NAT rule to redirect port 80 to 5000 on WiFi interface
|
||||
# First check if rule already exists
|
||||
check_result = subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-C", "PREROUTING", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
if check_result.returncode != 0:
|
||||
# Rule doesn't exist, add it
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-A", "PREROUTING", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
logger.info("Added iptables rule to redirect port 80 to 5000")
|
||||
|
||||
# Also allow incoming connections on port 80
|
||||
check_input = subprocess.run(
|
||||
["sudo", "iptables", "-C", "INPUT", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "ACCEPT"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
if check_input.returncode != 0:
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-A", "INPUT", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "ACCEPT"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
else:
|
||||
logger.debug("iptables not available, port forwarding not set up")
|
||||
logger.info("Note: Port 80 forwarding requires iptables. Users will need to access port 5000 directly.")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not set up iptables port forwarding: {e}")
|
||||
# Continue anyway - port 5000 will still work
|
||||
# Set up iptables port forwarding (port 80 → 5000) and save ip_forward state
|
||||
self._setup_iptables_redirect()
|
||||
|
||||
logger.info("AP mode enabled successfully")
|
||||
ap_ssid = self.config.get("ap_ssid", DEFAULT_AP_SSID)
|
||||
@@ -1731,36 +1793,16 @@ class WiFiManager:
|
||||
self.disconnect_from_network()
|
||||
time.sleep(1)
|
||||
|
||||
ap_ssid = self.config.get("ap_ssid", DEFAULT_AP_SSID)
|
||||
ap_channel = self.config.get("ap_channel", DEFAULT_AP_CHANNEL)
|
||||
ap_ssid, ap_channel = self._validate_ap_config()
|
||||
|
||||
# Clean up any pre-existing AP connection profiles
|
||||
# Delete only the specific application-managed AP profiles by name.
|
||||
# Never delete by SSID — that would destroy a user's saved home network.
|
||||
for conn_name in ["Hotspot", "LEDMatrix-Setup-AP", "TickerSetup-AP"]:
|
||||
subprocess.run(["nmcli", "connection", "down", conn_name],
|
||||
capture_output=True, timeout=5)
|
||||
subprocess.run(["nmcli", "connection", "delete", conn_name],
|
||||
capture_output=True, timeout=10)
|
||||
|
||||
# Also delete any connection whose SSID matches ours or is hotspot type
|
||||
result = subprocess.run(
|
||||
["nmcli", "-t", "-f", "NAME,TYPE,802-11-wireless.ssid", "connection", "show"],
|
||||
capture_output=True, text=True, timeout=10
|
||||
)
|
||||
if result.returncode == 0:
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2:
|
||||
conn_name = parts[0].strip()
|
||||
conn_type = parts[1].strip().lower() if len(parts) > 1 else ""
|
||||
conn_ssid = parts[2].strip() if len(parts) > 2 else ""
|
||||
if ('hotspot' in conn_type or conn_ssid == ap_ssid or
|
||||
'hotspot' in conn_name.lower()):
|
||||
logger.info(f"Deleting existing AP connection: {conn_name}")
|
||||
subprocess.run(["nmcli", "connection", "down", conn_name],
|
||||
capture_output=True, timeout=5)
|
||||
subprocess.run(["nmcli", "connection", "delete", conn_name],
|
||||
capture_output=True, timeout=10)
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
# Create an open AP connection profile from scratch.
|
||||
@@ -1811,47 +1853,9 @@ class WiFiManager:
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
# Set up iptables port forwarding: redirect port 80 → 5000
|
||||
# This enables the captive portal auto-redirect on phones/laptops.
|
||||
try:
|
||||
iptables_check = subprocess.run(["which", "iptables"],
|
||||
capture_output=True, timeout=2)
|
||||
if iptables_check.returncode == 0:
|
||||
subprocess.run(
|
||||
["sudo", "sysctl", "-w", "net.ipv4.ip_forward=1"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
check_result = subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-C", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
if check_result.returncode != 0:
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-A", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
logger.info("Added iptables rule to redirect port 80 to 5000")
|
||||
check_input = subprocess.run(
|
||||
["sudo", "iptables", "-C", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "ACCEPT"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
if check_input.returncode != 0:
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-A", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "ACCEPT"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
else:
|
||||
logger.debug("iptables not available; users must access port 5000 directly")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not set up iptables port forwarding: {e}")
|
||||
# NM's ipv4.method=shared manages ip_forward automatically, so we only
|
||||
# need to add the iptables port-redirect rules for the captive portal.
|
||||
self._setup_iptables_redirect()
|
||||
|
||||
# Verify the AP is actually running
|
||||
status = self._get_ap_status_nmcli()
|
||||
@@ -1887,7 +1891,12 @@ class WiFiManager:
|
||||
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2 and 'hotspot' in parts[1].lower():
|
||||
if len(parts) < 2:
|
||||
continue
|
||||
conn_name = parts[0].strip()
|
||||
conn_type = parts[1].strip().lower()
|
||||
# Match our known AP profile name OR the legacy nmcli hotspot type
|
||||
if conn_name == "LEDMatrix-Setup-AP" or 'hotspot' in conn_type:
|
||||
# Get actual IP address (may be 192.168.4.1 or 10.42.0.1 depending on config)
|
||||
ip = '192.168.4.1'
|
||||
interface = parts[2] if len(parts) > 2 else self._wifi_interface
|
||||
@@ -1983,45 +1992,9 @@ class WiFiManager:
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not remove dnsmasq drop-in config: {e}")
|
||||
|
||||
# Remove iptables port forwarding rules and disable IP forwarding (only for hostapd mode)
|
||||
# Remove iptables redirect rules and restore ip_forward state (hostapd mode only)
|
||||
if hostapd_active:
|
||||
try:
|
||||
# Check if iptables is available
|
||||
iptables_check = subprocess.run(
|
||||
["which", "iptables"],
|
||||
capture_output=True,
|
||||
timeout=2
|
||||
)
|
||||
|
||||
if iptables_check.returncode == 0:
|
||||
# Remove NAT redirect rule
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-D", "PREROUTING", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
# Remove INPUT rule
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-D", "INPUT", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "ACCEPT"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
logger.info("Removed iptables port forwarding rules")
|
||||
else:
|
||||
logger.debug("iptables not available, skipping rule removal")
|
||||
|
||||
# Disable IP forwarding (restore to default client mode)
|
||||
subprocess.run(
|
||||
["sudo", "sysctl", "-w", "net.ipv4.ip_forward=0"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
logger.info("Disabled IP forwarding")
|
||||
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError) as e:
|
||||
logger.warning(f"Could not remove iptables rules or disable forwarding: {e}")
|
||||
# Continue anyway
|
||||
self._teardown_iptables_redirect()
|
||||
|
||||
# Clean up WiFi interface IP configuration
|
||||
subprocess.run(
|
||||
@@ -2064,32 +2037,10 @@ class WiFiManager:
|
||||
except Exception as e:
|
||||
logger.error(f"Final WiFi radio unblock attempt failed: {e}")
|
||||
else:
|
||||
# nmcli AP mode - restart not needed, but clean up iptables rules
|
||||
# (we add these in _enable_ap_mode_nmcli_hotspot for captive portal)
|
||||
# nmcli AP mode — NM's ipv4.method=shared manages ip_forward automatically,
|
||||
# so we only need to remove the iptables redirect rules we added.
|
||||
logger.info("Skipping NetworkManager restart (nmcli AP mode, restart not needed)")
|
||||
try:
|
||||
iptables_check = subprocess.run(["which", "iptables"],
|
||||
capture_output=True, timeout=2)
|
||||
if iptables_check.returncode == 0:
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-D", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-D", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "ACCEPT"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
subprocess.run(
|
||||
["sudo", "sysctl", "-w", "net.ipv4.ip_forward=0"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
logger.info("Removed iptables port forwarding rules (nmcli path)")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not remove iptables rules (nmcli path): {e}")
|
||||
self._teardown_iptables_redirect()
|
||||
# Ensure WiFi radio is enabled after nmcli operations
|
||||
wifi_enabled = self._ensure_wifi_radio_enabled(max_retries=3)
|
||||
if not wifi_enabled:
|
||||
|
||||
Reference in New Issue
Block a user