4 Commits

Author SHA1 Message Date
Chuck
15fc9003ac fix: address three wifi_manager and one plugins_manager review findings
wifi_manager.py:
- _create_hostapd_config: use _validate_ap_config() for ssid/channel instead
  of raw self.config values; strip newlines from SSID to prevent config-file
  injection via the generated hostapd.conf
- _setup_iptables_redirect: check return codes of sysctl ip_forward enable and
  both iptables -A calls; on any failure log the error output, call
  _teardown_iptables_redirect() to restore state, and return False instead of
  silently succeeding
- _enable_ap_mode_nmcli_hotspot: on AP verification failure roll back fully —
  tear down iptables redirect, delete the LEDMatrix-Setup-AP connection profile,
  clear the LED message — before returning False

plugins_manager.js:
- initializePlugins: chain searchPluginStore(!isReswapWarm) inside
  loadInstalledPlugins().then() so window.installedPlugins is populated before
  the store renders Installed/Reinstall badges (same pattern applied to
  refreshPlugins() in the previous commit)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 18:33:53 -04:00
Chuck
55a6a53fca fix(plugins): fix async race in refreshPlugins; use cache TTL to gate re-swap metadata fetch
refreshPlugins() called searchPluginStore(true) and showNotification() immediately
after refreshInstalledPlugins() without awaiting the returned Promise, so
window.installedPlugins could still be stale when the store rendered its
Installed/Reinstall badges. Chain .then() so both run only after the fetch
completes.

In initializePlugins(), the re-swap path always passed fetchCommitInfo=false to
searchPluginStore, skipping GitHub metadata even when the 5-minute cache TTL had
expired. Add storeCacheExpired() helper and compute isReswapWarm = _reswap &&
!storeCacheExpired() so fresh metadata is fetched whenever the cache is cold,
regardless of whether the render is a first load or a tab re-swap.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 18:33:53 -04:00
Chuck
c54718af2d fix(wifi): address Codacy review findings in AP mode implementation
- Validate ap_ssid/ap_channel from config before passing to subprocess
  (printable ASCII ≤32 chars; channel 1-14) to prevent command injection

- Fix INPUT iptables rule: PREROUTING redirects port 80→5000 so the INPUT
  chain sees dport=5000, not 80. Old INPUT rule on port 80 was a no-op.

- Refactor iptables setup/teardown into _setup_iptables_redirect() and
  _teardown_iptables_redirect() helpers, eliminating duplicate logic in
  the hostapd and nmcli paths

- Save/restore ip_forward state (via /tmp/ledmatrix_ip_forward_saved)
  instead of forcing it to 0 on cleanup, which could break VPNs or
  bridges already relying on forwarding

- nmcli path skips ip_forward management entirely: NM's ipv4.method=shared
  already manages it for the duration of the connection

- Fix _get_ap_status_nmcli() verification: new 'connection add type wifi'
  profiles have type '802-11-wireless', not 'hotspot', so verification was
  always returning False. Now also matches by our known connection name.

- Remove SSID-based connection deletion: deleting any profile whose SSID
  matched the AP SSID could destroy a user's saved home WiFi profile.
  Now only deletes by our application-managed profile names.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 18:33:53 -04:00
Chuck
e8afd23c98 fix(wifi): create truly open AP via nmcli connection add; add captive portal to nmcli path
nmcli device wifi hotspot always attaches a WPA2 PSK on Bookworm/Trixie
and silently ignores post-creation security modifications, causing users
to be prompted for an unknown password. Switch to nmcli connection add
with 802-11-wireless.mode ap and no security section — NM cannot auto-add
a password to a profile that has no 802-11-wireless-security block.

Also:
- Remove dead DEFAULT_AP_PASSWORD / ap_password config field (stored but
  never passed to hostapd or nmcli, causing user confusion)
- Add iptables port 80→5000 redirect to the nmcli AP path so captive portal
  auto-popup works on phones without hostapd (previously only worked on
  the hostapd path)
- Clean up iptables rules on disable for the nmcli path
- Improve LED message on AP enable: show SSID, "No password", and IP:port
  on both paths so users know exactly how to connect
- Fix systemd template: replace hardcoded /home/ledpi/LEDMatrix/ with
  __PROJECT_ROOT_DIR__ placeholder (install script already writes correct path)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 18:33:53 -04:00
10 changed files with 727 additions and 1381 deletions

View File

@@ -1,5 +1,4 @@
# LEDMatrix
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/77fc9b446a5948e5b0aed7a7aaeb1bab)](https://app.codacy.com/gh/ChuckBuilds/LEDMatrix/dashboard?utm_source=gh&utm_medium=referral&utm_content=&utm_campaign=Badge_grade)
## Welcome to LEDMatrix!
Welcome to the LEDMatrix Project! This open-source project enables you to run an information-rich display on a Raspberry Pi connected to an LED RGB Matrix panel. Whether you want to see your calendar, weather forecasts, sports scores, stock prices, or any other information at a glance, LEDMatrix brings it all together.

View File

@@ -1086,7 +1086,6 @@ SYSTEMCTL_PATH=$(which systemctl)
REBOOT_PATH=$(which reboot)
POWEROFF_PATH=$(which poweroff)
BASH_PATH=$(which bash)
JOURNALCTL_PATH=$(which journalctl 2>/dev/null || true)
# Create sudoers content
cat > /tmp/ledmatrix_web_sudoers << EOF
@@ -1102,23 +1101,10 @@ $ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH restart ledmatrix.service
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH enable ledmatrix.service
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH disable ledmatrix.service
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH status ledmatrix.service
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH is-active ledmatrix
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH is-active ledmatrix.service
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH start ledmatrix-web.service
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH stop ledmatrix-web.service
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH restart ledmatrix-web.service
$ACTUAL_USER ALL=(ALL) NOPASSWD: $PYTHON_PATH $PROJECT_ROOT_DIR/display_controller.py
$ACTUAL_USER ALL=(ALL) NOPASSWD: $BASH_PATH $PROJECT_ROOT_DIR/start_display.sh
$ACTUAL_USER ALL=(ALL) NOPASSWD: $BASH_PATH $PROJECT_ROOT_DIR/stop_display.sh
$ACTUAL_USER ALL=(ALL) NOPASSWD: $BASH_PATH $PROJECT_ROOT_DIR/scripts/fix_perms/safe_plugin_rm.sh *
EOF
if [ -n "$JOURNALCTL_PATH" ]; then
cat >> /tmp/ledmatrix_web_sudoers << EOF
$ACTUAL_USER ALL=(ALL) NOPASSWD: $JOURNALCTL_PATH -u ledmatrix.service *
$ACTUAL_USER ALL=(ALL) NOPASSWD: $JOURNALCTL_PATH -u ledmatrix *
$ACTUAL_USER ALL=(ALL) NOPASSWD: $JOURNALCTL_PATH -t ledmatrix *
EOF
fi
if [ -f "$SUDOERS_FILE" ] && cmp -s /tmp/ledmatrix_web_sudoers "$SUDOERS_FILE"; then
echo "Sudoers configuration already up to date"

View File

@@ -1,4 +1,4 @@
requests>=2.28.0
Pillow>=12.2.0
Pillow>=9.1.0
pytz>=2022.1
numpy>=1.24.0

View File

@@ -89,9 +89,9 @@ TEMP_SUDOERS="/tmp/ledmatrix_web_sudoers_$$"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH status ledmatrix.service"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH is-active ledmatrix"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH is-active ledmatrix.service"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH start ledmatrix-web.service"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH stop ledmatrix-web.service"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH restart ledmatrix-web.service"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH start ledmatrix-web"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH stop ledmatrix-web"
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH restart ledmatrix-web"
# Optional: journalctl (non-critical — skip if not found)
if [ -n "$JOURNALCTL_PATH" ]; then

View File

@@ -10,7 +10,6 @@ import sys
import time
import logging
import signal
import subprocess
from pathlib import Path
# Add project root to path (parent of scripts/utils/)
@@ -44,11 +43,7 @@ class WiFiMonitorDaemon:
self.wifi_manager = WiFiManager()
self.running = True
self.last_state = None
# Counts consecutive checks where nmcli says "connected" but internet is unreachable.
# After _nm_restart_threshold failures, NetworkManager is restarted as a recovery step.
self._consecutive_internet_failures = 0
self._nm_restart_threshold = 5 # ~2.5 min at 30s interval
# Register signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
@@ -127,43 +122,6 @@ class WiFiMonitorDaemon:
else:
logger.debug(f"Status check: WiFi=disconnected, Ethernet={updated_ethernet}, AP={updated_status.ap_mode_active}")
# Escalating recovery: if nmcli reports connected but actual internet
# is unreachable for several consecutive checks, restart NetworkManager.
# This is done HERE (not inside check_and_manage_ap_mode) to keep the
# AP-enable trigger clean and avoid false-positive AP enables from
# transient packet loss on otherwise working WiFi.
if updated_status.connected and not updated_status.ap_mode_active:
if not self.wifi_manager.check_internet_connectivity():
self._consecutive_internet_failures += 1
logger.warning(
f"Internet unreachable despite nmcli connection "
f"({self._consecutive_internet_failures}/{self._nm_restart_threshold})"
)
if self._consecutive_internet_failures >= self._nm_restart_threshold:
logger.warning("Restarting NetworkManager to recover internet connectivity")
try:
subprocess.run(
["/usr/bin/systemctl", "restart", "NetworkManager"],
capture_output=True, timeout=20, check=True
)
self._consecutive_internet_failures = 0
# NM restart causes a brief WiFi drop; reset the AP-mode grace
# counter so that transient disconnect doesn't count toward
# triggering AP mode.
self.wifi_manager._disconnected_checks = 0
except subprocess.CalledProcessError as e:
logger.error(f"NetworkManager restart failed (rc={e.returncode}); "
"resetting failure counter to avoid tight retry loop")
self._consecutive_internet_failures = 0
except (subprocess.SubprocessError, OSError) as e:
logger.error(f"NetworkManager restart error: {e}; "
"resetting failure counter to avoid tight retry loop")
self._consecutive_internet_failures = 0
else:
self._consecutive_internet_failures = 0
else:
self._consecutive_internet_failures = 0
# Sleep until next check
time.sleep(self.check_interval)

View File

@@ -60,11 +60,6 @@ def get_wifi_config_path():
HOSTAPD_CONFIG_PATH = Path("/etc/hostapd/hostapd.conf")
DNSMASQ_CONFIG_PATH = Path("/etc/dnsmasq.d/ledmatrix-captive.conf")
# Drop-in config for NetworkManager's built-in dnsmasq (ipv4.method=shared).
# Writing address=/#/<ap_ip> here causes NM to resolve every hostname to the AP,
# triggering the OS captive-portal popup automatically on iOS/Android/Windows/macOS.
NM_DNSMASQ_SHARED_DIR = Path("/etc/NetworkManager/dnsmasq-shared.d")
NM_DNSMASQ_SHARED_CONF = NM_DNSMASQ_SHARED_DIR / "ledmatrix-captive.conf"
HOSTAPD_SERVICE = "hostapd"
DNSMASQ_SERVICE = "dnsmasq"
@@ -142,11 +137,6 @@ class WiFiManager:
self._disconnected_checks = 0
self._disconnected_checks_required = 3 # Require 3 consecutive disconnected checks (90 seconds at 30s interval)
# Timestamp set when AP mode is enabled; used for the idle-timeout check
self._ap_enabled_at: Optional[float] = None
# Which redirect backend was used (iptables/nftables/None); set per-instance
self._redirect_backend: Optional[str] = None
logger.info(f"WiFi Manager initialized - nmcli: {self.has_nmcli}, iwlist: {self.has_iwlist}, "
f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, "
f"interface: {self._wifi_interface}, trixie: {self._is_trixie}")
@@ -210,24 +200,6 @@ class WiFiManager:
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
return False
def _find_command_path(self, command: str) -> Optional[str]:
"""
Return the absolute path of a command, checking sbin locations that may not
be on PATH in restricted service environments. Returns None if not found.
"""
try:
result = subprocess.run(["which", command], capture_output=True,
text=True, timeout=2)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
pass
for path in [f"/usr/sbin/{command}", f"/sbin/{command}",
f"/usr/local/sbin/{command}"]:
if os.path.isfile(path) and os.access(path, os.X_OK):
return path
return None
def _discover_wifi_interface(self) -> str:
"""
Discover the primary WiFi interface name dynamically.
@@ -693,8 +665,9 @@ class WiFiManager:
def _validate_ap_config(self) -> Tuple[str, int]:
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
import re as _re
ssid = str(self.config.get("ap_ssid", DEFAULT_AP_SSID))
if not ssid or len(ssid) > 32 or not re.match(r'^[\x20-\x7E]+$', ssid):
if not ssid or len(ssid) > 32 or not _re.match(r'^[\x20-\x7E]+$', ssid):
logger.warning(f"AP SSID '{ssid}' is invalid, falling back to default")
ssid = DEFAULT_AP_SSID
try:
@@ -708,261 +681,109 @@ class WiFiManager:
def _setup_iptables_redirect(self) -> bool:
"""
Add port 80 → 5000 redirect rules for the captive portal.
Tries iptables first, falls back to nftables (used by Debian Trixie).
When neither tool is available, logs a warning and returns True — the AP
still works and DNS spoofing still triggers the OS popup; users just land
on port 5000 directly rather than being redirected from port 80.
Only returns False when a tool was found but the rule addition itself failed.
Add iptables rules that redirect port 80 → Flask on 5000 for the captive portal.
The INPUT rule must accept port 5000 (post-redirect destination), not port 80.
ip_forward state is saved to disk before enabling; call _teardown_iptables_redirect
to restore it.
Returns True if rules were applied.
"""
try:
iptables = self._find_command_path("iptables")
nft = self._find_command_path("nft")
if subprocess.run(["which", "iptables"], capture_output=True,
timeout=2).returncode != 0:
logger.debug("iptables unavailable; captive portal requires direct port-5000 access")
return False
if not iptables and not nft:
logger.warning(
"Neither iptables nor nft found; captive portal port-80 redirect unavailable. "
"DNS spoofing will still trigger the OS popup but HTTP on port 80 won't reach Flask."
)
self._redirect_backend = None
return True # AP works; redirect is best-effort
if iptables:
return self._setup_iptables_redirect_iptables(iptables)
else:
return self._setup_iptables_redirect_nftables(nft)
except Exception as e:
logger.warning(f"Could not set up port redirect: {e}")
# Save current ip_forward state so we can restore it exactly on teardown
fwd = subprocess.run(["sysctl", "-n", "net.ipv4.ip_forward"],
capture_output=True, text=True, timeout=3)
saved = fwd.stdout.strip() if fwd.returncode == 0 else "0"
try:
self._teardown_iptables_redirect()
except Exception as cleanup_e:
logger.warning(f"Cleanup after redirect exception also failed: {cleanup_e}")
return False
def _setup_iptables_redirect_iptables(self, iptables: str) -> bool:
"""Set up port 80→5000 redirect using iptables."""
# Save ip_forward state before enabling
try:
current_fwd = Path("/proc/sys/net/ipv4/ip_forward").read_text().strip()
except OSError:
current_fwd = None
if current_fwd is not None:
try:
self._IP_FORWARD_SAVE_PATH.write_text(current_fwd)
self._IP_FORWARD_SAVE_PATH.write_text(saved)
except OSError:
current_fwd = None
logger.warning("Could not write ip_forward save file; state will not be restored")
pass # non-fatal; restore will fall back to "0"
if current_fwd != "1":
sysctl = self._find_command_path("sysctl")
sysctl_bin = sysctl if sysctl else "sysctl"
r = subprocess.run(["sudo", sysctl_bin, "-w", "net.ipv4.ip_forward=1"],
capture_output=True, text=True, timeout=5)
if r.returncode != 0:
logger.error(f"Failed to enable ip_forward: {r.stderr.strip()}")
sysctl_r = subprocess.run(
["sudo", "sysctl", "-w", "net.ipv4.ip_forward=1"],
capture_output=True, text=True, timeout=5
)
if sysctl_r.returncode != 0:
logger.error(f"Failed to enable ip_forward: {sysctl_r.stderr.strip()}")
self._teardown_iptables_redirect()
return False
if subprocess.run(
["sudo", iptables, "-t", "nat", "-C", "PREROUTING",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
"-j", "REDIRECT", "--to-port", "5000"],
capture_output=True, timeout=5
).returncode != 0:
r = subprocess.run(
["sudo", iptables, "-t", "nat", "-A", "PREROUTING",
# PREROUTING: redirect HTTP → Flask
if subprocess.run(
["sudo", "iptables", "-t", "nat", "-C", "PREROUTING",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
"-j", "REDIRECT", "--to-port", "5000"],
capture_output=True, text=True, timeout=5
)
if r.returncode != 0:
logger.error(f"Failed to add PREROUTING rule: {r.stderr.strip()}")
self._teardown_iptables_redirect()
return False
if subprocess.run(
["sudo", iptables, "-C", "INPUT",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000", "-j", "ACCEPT"],
capture_output=True, timeout=5
).returncode != 0:
r = subprocess.run(
["sudo", iptables, "-A", "INPUT",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000", "-j", "ACCEPT"],
capture_output=True, text=True, timeout=5
)
if r.returncode != 0:
logger.error(f"Failed to add INPUT rule: {r.stderr.strip()}")
self._teardown_iptables_redirect()
return False
self._redirect_backend = "iptables"
logger.info("iptables: port 80→5000 redirect rules added")
return True
def _setup_iptables_redirect_nftables(self, nft: str) -> bool:
"""Set up port 80→5000 redirect using nftables (Debian Trixie / modern systems)."""
# NM's ipv4.method=shared already enables ip_forward; no sysctl needed.
cmds = [
["sudo", nft, "add", "table", "ip", "ledmatrix"],
["sudo", nft, "add", "chain", "ip", "ledmatrix", "prerouting",
"{", "type", "nat", "hook", "prerouting", "priority", "-100", ";", "}"],
["sudo", nft, "add", "rule", "ip", "ledmatrix", "prerouting",
"iif", self._wifi_interface, "tcp", "dport", "80", "redirect", "to", ":5000"],
]
for cmd in cmds:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if r.returncode != 0:
# Table/chain may already exist — only fail on rule add
if "add rule" in " ".join(cmd):
logger.error(f"Failed to add nftables redirect rule: {r.stderr.strip()}")
capture_output=True, timeout=5
).returncode != 0:
add_r = subprocess.run(
["sudo", "iptables", "-t", "nat", "-A", "PREROUTING",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
"-j", "REDIRECT", "--to-port", "5000"],
capture_output=True, text=True, timeout=5
)
if add_r.returncode != 0:
logger.error(f"Failed to add PREROUTING rule: {add_r.stderr.strip()}")
self._teardown_iptables_redirect()
return False
logger.debug(f"nft cmd non-zero (may already exist): {r.stderr.strip()}")
self._redirect_backend = "nftables"
logger.info("nftables: port 80→5000 redirect rule added")
return True
# INPUT: accept traffic on port 5000 (the post-redirect destination port)
if subprocess.run(
["sudo", "iptables", "-C", "INPUT",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
"-j", "ACCEPT"],
capture_output=True, timeout=5
).returncode != 0:
add_r = subprocess.run(
["sudo", "iptables", "-A", "INPUT",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
"-j", "ACCEPT"],
capture_output=True, text=True, timeout=5
)
if add_r.returncode != 0:
logger.error(f"Failed to add INPUT rule: {add_r.stderr.strip()}")
self._teardown_iptables_redirect()
return False
logger.info("iptables: port 80→5000 redirect and INPUT accept-5000 rules added")
return True
except Exception as e:
logger.warning(f"Could not set up iptables redirect: {e}")
return False
def _teardown_iptables_redirect(self) -> None:
"""Remove the port 80→5000 redirect rules and restore ip_forward if saved."""
"""Remove the port 80→5000 iptables rules and restore the saved ip_forward state."""
try:
backend = self._redirect_backend
self._redirect_backend = None
if subprocess.run(["which", "iptables"], capture_output=True,
timeout=2).returncode != 0:
return
if backend == "iptables":
iptables = self._find_command_path("iptables")
if iptables:
subprocess.run(
["sudo", iptables, "-t", "nat", "-D", "PREROUTING",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
"-j", "REDIRECT", "--to-port", "5000"],
capture_output=True, timeout=5
)
subprocess.run(
["sudo", iptables, "-D", "INPUT",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
"-j", "ACCEPT"],
capture_output=True, timeout=5
)
# Restore ip_forward only when we saved it
if self._IP_FORWARD_SAVE_PATH.exists():
try:
saved = self._IP_FORWARD_SAVE_PATH.read_text().strip()
self._IP_FORWARD_SAVE_PATH.unlink(missing_ok=True)
sysctl = self._find_command_path("sysctl")
sysctl_bin = sysctl if sysctl else "sysctl"
subprocess.run(["sudo", sysctl_bin, "-w", f"net.ipv4.ip_forward={saved}"],
capture_output=True, timeout=5)
logger.info(f"ip_forward restored to {saved}")
except OSError as e:
logger.warning(f"Could not restore ip_forward: {e}")
else:
logger.debug("ip_forward not modified by setup; leaving unchanged")
subprocess.run(
["sudo", "iptables", "-t", "nat", "-D", "PREROUTING",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
"-j", "REDIRECT", "--to-port", "5000"],
capture_output=True, timeout=5
)
subprocess.run(
["sudo", "iptables", "-D", "INPUT",
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
"-j", "ACCEPT"],
capture_output=True, timeout=5
)
elif backend == "nftables":
nft = self._find_command_path("nft")
if nft:
subprocess.run(
["sudo", nft, "delete", "table", "ip", "ledmatrix"],
capture_output=True, timeout=5
)
logger.info("nftables ledmatrix table removed")
else:
# No redirect was set up (neither tool available); nothing to tear down
# Restore ip_forward to whatever it was before we touched it
try:
saved = self._IP_FORWARD_SAVE_PATH.read_text().strip()
self._IP_FORWARD_SAVE_PATH.unlink(missing_ok=True)
except OSError:
saved = "0"
subprocess.run(["sudo", "sysctl", "-w", f"net.ipv4.ip_forward={saved}"],
capture_output=True, timeout=5)
logger.info(f"iptables redirect rules removed; ip_forward restored to {saved}")
except Exception as e:
logger.warning(f"Could not tear down port redirect: {e}")
def _write_nm_dnsmasq_captive_conf(self, ap_ip: str = "192.168.4.1") -> None:
"""
Write the NM dnsmasq-shared.d drop-in that makes NM's built-in dnsmasq
resolve every hostname to the AP IP. This triggers the OS captive-portal
popup automatically on iOS / Android / Windows / macOS as soon as the
device connects — no manual navigation required.
NetworkManager reads /etc/NetworkManager/dnsmasq-shared.d/*.conf when it
starts the dnsmasq instance for ipv4.method=shared connections.
"""
try:
content = f"# LEDMatrix captive portal: resolve all hostnames to AP\naddress=/#/{ap_ip}\n"
with open("/tmp/ledmatrix-nm-dnsmasq.conf", "w") as f:
f.write(content)
subprocess.run(
["sudo", "mkdir", "-p", str(NM_DNSMASQ_SHARED_DIR)],
capture_output=True, timeout=5
)
subprocess.run(
["sudo", "cp", "/tmp/ledmatrix-nm-dnsmasq.conf", str(NM_DNSMASQ_SHARED_CONF)],
capture_output=True, timeout=5
)
logger.info(f"Wrote NM dnsmasq captive-portal config: {NM_DNSMASQ_SHARED_CONF}")
except Exception as e:
logger.warning(f"Could not write NM dnsmasq captive config: {e}")
def _remove_nm_dnsmasq_captive_conf(self) -> None:
"""Remove the NM dnsmasq-shared.d drop-in written by _write_nm_dnsmasq_captive_conf."""
try:
subprocess.run(
["sudo", "rm", "-f", str(NM_DNSMASQ_SHARED_CONF)],
capture_output=True, timeout=5
)
logger.info("Removed NM dnsmasq captive-portal config")
except Exception as e:
logger.warning(f"Could not remove NM dnsmasq captive config: {e}")
def _check_internet_connectivity(self, timeout: int = 5) -> bool:
"""
Test actual internet reachability — not just nmcli association state.
A device can be 'connected' in nmcli (associated with an AP) while the
router has no WAN link. This check catches that case so the daemon can
auto-enable AP mode even when nmcli reports a connection.
Returns True if at least one reachability method succeeds.
"""
try:
r = subprocess.run(
["ping", "-c", "1", "-W", str(timeout), "8.8.8.8"],
capture_output=True, timeout=timeout + 1
)
if r.returncode == 0:
logger.debug("Internet connectivity confirmed via ping 8.8.8.8")
return True
except (subprocess.SubprocessError, OSError):
pass
try:
import urllib.request as _ureq
_ureq.urlopen("http://connectivity-check.ubuntu.com/", timeout=timeout)
logger.debug("Internet connectivity confirmed via HTTP check")
return True
except OSError:
pass
logger.debug("Internet connectivity check failed (both ping and HTTP)")
return False
def check_internet_connectivity(self, timeout: int = 5) -> bool:
"""Public wrapper around _check_internet_connectivity for use by the daemon."""
return self._check_internet_connectivity(timeout=timeout)
def _has_ap_clients(self) -> bool:
"""
Return True if at least one client is associated with the AP.
Uses 'iw dev <iface> station dump' which works for both hostapd and
nmcli AP modes.
"""
try:
result = subprocess.run(
["iw", "dev", self._wifi_interface, "station", "dump"],
capture_output=True, text=True, timeout=5
)
return bool(result.stdout.strip())
except Exception:
return False
logger.warning(f"Could not tear down iptables redirect: {e}")
def scan_networks(self, allow_cached: bool = True) -> Tuple[List[WiFiNetwork], bool]:
"""
@@ -1598,27 +1419,12 @@ class WiFiManager:
error_msg = result.stderr.strip() or result.stdout.strip()
logger.error(f"Failed to connect to {ssid}: {error_msg}")
self._show_led_message("Connection failed", duration=5)
if self._is_wrong_password_error(error_msg):
return False, f"wrong_password: {error_msg}"
return False, error_msg
except Exception as e:
logger.error(f"Error connecting with nmcli: {e}")
self._show_led_message("Connection error", duration=5)
return False, str(e)
@staticmethod
def _is_wrong_password_error(error_msg: str) -> bool:
"""Return True when nmcli's error output indicates an authentication failure."""
indicators = [
"secrets were required",
"no secret agent",
"802-11-wireless-security.psk",
"authentication rejected",
"association rejected",
]
lower = error_msg.lower()
return any(ind in lower for ind in indicators)
def _connect_wpa_supplicant(self, ssid: str, password: str) -> Tuple[bool, str]:
"""Connect using wpa_supplicant (fallback)"""
try:
@@ -1890,18 +1696,14 @@ class WiFiManager:
if self.has_hostapd and self.has_dnsmasq:
result = self._enable_ap_mode_hostapd()
if result[0]:
self._ap_enabled_at = time.time()
return result
# Fallback to nmcli hotspot (simpler, no captive portal)
if self.has_nmcli:
logger.info("hostapd/dnsmasq failed or unavailable, trying nmcli hotspot fallback...")
self._show_led_message("Setup Mode", duration=5)
result = self._enable_ap_mode_nmcli_hotspot()
if result[0]:
self._ap_enabled_at = time.time()
return result
return self._enable_ap_mode_nmcli_hotspot()
return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)"
except Exception as e:
logger.error(f"Error in enable_ap_mode: {e}")
@@ -1974,17 +1776,10 @@ class WiFiManager:
return False, f"Failed to start dnsmasq: {result.stderr}"
# Set up iptables port forwarding (port 80 → 5000) and save ip_forward state
if not self._setup_iptables_redirect():
logger.error("Captive-portal redirect setup failed; stopping AP services")
subprocess.run(["sudo", "systemctl", "stop", HOSTAPD_SERVICE],
capture_output=True, timeout=10)
subprocess.run(["sudo", "systemctl", "stop", DNSMASQ_SERVICE],
capture_output=True, timeout=10)
return False, "AP started but captive-portal redirect setup failed"
self._setup_iptables_redirect()
logger.info("AP mode enabled successfully")
# Use the validated SSID so the displayed name matches what hostapd broadcast
ap_ssid, _ = self._validate_ap_config()
ap_ssid = self.config.get("ap_ssid", DEFAULT_AP_SSID)
self._show_led_message(
f"WiFi Setup\n{ap_ssid}\nNo password\n192.168.4.1:5000", duration=10
)
@@ -2044,10 +1839,10 @@ class WiFiManager:
# No 802-11-wireless-security section → open network
]
# PMF (Protected Management Frames) is only meaningful for WPA2/WPA3.
# An open AP has no security section, so adding 802-11-wireless-security.pmf
# would cause NM to require key-mgmt too, breaking the connection add on
# Trixie NM 1.52+. Leave PMF untouched — open APs have no frame protection.
# On Trixie disable PMF which can prevent older clients from connecting
if self._is_trixie:
cmd += ["802-11-wireless-security.pmf", "disable"]
logger.info("Trixie detected: disabling PMF for better client compatibility")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
@@ -2057,12 +1852,6 @@ class WiFiManager:
self._show_led_message("AP mode failed", duration=5)
return False, f"Failed to create AP profile: {error_msg}"
# Write the NM dnsmasq-shared.d captive-portal config BEFORE bringing up
# the connection so NM's dnsmasq picks it up at start time.
# This causes every hostname DNS query from a connected device to resolve
# to 192.168.4.1, automatically triggering the OS captive-portal popup.
self._write_nm_dnsmasq_captive_conf()
logger.info("AP connection profile created, bringing it up...")
up_result = subprocess.run(
["nmcli", "connection", "up", "LEDMatrix-Setup-AP"],
@@ -2071,7 +1860,6 @@ class WiFiManager:
if up_result.returncode != 0:
error_msg = up_result.stderr.strip() or up_result.stdout.strip()
logger.error(f"Failed to bring up AP connection: {error_msg}")
self._remove_nm_dnsmasq_captive_conf()
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
capture_output=True, timeout=10)
self._show_led_message("AP mode failed", duration=5)
@@ -2081,15 +1869,7 @@ class WiFiManager:
# NM's ipv4.method=shared manages ip_forward automatically, so we only
# need to add the iptables port-redirect rules for the captive portal.
if not self._setup_iptables_redirect():
logger.error("Captive-portal redirect setup failed; rolling back AP profile")
self._remove_nm_dnsmasq_captive_conf()
subprocess.run(["nmcli", "connection", "down", "LEDMatrix-Setup-AP"],
capture_output=True, timeout=10)
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
capture_output=True, timeout=10)
self._clear_led_message()
return False, "AP started but captive-portal redirect setup failed"
self._setup_iptables_redirect()
# Verify the AP is actually running
status = self._get_ap_status_nmcli()
@@ -2101,7 +1881,6 @@ class WiFiManager:
else:
logger.error("AP mode started but not verified by status check — rolling back")
self._teardown_iptables_redirect()
self._remove_nm_dnsmasq_captive_conf()
subprocess.run(["nmcli", "connection", "down", "LEDMatrix-Setup-AP"],
capture_output=True, timeout=10)
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
@@ -2111,7 +1890,6 @@ class WiFiManager:
except Exception as e:
logger.error(f"Error starting AP mode with nmcli: {e}")
self._remove_nm_dnsmasq_captive_conf()
self._show_led_message("Setup mode error", duration=5)
return False, str(e)
@@ -2283,13 +2061,11 @@ class WiFiManager:
# so we only need to remove the iptables redirect rules we added.
logger.info("Skipping NetworkManager restart (nmcli AP mode, restart not needed)")
self._teardown_iptables_redirect()
self._remove_nm_dnsmasq_captive_conf()
# Ensure WiFi radio is enabled after nmcli operations
wifi_enabled = self._ensure_wifi_radio_enabled(max_retries=3)
if not wifi_enabled:
logger.warning("WiFi radio may be disabled after nmcli AP cleanup")
self._ap_enabled_at = None
logger.info("AP mode disabled successfully")
return True, "AP mode disabled"
except Exception as e:
@@ -2436,21 +2212,22 @@ address=/detectportal.firefox.com/192.168.4.1
f"Ethernet={ethernet_connected}, AP_active={ap_active}, "
f"auto_enable={auto_enable}, disconnected_checks={self._disconnected_checks}")
# Determine if we should have AP mode active.
# AP-enable uses only the nmcli association state (fast, no network calls).
# This keeps the same reliable behaviour as before: momentary packet loss
# while on working WiFi does NOT trigger AP mode. The internet-reachability
# check is performed separately in the daemon watchdog for NM recovery.
# Determine if we should have AP mode active
# AP mode should only be auto-enabled if:
# - auto_enable_ap_mode is True AND
# - WiFi is NOT connected AND
# - Ethernet is NOT connected AND
# - We've had multiple consecutive disconnected checks (grace period)
is_disconnected = not status.connected and not ethernet_connected
if is_disconnected:
# Increment disconnected check counter
self._disconnected_checks += 1
logger.debug(f"Network disconnected (check {self._disconnected_checks}/{self._disconnected_checks_required})")
else:
# Reset counter if we're associated
# Reset counter if we're connected
if self._disconnected_checks > 0:
logger.debug("Network connected, resetting disconnected check counter")
logger.debug(f"Network connected, resetting disconnected check counter")
self._disconnected_checks = 0
# Only enable AP if we've had enough consecutive disconnected checks
@@ -2495,24 +2272,6 @@ address=/detectportal.firefox.com/192.168.4.1
# AP is active but auto_enable is disabled - this means it was manually enabled
# Don't disable it automatically, let it stay active
logger.debug("AP mode is active (manually enabled), keeping active")
# Idle-timeout check: disable AP if no client has connected within the window.
# Only applies when AP is active and we haven't just decided to enable/disable it.
if ap_active and self._ap_enabled_at is not None:
try:
idle_timeout_min = max(1, min(1440, int(self.config.get("ap_idle_timeout_minutes", 15))))
except (TypeError, ValueError):
idle_timeout_min = 15
elapsed = time.time() - self._ap_enabled_at
if elapsed > idle_timeout_min * 60 and not self._has_ap_clients():
logger.info(
f"AP idle timeout ({idle_timeout_min} min, no clients) — disabling AP"
)
success, message = self.disable_ap_mode()
if success:
return True
else:
logger.warning(f"Failed to disable AP on idle timeout: {message}")
return False
except Exception as e:

View File

@@ -1,334 +0,0 @@
"""
Unit tests for WiFi AP mode — src.wifi_manager.
Each test exercises logic that can be verified through the subprocess calls the
manager emits, without requiring root access, hardware, or a running Pi.
Scenarios covered:
1. nmcli AP profile is created with no security parameters (open/passwordless).
2. iptables PREROUTING and INPUT rules are added when the nmcli AP starts.
3. iptables rules and ip_forward are reverted when the AP is torn down.
4. LED matrix message includes the SSID, 'No password', and the setup URL.
5. Known AP profile names are deleted before the new profile is created.
"""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from src.wifi_manager import WiFiManager
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _ok(stdout: str = "", stderr: str = "") -> MagicMock:
r = MagicMock()
r.returncode = 0
r.stdout = stdout
r.stderr = stderr
return r
def _fail(stdout: str = "", stderr: str = "error") -> MagicMock:
r = MagicMock()
r.returncode = 1
r.stdout = stdout
r.stderr = stderr
return r
def _find_path_side_effect(name: str) -> str:
"""Deterministic fake for _find_command_path."""
return {"iptables": "/usr/sbin/iptables", "sysctl": "/usr/sbin/sysctl"}.get(
name, f"/usr/bin/{name}"
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def wifi_config(tmp_path: Path) -> Path:
"""Minimal wifi_config.json in a temporary directory."""
cfg_dir = tmp_path / "config"
cfg_dir.mkdir()
cfg = {
"ap_ssid": "LEDMatrix-Setup",
"ap_channel": 7,
"auto_enable_ap_mode": True,
"saved_networks": [],
}
p = cfg_dir / "wifi_config.json"
p.write_text(json.dumps(cfg))
return p
@pytest.fixture()
def manager(wifi_config: Path, tmp_path: Path) -> WiFiManager:
"""
WiFiManager with all system calls stubbed out during construction and the
ip_forward save file redirected to a per-test temporary path.
"""
with patch("src.wifi_manager.subprocess.run", return_value=_ok(stdout="wlan0\n")), \
patch.object(WiFiManager, "_detect_trixie", return_value=False):
mgr = WiFiManager(config_path=wifi_config)
# Force clean, deterministic state regardless of what __init__ inferred
mgr._wifi_interface = "wlan0"
mgr.has_nmcli = True
mgr.has_hostapd = False
mgr.has_dnsmasq = False
mgr.has_iwlist = False
mgr._is_trixie = False
# Redirect the ip_forward save file to tmp so tests never share state
mgr._IP_FORWARD_SAVE_PATH = tmp_path / "ip_fwd_saved"
return mgr
# ---------------------------------------------------------------------------
# 1. AP profile is open (no password)
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_nmcli_ap_profile_has_no_security_params(manager: WiFiManager) -> None:
"""
The 'nmcli connection add' command must not include key-mgmt, psk, or any
WPA-related parameter. On Bookworm/Trixie, NM creates a WPA2-protected
hotspot even when those values are set to 'none'/empty via a later
'connection modify', so the profile must be created without a security
section from the start.
"""
captured: list[list[str]] = []
def _run(cmd, **kw):
captured.append(list(cmd))
return _ok()
with patch("src.wifi_manager.subprocess.run", side_effect=_run), \
patch.object(manager, "disconnect_from_network", return_value=(True, "ok")), \
patch.object(manager, "_setup_iptables_redirect", return_value=True), \
patch.object(manager, "_get_ap_status_nmcli",
return_value={"active": True, "ip": "192.168.4.1"}), \
patch.object(manager, "_show_led_message"):
success, _ = manager._enable_ap_mode_nmcli_hotspot()
assert success, "AP enable should report success"
add_calls = [c for c in captured if "nmcli" in c and "connection" in c and "add" in c]
assert add_calls, "Expected at least one 'nmcli connection add' invocation"
add_str = " ".join(add_calls[0])
assert "key-mgmt" not in add_str, "AP profile must not set key-mgmt"
assert "psk" not in add_str, "AP profile must not include a PSK/password"
assert "wpa" not in add_str.lower(), "AP profile must not reference WPA"
assert "802-11-wireless.mode" in add_str, "AP profile must declare wireless mode"
# Verify the value for 802-11-wireless.mode is exactly "ap" — check the element
# that immediately follows the key in the command list, not a loose substring match.
cmd = add_calls[0]
try:
mode_idx = cmd.index("802-11-wireless.mode")
assert cmd[mode_idx + 1] == "ap", \
f"802-11-wireless.mode value must be exactly 'ap', got {cmd[mode_idx + 1]!r}"
except ValueError:
pytest.fail("802-11-wireless.mode not found as a list element in nmcli command")
# ---------------------------------------------------------------------------
# 2. iptables NAT rules are added when the AP starts
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_iptables_nat_rules_added_on_ap_start(manager: WiFiManager) -> None:
"""
_setup_iptables_redirect must add:
- a PREROUTING REDIRECT rule that maps incoming TCP port 80 to port 5000, and
- an INPUT ACCEPT rule for port 5000 (the post-redirect destination port,
NOT port 80 which never hits the INPUT chain after PREROUTING rewrites it).
"""
captured: list[list[str]] = []
def _run(cmd, **kw):
captured.append(list(cmd))
# iptables -C (check) → rc=1 so the -A (add) branch executes
if "iptables" in " ".join(str(x) for x in cmd) and "-C" in cmd:
return _fail()
return _ok()
# Patch Path.read_text so /proc/sys/net/ipv4/ip_forward is readable on any OS
with patch("src.wifi_manager.subprocess.run", side_effect=_run), \
patch.object(manager, "_find_command_path", side_effect=_find_path_side_effect), \
patch("pathlib.Path.read_text", return_value="0\n"):
result = manager._setup_iptables_redirect()
assert result, "_setup_iptables_redirect must return True on success"
prerouting_adds = [c for c in captured if "iptables" in " ".join(c) and "-A" in c and "PREROUTING" in c]
assert prerouting_adds, "Expected 'iptables -A PREROUTING' invocation"
pr_str = " ".join(prerouting_adds[0])
assert "--dport" in pr_str and "80" in pr_str, "PREROUTING rule must match dport 80"
assert "5000" in pr_str, "PREROUTING rule must redirect to port 5000"
assert "REDIRECT" in pr_str, "PREROUTING rule must use REDIRECT target"
input_adds = [c for c in captured if "iptables" in " ".join(c) and "-A" in c and "INPUT" in c]
assert input_adds, "Expected 'iptables -A INPUT' invocation"
in_str = " ".join(input_adds[0])
assert "5000" in in_str, "INPUT rule must accept port 5000 (post-PREROUTING destination)"
assert "ACCEPT" in in_str, "INPUT rule must use ACCEPT target"
# Port 80 must NOT be used in the INPUT rule (it is already redirected by PREROUTING)
input_80 = [c for c in captured if "iptables" in " ".join(c) and "INPUT" in c and "--dport" in c and "80" in c]
assert not input_80, "INPUT rule must target port 5000, not port 80"
# ---------------------------------------------------------------------------
# 3a. iptables rules and ip_forward reverted on teardown
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_iptables_rules_and_ip_forward_reverted_on_teardown(manager: WiFiManager) -> None:
"""
_teardown_iptables_redirect must:
- remove the PREROUTING and INPUT iptables rules, and
- restore ip_forward to the exact value recorded in the save file.
"""
original_fwd = "0"
manager._IP_FORWARD_SAVE_PATH.write_text(original_fwd)
# Teardown dispatches on the backend recorded during setup
manager._redirect_backend = "iptables"
captured: list[list[str]] = []
with patch("src.wifi_manager.subprocess.run",
side_effect=lambda cmd, **kw: (captured.append(list(cmd)) or _ok())), \
patch.object(manager, "_find_command_path", side_effect=_find_path_side_effect):
manager._teardown_iptables_redirect()
assert [c for c in captured if "iptables" in " ".join(c) and "-D" in c and "PREROUTING" in c], \
"Expected 'iptables -D PREROUTING' invocation"
assert [c for c in captured if "iptables" in " ".join(c) and "-D" in c and "INPUT" in c], \
"Expected 'iptables -D INPUT' invocation"
restore_calls = [
c for c in captured
if "sysctl" in " ".join(c) and f"ip_forward={original_fwd}" in " ".join(c)
]
assert restore_calls, f"Expected sysctl to restore ip_forward to {original_fwd!r}"
assert not manager._IP_FORWARD_SAVE_PATH.exists(), \
"Save file must be removed after successful teardown"
# ---------------------------------------------------------------------------
# 3b. ip_forward untouched when no save file exists
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_ip_forward_not_restored_when_save_file_absent(manager: WiFiManager) -> None:
"""
When the save file is missing (setup never wrote it, e.g. because /proc was
unreadable or the write failed), teardown must NOT call sysctl so it does not
accidentally clobber ip_forward state owned by a VPN or NetworkManager.
"""
assert not manager._IP_FORWARD_SAVE_PATH.exists()
captured: list[list[str]] = []
with patch("src.wifi_manager.subprocess.run",
side_effect=lambda cmd, **kw: (captured.append(list(cmd)) or _ok())), \
patch.object(manager, "_find_command_path", side_effect=_find_path_side_effect):
manager._teardown_iptables_redirect()
sysctl_calls = [
c for c in captured
if "sysctl" in " ".join(c) and "ip_forward" in " ".join(c)
]
assert not sysctl_calls, \
"sysctl must not be called when no ip_forward save file exists"
# ---------------------------------------------------------------------------
# 4. LED message content
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_led_message_shows_ssid_no_password_and_url(manager: WiFiManager) -> None:
"""
When the nmcli AP activates, the LED message must include:
- the AP SSID ('LEDMatrix-Setup')
- the string 'No password'
- the AP IP address (192.168.4.1) and Flask port (5000)
"""
led_messages: list[str] = []
with patch("src.wifi_manager.subprocess.run", return_value=_ok()), \
patch.object(manager, "disconnect_from_network", return_value=(True, "ok")), \
patch.object(manager, "_setup_iptables_redirect", return_value=True), \
patch.object(manager, "_get_ap_status_nmcli",
return_value={"active": True, "ip": "192.168.4.1"}), \
patch.object(manager, "_show_led_message",
side_effect=lambda msg, **kw: led_messages.append(msg)):
success, _ = manager._enable_ap_mode_nmcli_hotspot()
assert success, "AP enable should report success"
assert led_messages, "Expected at least one _show_led_message call"
combined = "\n".join(led_messages)
assert "No password" in combined, "LED message must say 'No password'"
assert "LEDMatrix-Setup" in combined, "LED message must include the AP SSID"
assert "192.168.4.1" in combined, "LED message must include the AP IP address"
assert "5000" in combined, "LED message must include the Flask port"
# ---------------------------------------------------------------------------
# 5. Stale AP profiles deleted before the new one is created
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_existing_ap_profiles_deleted_before_new_profile_created(manager: WiFiManager) -> None:
"""
Before 'nmcli connection add', the manager must issue
'nmcli connection down/delete' for every known AP profile name so stale
profiles (from a previous crash or partial setup) cannot block the new one.
"""
captured: list[list[str]] = []
def _run(cmd, **kw):
captured.append(list(cmd))
return _ok()
with patch("src.wifi_manager.subprocess.run", side_effect=_run), \
patch.object(manager, "disconnect_from_network", return_value=(True, "ok")), \
patch.object(manager, "_setup_iptables_redirect", return_value=True), \
patch.object(manager, "_get_ap_status_nmcli",
return_value={"active": True, "ip": "192.168.4.1"}), \
patch.object(manager, "_show_led_message"):
success, _ = manager._enable_ap_mode_nmcli_hotspot()
assert success
cmd_strs = [" ".join(c) for c in captured]
for profile in ("LEDMatrix-Setup-AP", "Hotspot", "TickerSetup-AP"):
assert any("connection delete" in s and profile in s for s in cmd_strs), \
f"Expected 'nmcli connection delete {profile}' before creating the new profile"
add_indices = [i for i, s in enumerate(cmd_strs) if "connection add" in s]
del_indices = [i for i, s in enumerate(cmd_strs) if "connection delete" in s]
assert add_indices, "Expected 'nmcli connection add' call"
assert del_indices, "Expected 'nmcli connection delete' calls"
assert max(del_indices) < min(add_indices), \
"All connection deletions must complete before the new profile is created"

View File

@@ -2,7 +2,6 @@ from flask import Blueprint, request, jsonify, Response, send_from_directory
import json
import os
import re
import shutil
import socket
import sys
import subprocess
@@ -17,11 +16,6 @@ from typing import Optional, Tuple, Dict, Any, Type
logger = logging.getLogger(__name__)
SUDO_BIN = shutil.which("sudo") or "/usr/bin/sudo"
SYSTEMCTL_BIN = shutil.which("systemctl") or "/usr/bin/systemctl"
REBOOT_BIN = shutil.which("reboot") or "/usr/sbin/reboot"
POWEROFF_BIN = shutil.which("poweroff") or "/usr/sbin/poweroff"
# Import new infrastructure
from src.web_interface.api_helpers import success_response, error_response, validate_request_json
from src.web_interface.errors import ErrorCode
@@ -224,7 +218,7 @@ def _ensure_display_service_running():
if status.get('active'):
status['started'] = False
return status
result = _run_systemctl_command([SUDO_BIN, SYSTEMCTL_BIN, 'start', 'ledmatrix.service'])
result = _run_systemctl_command(['sudo', 'systemctl', 'start', 'ledmatrix'])
service_status = _get_display_service_status()
result['started'] = result.get('returncode') == 0
result['active'] = service_status.get('active')
@@ -233,7 +227,7 @@ def _ensure_display_service_running():
def _stop_display_service():
"""Stop the ledmatrix display service."""
result = _run_systemctl_command([SUDO_BIN, SYSTEMCTL_BIN, 'stop', 'ledmatrix.service'])
result = _run_systemctl_command(['sudo', 'systemctl', 'stop', 'ledmatrix'])
status = _get_display_service_status()
result['active'] = status.get('active')
result['status'] = status
@@ -1722,34 +1716,33 @@ def execute_system_action():
if mode:
# For on-demand modes, we would need to integrate with the display controller
# For now, just start the display service
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'start', 'ledmatrix.service'],
capture_output=True, text=True, timeout=15)
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
capture_output=True, text=True)
return jsonify({
'status': 'success' if result.returncode == 0 else 'error',
'message': f'Started display in {mode} mode' if result.returncode == 0
else f'Failed to start display in {mode} mode: {result.stderr.strip() or "check sudo systemctl status ledmatrix.service"}',
'message': f'Started display in {mode} mode',
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
})
else:
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'start', 'ledmatrix.service'],
capture_output=True, text=True, timeout=15)
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
capture_output=True, text=True)
elif action == 'stop_display':
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'stop', 'ledmatrix.service'],
capture_output=True, text=True, timeout=15)
result = subprocess.run(['sudo', 'systemctl', 'stop', 'ledmatrix'],
capture_output=True, text=True)
elif action == 'enable_autostart':
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'enable', 'ledmatrix.service'],
capture_output=True, text=True, timeout=15)
result = subprocess.run(['sudo', 'systemctl', 'enable', 'ledmatrix'],
capture_output=True, text=True)
elif action == 'disable_autostart':
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'disable', 'ledmatrix.service'],
capture_output=True, text=True, timeout=15)
result = subprocess.run(['sudo', 'systemctl', 'disable', 'ledmatrix'],
capture_output=True, text=True)
elif action == 'reboot_system':
result = subprocess.run([SUDO_BIN, REBOOT_BIN],
capture_output=True, text=True, timeout=10)
result = subprocess.run(['sudo', 'reboot'],
capture_output=True, text=True)
elif action == 'shutdown_system':
result = subprocess.run([SUDO_BIN, POWEROFF_BIN],
capture_output=True, text=True, timeout=10)
result = subprocess.run(['sudo', 'poweroff'],
capture_output=True, text=True)
elif action == 'git_pull':
# Use PROJECT_ROOT instead of hardcoded path
project_dir = str(PROJECT_ROOT)
@@ -1830,11 +1823,12 @@ def execute_system_action():
'stderr': result.stderr
})
elif action == 'restart_display_service':
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'restart', 'ledmatrix.service'],
capture_output=True, text=True, timeout=15)
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'],
capture_output=True, text=True)
elif action == 'restart_web_service':
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'restart', 'ledmatrix-web.service'],
capture_output=True, text=True, timeout=15)
# Try to restart the web service (assuming it's ledmatrix-web.service)
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'],
capture_output=True, text=True)
else:
return jsonify({'status': 'error', 'message': f'Unknown action: {action}'}), 400
@@ -1846,13 +1840,6 @@ def execute_system_action():
'stderr': result.stderr
})
except subprocess.TimeoutExpired:
if action == 'start_display' and mode:
msg = f'Failed to start display in {mode} mode: timed out'
else:
msg = f'Action {action} timed out'
logger.warning("[System] execute_system_action timed out: action=%s", action)
return jsonify({'status': 'error', 'message': msg, 'returncode': -1, 'stdout': '', 'stderr': 'timeout'}), 500
except Exception as e:
logger.exception("[System] execute_system_action failed")
return jsonify({'status': 'error', 'message': 'Failed to execute system action'}), 500
@@ -7146,14 +7133,9 @@ def connect_wifi():
'message': message
})
else:
# Propagate structured error type so the captive portal UI can show
# "Wrong password — try again" instead of a generic failure message.
error_type = "wrong_password" if (message or "").startswith("wrong_password:") else "connection_failed"
clean_message = (message or "").removeprefix("wrong_password:").lstrip() or "Failed to connect to network"
return jsonify({
'status': 'error',
'message': clean_message,
'error_type': error_type
'message': message or 'Failed to connect to network'
}), 400
except Exception as e:
logger.exception("[WiFi] Failed connecting to WiFi network")

File diff suppressed because it is too large Load Diff

View File

@@ -191,10 +191,7 @@ function doConnect() {
// Poll for the new IP
setTimeout(function() { checkNewIP(ssid); }, 3000);
} else {
var msg = data.error_type === 'wrong_password'
? 'Incorrect password — please try again'
: (data.message || 'Connection failed');
showMsg(msg, 'err');
showMsg(data.message || 'Connection failed', 'err');
connecting = false;
btn.disabled = false;
btn.innerHTML = 'Connect';