mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-06-21 20:18:36 +00:00
Compare commits
7 Commits
fix/post-i
...
dbb53da31d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dbb53da31d | ||
|
|
452afacd12 | ||
|
|
3b45a75f75 | ||
|
|
1a0f1c8015 | ||
|
|
b361866679 | ||
|
|
ceb4c4105f | ||
|
|
e9af18cdf1 |
@@ -1,4 +1,5 @@
|
||||
# LEDMatrix
|
||||
[](https://app.codacy.com/gh/ChuckBuilds/LEDMatrix/dashboard?utm_source=gh&utm_medium=referral&utm_content=&utm_campaign=Badge_grade)
|
||||
## Welcome to LEDMatrix!
|
||||
Welcome to the LEDMatrix Project! This open-source project enables you to run an information-rich display on a Raspberry Pi connected to an LED RGB Matrix panel. Whether you want to see your calendar, weather forecasts, sports scores, stock prices, or any other information at a glance, LEDMatrix brings it all together.
|
||||
|
||||
|
||||
@@ -1086,6 +1086,7 @@ SYSTEMCTL_PATH=$(which systemctl)
|
||||
REBOOT_PATH=$(which reboot)
|
||||
POWEROFF_PATH=$(which poweroff)
|
||||
BASH_PATH=$(which bash)
|
||||
JOURNALCTL_PATH=$(which journalctl 2>/dev/null || true)
|
||||
|
||||
# Create sudoers content
|
||||
cat > /tmp/ledmatrix_web_sudoers << EOF
|
||||
@@ -1101,10 +1102,23 @@ $ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH restart ledmatrix.service
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH enable ledmatrix.service
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH disable ledmatrix.service
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH status ledmatrix.service
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH is-active ledmatrix
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH is-active ledmatrix.service
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH start ledmatrix-web.service
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH stop ledmatrix-web.service
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH restart ledmatrix-web.service
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $PYTHON_PATH $PROJECT_ROOT_DIR/display_controller.py
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $BASH_PATH $PROJECT_ROOT_DIR/start_display.sh
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $BASH_PATH $PROJECT_ROOT_DIR/stop_display.sh
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $BASH_PATH $PROJECT_ROOT_DIR/scripts/fix_perms/safe_plugin_rm.sh *
|
||||
EOF
|
||||
if [ -n "$JOURNALCTL_PATH" ]; then
|
||||
cat >> /tmp/ledmatrix_web_sudoers << EOF
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $JOURNALCTL_PATH -u ledmatrix.service *
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $JOURNALCTL_PATH -u ledmatrix *
|
||||
$ACTUAL_USER ALL=(ALL) NOPASSWD: $JOURNALCTL_PATH -t ledmatrix *
|
||||
EOF
|
||||
fi
|
||||
|
||||
if [ -f "$SUDOERS_FILE" ] && cmp -s /tmp/ledmatrix_web_sudoers "$SUDOERS_FILE"; then
|
||||
echo "Sudoers configuration already up to date"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
requests>=2.28.0
|
||||
Pillow>=9.1.0
|
||||
Pillow>=12.2.0
|
||||
pytz>=2022.1
|
||||
numpy>=1.24.0
|
||||
|
||||
@@ -118,7 +118,7 @@ total_count=${#ARCHITECTURES[@]}
|
||||
|
||||
for arch in "${!ARCHITECTURES[@]}"; do
|
||||
if download_binary "$arch" "${ARCHITECTURES[$arch]}"; then
|
||||
((success_count++))
|
||||
success_count=$((success_count + 1))
|
||||
fi
|
||||
done
|
||||
|
||||
|
||||
@@ -89,9 +89,9 @@ TEMP_SUDOERS="/tmp/ledmatrix_web_sudoers_$$"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH status ledmatrix.service"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH is-active ledmatrix"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH is-active ledmatrix.service"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH start ledmatrix-web"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH stop ledmatrix-web"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH restart ledmatrix-web"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH start ledmatrix-web.service"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH stop ledmatrix-web.service"
|
||||
echo "$WEB_USER ALL=(ALL) NOPASSWD: $SYSTEMCTL_PATH restart ledmatrix-web.service"
|
||||
|
||||
# Optional: journalctl (non-critical — skip if not found)
|
||||
if [ -n "$JOURNALCTL_PATH" ]; then
|
||||
|
||||
@@ -10,6 +10,7 @@ import sys
|
||||
import time
|
||||
import logging
|
||||
import signal
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
# Add project root to path (parent of scripts/utils/)
|
||||
@@ -43,6 +44,10 @@ class WiFiMonitorDaemon:
|
||||
self.wifi_manager = WiFiManager()
|
||||
self.running = True
|
||||
self.last_state = None
|
||||
# Counts consecutive checks where nmcli says "connected" but internet is unreachable.
|
||||
# After _nm_restart_threshold failures, NetworkManager is restarted as a recovery step.
|
||||
self._consecutive_internet_failures = 0
|
||||
self._nm_restart_threshold = 5 # ~2.5 min at 30s interval
|
||||
|
||||
# Register signal handlers for graceful shutdown
|
||||
signal.signal(signal.SIGINT, self._signal_handler)
|
||||
@@ -122,6 +127,43 @@ class WiFiMonitorDaemon:
|
||||
else:
|
||||
logger.debug(f"Status check: WiFi=disconnected, Ethernet={updated_ethernet}, AP={updated_status.ap_mode_active}")
|
||||
|
||||
# Escalating recovery: if nmcli reports connected but actual internet
|
||||
# is unreachable for several consecutive checks, restart NetworkManager.
|
||||
# This is done HERE (not inside check_and_manage_ap_mode) to keep the
|
||||
# AP-enable trigger clean and avoid false-positive AP enables from
|
||||
# transient packet loss on otherwise working WiFi.
|
||||
if updated_status.connected and not updated_status.ap_mode_active:
|
||||
if not self.wifi_manager.check_internet_connectivity():
|
||||
self._consecutive_internet_failures += 1
|
||||
logger.warning(
|
||||
f"Internet unreachable despite nmcli connection "
|
||||
f"({self._consecutive_internet_failures}/{self._nm_restart_threshold})"
|
||||
)
|
||||
if self._consecutive_internet_failures >= self._nm_restart_threshold:
|
||||
logger.warning("Restarting NetworkManager to recover internet connectivity")
|
||||
try:
|
||||
subprocess.run(
|
||||
["/usr/bin/systemctl", "restart", "NetworkManager"],
|
||||
capture_output=True, timeout=20, check=True
|
||||
)
|
||||
self._consecutive_internet_failures = 0
|
||||
# NM restart causes a brief WiFi drop; reset the AP-mode grace
|
||||
# counter so that transient disconnect doesn't count toward
|
||||
# triggering AP mode.
|
||||
self.wifi_manager._disconnected_checks = 0
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error(f"NetworkManager restart failed (rc={e.returncode}); "
|
||||
"resetting failure counter to avoid tight retry loop")
|
||||
self._consecutive_internet_failures = 0
|
||||
except (subprocess.SubprocessError, OSError) as e:
|
||||
logger.error(f"NetworkManager restart error: {e}; "
|
||||
"resetting failure counter to avoid tight retry loop")
|
||||
self._consecutive_internet_failures = 0
|
||||
else:
|
||||
self._consecutive_internet_failures = 0
|
||||
else:
|
||||
self._consecutive_internet_failures = 0
|
||||
|
||||
# Sleep until next check
|
||||
time.sleep(self.check_interval)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ if os.getenv("EMULATOR", "false") == "true":
|
||||
from RGBMatrixEmulator import RGBMatrix, RGBMatrixOptions
|
||||
else:
|
||||
from rgbmatrix import RGBMatrix, RGBMatrixOptions
|
||||
from contextlib import contextmanager
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
import time
|
||||
from typing import Dict, Any, List, Tuple
|
||||
@@ -28,6 +29,8 @@ class DisplayManager:
|
||||
self.config = config or {}
|
||||
self._force_fallback = force_fallback
|
||||
self._suppress_test_pattern = suppress_test_pattern
|
||||
# When True, update_display() and clear() skip hardware writes (used during off-screen content capture)
|
||||
self._capture_mode_active = False
|
||||
# Snapshot settings for web preview integration (service writes, web reads)
|
||||
self._snapshot_path = "/tmp/led_matrix_preview.png"
|
||||
self._snapshot_min_interval_sec = 0.2 # max ~5 fps
|
||||
@@ -255,6 +258,22 @@ class DisplayManager:
|
||||
except Exception as e:
|
||||
logger.error(f"Error drawing test pattern: {e}", exc_info=True)
|
||||
|
||||
@contextmanager
|
||||
def capture_mode(self):
|
||||
"""Suppress hardware output during off-screen content capture.
|
||||
|
||||
Plugins call update_display() as part of their normal display() flow.
|
||||
When fetching content for Vegas mode the render loop is still running,
|
||||
so any incidental hardware write causes a visible flash on the matrix.
|
||||
Entering this context prevents those writes without affecting the PIL
|
||||
image buffer, which the adapter reads to extract content.
|
||||
"""
|
||||
self._capture_mode_active = True
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
self._capture_mode_active = False
|
||||
|
||||
def update_display(self):
|
||||
"""Update the display using double buffering with proper sync."""
|
||||
try:
|
||||
@@ -265,6 +284,9 @@ class DisplayManager:
|
||||
self._write_snapshot_if_due()
|
||||
return
|
||||
|
||||
if self._capture_mode_active:
|
||||
return # Skip hardware write — content is being captured off-screen
|
||||
|
||||
# Copy the current image to the offscreen canvas
|
||||
self.offscreen_canvas.SetImage(self.image)
|
||||
|
||||
@@ -305,20 +327,22 @@ class DisplayManager:
|
||||
self.image = Image.new('RGB', (self.matrix.width, self.matrix.height))
|
||||
self.draw = ImageDraw.Draw(self.image)
|
||||
|
||||
# Clear both canvases and the underlying matrix to ensure no artifacts
|
||||
if not self._capture_mode_active:
|
||||
# Clear both canvases and the underlying matrix to ensure no artifacts.
|
||||
# Failures are non-fatal — the image buffer is already black above, so
|
||||
# the next update_display() call will push clean content regardless.
|
||||
try:
|
||||
self.offscreen_canvas.Clear()
|
||||
except Exception:
|
||||
pass
|
||||
except (RuntimeError, OSError) as e:
|
||||
logger.error("Failed to clear offscreen canvas: %s", e)
|
||||
try:
|
||||
self.current_canvas.Clear()
|
||||
except Exception:
|
||||
pass
|
||||
except (RuntimeError, OSError) as e:
|
||||
logger.error("Failed to clear current canvas: %s", e)
|
||||
try:
|
||||
# Extra safety: clear the matrix front buffer as well
|
||||
self.matrix.Clear()
|
||||
except Exception:
|
||||
pass
|
||||
except (RuntimeError, OSError) as e:
|
||||
logger.error("Failed to clear matrix front buffer: %s", e)
|
||||
|
||||
# Note: We do NOT call update_display() here to avoid black flashes.
|
||||
# The caller should call update_display() after drawing new content.
|
||||
|
||||
@@ -329,6 +329,7 @@ class PluginAdapter:
|
||||
# Save display state to restore after
|
||||
original_image = self.display_manager.image.copy()
|
||||
|
||||
with self.display_manager.capture_mode():
|
||||
# Method 1: Try _create_scrolling_display (stocks pattern)
|
||||
if hasattr(plugin, '_create_scrolling_display'):
|
||||
logger.info(
|
||||
@@ -408,10 +409,7 @@ class PluginAdapter:
|
||||
original_image = self.display_manager.image.copy()
|
||||
logger.info("[%s] Fallback: saved original display state", plugin_id)
|
||||
|
||||
# Lightweight in-memory data refresh before capturing.
|
||||
# Full update() is intentionally skipped here — the background
|
||||
# update tick in the Vegas coordinator handles periodic API
|
||||
# refreshes so we don't block the content-fetch thread.
|
||||
# Ensure plugin has fresh data before capturing
|
||||
has_update_data = hasattr(plugin, 'update_data')
|
||||
logger.info("[%s] Fallback: has update_data=%s", plugin_id, has_update_data)
|
||||
if has_update_data:
|
||||
@@ -421,7 +419,9 @@ class PluginAdapter:
|
||||
except (AttributeError, RuntimeError, OSError):
|
||||
logger.exception("[%s] Fallback: update_data() failed", plugin_id)
|
||||
|
||||
# Clear and call plugin display
|
||||
# Clear and call plugin display — use capture_mode to suppress hardware writes
|
||||
# that plugins may trigger internally via update_display().
|
||||
with self.display_manager.capture_mode():
|
||||
self.display_manager.clear()
|
||||
logger.info("[%s] Fallback: display cleared, calling display()", plugin_id)
|
||||
|
||||
@@ -436,6 +436,7 @@ class PluginAdapter:
|
||||
|
||||
# Capture the result
|
||||
captured = self.display_manager.image.copy()
|
||||
|
||||
logger.info(
|
||||
"[%s] Fallback: captured frame %dx%d, mode=%s",
|
||||
plugin_id, captured.width, captured.height, captured.mode
|
||||
@@ -454,6 +455,7 @@ class PluginAdapter:
|
||||
plugin_id
|
||||
)
|
||||
# Try once more with force_clear=True
|
||||
with self.display_manager.capture_mode():
|
||||
self.display_manager.clear()
|
||||
plugin.display(force_clear=True)
|
||||
captured = self.display_manager.image.copy()
|
||||
@@ -585,28 +587,6 @@ class PluginAdapter:
|
||||
else:
|
||||
self._content_cache.clear()
|
||||
|
||||
def invalidate_plugin_scroll_cache(self, plugin: 'BasePlugin', plugin_id: str) -> None:
|
||||
"""
|
||||
Clear a plugin's scroll_helper cache so Vegas re-fetches fresh visuals.
|
||||
|
||||
Uses scroll_helper.clear_cache() to reset all cached state (cached_image,
|
||||
cached_array, total_scroll_width, scroll_position, etc.) — not just the
|
||||
image. Without this, plugins that use scroll_helper (stocks, news,
|
||||
odds-ticker, etc.) would keep serving stale scroll images even after
|
||||
their data refreshes.
|
||||
|
||||
Args:
|
||||
plugin: Plugin instance
|
||||
plugin_id: Plugin identifier
|
||||
"""
|
||||
scroll_helper = getattr(plugin, 'scroll_helper', None)
|
||||
if scroll_helper is None:
|
||||
return
|
||||
|
||||
if getattr(scroll_helper, 'cached_image', None) is not None:
|
||||
scroll_helper.clear_cache()
|
||||
logger.debug("[%s] Cleared scroll_helper cache", plugin_id)
|
||||
|
||||
def get_content_type(self, plugin: 'BasePlugin', plugin_id: str) -> str:
|
||||
"""
|
||||
Get the type of content a plugin provides.
|
||||
|
||||
@@ -202,8 +202,25 @@ class RenderPipeline:
|
||||
# Update scroll position
|
||||
self.scroll_helper.update_scroll_position()
|
||||
|
||||
# Check if cycle is complete
|
||||
if self.scroll_helper.is_scroll_complete():
|
||||
# Determine if the cycle is done.
|
||||
#
|
||||
# scroll_helper considers a cycle complete only after
|
||||
# total_distance_scrolled >= total_scroll_width + display_width.
|
||||
# That extra display_width of travel causes a "wrap-around" phase
|
||||
# where scroll_position resets to ~0 and the first plugin's content
|
||||
# re-enters from the right — the user sees this 2-3 s window as
|
||||
# "a plugin partially displaying before the next one starts."
|
||||
#
|
||||
# We end the cycle as soon as total_distance_scrolled reaches
|
||||
# total_scroll_width (the wrap-around point), before any second-pass
|
||||
# content becomes visible. scroll_helper.is_scroll_complete() is
|
||||
# kept as a fallback for edge-cases where that threshold is skipped.
|
||||
at_wrap_point = (
|
||||
not self._cycle_complete and
|
||||
self.scroll_helper.total_distance_scrolled >= self.scroll_helper.total_scroll_width
|
||||
)
|
||||
|
||||
if at_wrap_point or self.scroll_helper.is_scroll_complete():
|
||||
if not self._cycle_complete:
|
||||
self._cycle_complete = True
|
||||
self.stats['scroll_cycles'] += 1
|
||||
@@ -211,6 +228,20 @@ class RenderPipeline:
|
||||
"Scroll cycle complete after %.1fs",
|
||||
time.time() - self._cycle_start_time
|
||||
)
|
||||
# Push blank immediately so the hardware never shows
|
||||
# post-wrap content while the coordinator recomposes.
|
||||
try:
|
||||
from PIL import Image as _Image
|
||||
blank = _Image.new('RGB', (self.display_width, self.display_height))
|
||||
self.display_manager.image = blank
|
||||
self.display_manager.update_display()
|
||||
except (ImportError, OSError, RuntimeError, ValueError, TypeError, MemoryError) as exc:
|
||||
logger.error(
|
||||
"Failed to push blank frame at cycle end "
|
||||
"(display=%dx%d): %s",
|
||||
self.display_width, self.display_height, exc
|
||||
)
|
||||
return True # Cycle done; coordinator starts new cycle next frame
|
||||
|
||||
# Get visible portion
|
||||
visible_frame = self.scroll_helper.get_visible_portion()
|
||||
|
||||
@@ -60,12 +60,16 @@ def get_wifi_config_path():
|
||||
|
||||
HOSTAPD_CONFIG_PATH = Path("/etc/hostapd/hostapd.conf")
|
||||
DNSMASQ_CONFIG_PATH = Path("/etc/dnsmasq.d/ledmatrix-captive.conf")
|
||||
# Drop-in config for NetworkManager's built-in dnsmasq (ipv4.method=shared).
|
||||
# Writing address=/#/<ap_ip> here causes NM to resolve every hostname to the AP,
|
||||
# triggering the OS captive-portal popup automatically on iOS/Android/Windows/macOS.
|
||||
NM_DNSMASQ_SHARED_DIR = Path("/etc/NetworkManager/dnsmasq-shared.d")
|
||||
NM_DNSMASQ_SHARED_CONF = NM_DNSMASQ_SHARED_DIR / "ledmatrix-captive.conf"
|
||||
HOSTAPD_SERVICE = "hostapd"
|
||||
DNSMASQ_SERVICE = "dnsmasq"
|
||||
|
||||
# Default AP settings
|
||||
DEFAULT_AP_SSID = "LEDMatrix-Setup"
|
||||
DEFAULT_AP_PASSWORD = "ledmatrix123"
|
||||
DEFAULT_AP_CHANNEL = 7
|
||||
|
||||
# LED status message file (for display_controller integration)
|
||||
@@ -138,6 +142,11 @@ class WiFiManager:
|
||||
self._disconnected_checks = 0
|
||||
self._disconnected_checks_required = 3 # Require 3 consecutive disconnected checks (90 seconds at 30s interval)
|
||||
|
||||
# Timestamp set when AP mode is enabled; used for the idle-timeout check
|
||||
self._ap_enabled_at: Optional[float] = None
|
||||
# Which redirect backend was used (iptables/nftables/None); set per-instance
|
||||
self._redirect_backend: Optional[str] = None
|
||||
|
||||
logger.info(f"WiFi Manager initialized - nmcli: {self.has_nmcli}, iwlist: {self.has_iwlist}, "
|
||||
f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, "
|
||||
f"interface: {self._wifi_interface}, trixie: {self._is_trixie}")
|
||||
@@ -201,6 +210,24 @@ class WiFiManager:
|
||||
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
|
||||
return False
|
||||
|
||||
def _find_command_path(self, command: str) -> Optional[str]:
|
||||
"""
|
||||
Return the absolute path of a command, checking sbin locations that may not
|
||||
be on PATH in restricted service environments. Returns None if not found.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(["which", command], capture_output=True,
|
||||
text=True, timeout=2)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return result.stdout.strip()
|
||||
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
for path in [f"/usr/sbin/{command}", f"/sbin/{command}",
|
||||
f"/usr/local/sbin/{command}"]:
|
||||
if os.path.isfile(path) and os.access(path, os.X_OK):
|
||||
return path
|
||||
return None
|
||||
|
||||
def _discover_wifi_interface(self) -> str:
|
||||
"""
|
||||
Discover the primary WiFi interface name dynamically.
|
||||
@@ -303,7 +330,6 @@ class WiFiManager:
|
||||
else:
|
||||
self.config = {
|
||||
"ap_ssid": DEFAULT_AP_SSID,
|
||||
"ap_password": DEFAULT_AP_PASSWORD,
|
||||
"ap_channel": DEFAULT_AP_CHANNEL,
|
||||
"auto_enable_ap_mode": True, # Default: auto-enable when no network (safe due to grace period)
|
||||
"saved_networks": []
|
||||
@@ -659,6 +685,285 @@ class WiFiManager:
|
||||
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
|
||||
return False
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved")
|
||||
|
||||
def _validate_ap_config(self) -> Tuple[str, int]:
|
||||
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
|
||||
ssid = str(self.config.get("ap_ssid", DEFAULT_AP_SSID))
|
||||
if not ssid or len(ssid) > 32 or not re.match(r'^[\x20-\x7E]+$', ssid):
|
||||
logger.warning(f"AP SSID '{ssid}' is invalid, falling back to default")
|
||||
ssid = DEFAULT_AP_SSID
|
||||
try:
|
||||
channel = int(self.config.get("ap_channel", DEFAULT_AP_CHANNEL))
|
||||
if channel < 1 or channel > 14:
|
||||
raise ValueError
|
||||
except (TypeError, ValueError):
|
||||
logger.warning("AP channel out of range, falling back to default")
|
||||
channel = DEFAULT_AP_CHANNEL
|
||||
return ssid, channel
|
||||
|
||||
def _setup_iptables_redirect(self) -> bool:
|
||||
"""
|
||||
Add port 80 → 5000 redirect rules for the captive portal.
|
||||
|
||||
Tries iptables first, falls back to nftables (used by Debian Trixie).
|
||||
When neither tool is available, logs a warning and returns True — the AP
|
||||
still works and DNS spoofing still triggers the OS popup; users just land
|
||||
on port 5000 directly rather than being redirected from port 80.
|
||||
|
||||
Only returns False when a tool was found but the rule addition itself failed.
|
||||
"""
|
||||
try:
|
||||
iptables = self._find_command_path("iptables")
|
||||
nft = self._find_command_path("nft")
|
||||
|
||||
if not iptables and not nft:
|
||||
logger.warning(
|
||||
"Neither iptables nor nft found; captive portal port-80 redirect unavailable. "
|
||||
"DNS spoofing will still trigger the OS popup but HTTP on port 80 won't reach Flask."
|
||||
)
|
||||
self._redirect_backend = None
|
||||
return True # AP works; redirect is best-effort
|
||||
|
||||
if iptables:
|
||||
return self._setup_iptables_redirect_iptables(iptables)
|
||||
else:
|
||||
return self._setup_iptables_redirect_nftables(nft)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not set up port redirect: {e}")
|
||||
try:
|
||||
self._teardown_iptables_redirect()
|
||||
except Exception as cleanup_e:
|
||||
logger.warning(f"Cleanup after redirect exception also failed: {cleanup_e}")
|
||||
return False
|
||||
|
||||
def _setup_iptables_redirect_iptables(self, iptables: str) -> bool:
|
||||
"""Set up port 80→5000 redirect using iptables."""
|
||||
# Save ip_forward state before enabling
|
||||
try:
|
||||
current_fwd = Path("/proc/sys/net/ipv4/ip_forward").read_text().strip()
|
||||
except OSError:
|
||||
current_fwd = None
|
||||
if current_fwd is not None:
|
||||
try:
|
||||
self._IP_FORWARD_SAVE_PATH.write_text(current_fwd)
|
||||
except OSError:
|
||||
current_fwd = None
|
||||
logger.warning("Could not write ip_forward save file; state will not be restored")
|
||||
|
||||
if current_fwd != "1":
|
||||
sysctl = self._find_command_path("sysctl")
|
||||
sysctl_bin = sysctl if sysctl else "sysctl"
|
||||
r = subprocess.run(["sudo", sysctl_bin, "-w", "net.ipv4.ip_forward=1"],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if r.returncode != 0:
|
||||
logger.error(f"Failed to enable ip_forward: {r.stderr.strip()}")
|
||||
self._teardown_iptables_redirect()
|
||||
return False
|
||||
|
||||
if subprocess.run(
|
||||
["sudo", iptables, "-t", "nat", "-C", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, timeout=5
|
||||
).returncode != 0:
|
||||
r = subprocess.run(
|
||||
["sudo", iptables, "-t", "nat", "-A", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if r.returncode != 0:
|
||||
logger.error(f"Failed to add PREROUTING rule: {r.stderr.strip()}")
|
||||
self._teardown_iptables_redirect()
|
||||
return False
|
||||
|
||||
if subprocess.run(
|
||||
["sudo", iptables, "-C", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000", "-j", "ACCEPT"],
|
||||
capture_output=True, timeout=5
|
||||
).returncode != 0:
|
||||
r = subprocess.run(
|
||||
["sudo", iptables, "-A", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000", "-j", "ACCEPT"],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if r.returncode != 0:
|
||||
logger.error(f"Failed to add INPUT rule: {r.stderr.strip()}")
|
||||
self._teardown_iptables_redirect()
|
||||
return False
|
||||
|
||||
self._redirect_backend = "iptables"
|
||||
logger.info("iptables: port 80→5000 redirect rules added")
|
||||
return True
|
||||
|
||||
def _setup_iptables_redirect_nftables(self, nft: str) -> bool:
|
||||
"""Set up port 80→5000 redirect using nftables (Debian Trixie / modern systems)."""
|
||||
# NM's ipv4.method=shared already enables ip_forward; no sysctl needed.
|
||||
cmds = [
|
||||
["sudo", nft, "add", "table", "ip", "ledmatrix"],
|
||||
["sudo", nft, "add", "chain", "ip", "ledmatrix", "prerouting",
|
||||
"{", "type", "nat", "hook", "prerouting", "priority", "-100", ";", "}"],
|
||||
["sudo", nft, "add", "rule", "ip", "ledmatrix", "prerouting",
|
||||
"iif", self._wifi_interface, "tcp", "dport", "80", "redirect", "to", ":5000"],
|
||||
]
|
||||
for cmd in cmds:
|
||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
|
||||
if r.returncode != 0:
|
||||
# Table/chain may already exist — only fail on rule add
|
||||
if "add rule" in " ".join(cmd):
|
||||
logger.error(f"Failed to add nftables redirect rule: {r.stderr.strip()}")
|
||||
self._teardown_iptables_redirect()
|
||||
return False
|
||||
logger.debug(f"nft cmd non-zero (may already exist): {r.stderr.strip()}")
|
||||
|
||||
self._redirect_backend = "nftables"
|
||||
logger.info("nftables: port 80→5000 redirect rule added")
|
||||
return True
|
||||
|
||||
def _teardown_iptables_redirect(self) -> None:
|
||||
"""Remove the port 80→5000 redirect rules and restore ip_forward if saved."""
|
||||
try:
|
||||
backend = self._redirect_backend
|
||||
self._redirect_backend = None
|
||||
|
||||
if backend == "iptables":
|
||||
iptables = self._find_command_path("iptables")
|
||||
if iptables:
|
||||
subprocess.run(
|
||||
["sudo", iptables, "-t", "nat", "-D", "PREROUTING",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
||||
"-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
subprocess.run(
|
||||
["sudo", iptables, "-D", "INPUT",
|
||||
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
|
||||
"-j", "ACCEPT"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
# Restore ip_forward only when we saved it
|
||||
if self._IP_FORWARD_SAVE_PATH.exists():
|
||||
try:
|
||||
saved = self._IP_FORWARD_SAVE_PATH.read_text().strip()
|
||||
self._IP_FORWARD_SAVE_PATH.unlink(missing_ok=True)
|
||||
sysctl = self._find_command_path("sysctl")
|
||||
sysctl_bin = sysctl if sysctl else "sysctl"
|
||||
subprocess.run(["sudo", sysctl_bin, "-w", f"net.ipv4.ip_forward={saved}"],
|
||||
capture_output=True, timeout=5)
|
||||
logger.info(f"ip_forward restored to {saved}")
|
||||
except OSError as e:
|
||||
logger.warning(f"Could not restore ip_forward: {e}")
|
||||
else:
|
||||
logger.debug("ip_forward not modified by setup; leaving unchanged")
|
||||
|
||||
elif backend == "nftables":
|
||||
nft = self._find_command_path("nft")
|
||||
if nft:
|
||||
subprocess.run(
|
||||
["sudo", nft, "delete", "table", "ip", "ledmatrix"],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
logger.info("nftables ledmatrix table removed")
|
||||
|
||||
else:
|
||||
# No redirect was set up (neither tool available); nothing to tear down
|
||||
self._IP_FORWARD_SAVE_PATH.unlink(missing_ok=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not tear down port redirect: {e}")
|
||||
|
||||
def _write_nm_dnsmasq_captive_conf(self, ap_ip: str = "192.168.4.1") -> None:
|
||||
"""
|
||||
Write the NM dnsmasq-shared.d drop-in that makes NM's built-in dnsmasq
|
||||
resolve every hostname to the AP IP. This triggers the OS captive-portal
|
||||
popup automatically on iOS / Android / Windows / macOS as soon as the
|
||||
device connects — no manual navigation required.
|
||||
|
||||
NetworkManager reads /etc/NetworkManager/dnsmasq-shared.d/*.conf when it
|
||||
starts the dnsmasq instance for ipv4.method=shared connections.
|
||||
"""
|
||||
try:
|
||||
content = f"# LEDMatrix captive portal: resolve all hostnames to AP\naddress=/#/{ap_ip}\n"
|
||||
with open("/tmp/ledmatrix-nm-dnsmasq.conf", "w") as f:
|
||||
f.write(content)
|
||||
subprocess.run(
|
||||
["sudo", "mkdir", "-p", str(NM_DNSMASQ_SHARED_DIR)],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
subprocess.run(
|
||||
["sudo", "cp", "/tmp/ledmatrix-nm-dnsmasq.conf", str(NM_DNSMASQ_SHARED_CONF)],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
logger.info(f"Wrote NM dnsmasq captive-portal config: {NM_DNSMASQ_SHARED_CONF}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not write NM dnsmasq captive config: {e}")
|
||||
|
||||
def _remove_nm_dnsmasq_captive_conf(self) -> None:
|
||||
"""Remove the NM dnsmasq-shared.d drop-in written by _write_nm_dnsmasq_captive_conf."""
|
||||
try:
|
||||
subprocess.run(
|
||||
["sudo", "rm", "-f", str(NM_DNSMASQ_SHARED_CONF)],
|
||||
capture_output=True, timeout=5
|
||||
)
|
||||
logger.info("Removed NM dnsmasq captive-portal config")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not remove NM dnsmasq captive config: {e}")
|
||||
|
||||
def _check_internet_connectivity(self, timeout: int = 5) -> bool:
|
||||
"""
|
||||
Test actual internet reachability — not just nmcli association state.
|
||||
|
||||
A device can be 'connected' in nmcli (associated with an AP) while the
|
||||
router has no WAN link. This check catches that case so the daemon can
|
||||
auto-enable AP mode even when nmcli reports a connection.
|
||||
|
||||
Returns True if at least one reachability method succeeds.
|
||||
"""
|
||||
try:
|
||||
r = subprocess.run(
|
||||
["ping", "-c", "1", "-W", str(timeout), "8.8.8.8"],
|
||||
capture_output=True, timeout=timeout + 1
|
||||
)
|
||||
if r.returncode == 0:
|
||||
logger.debug("Internet connectivity confirmed via ping 8.8.8.8")
|
||||
return True
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
try:
|
||||
import urllib.request as _ureq
|
||||
_ureq.urlopen("http://connectivity-check.ubuntu.com/", timeout=timeout)
|
||||
logger.debug("Internet connectivity confirmed via HTTP check")
|
||||
return True
|
||||
except OSError:
|
||||
pass
|
||||
logger.debug("Internet connectivity check failed (both ping and HTTP)")
|
||||
return False
|
||||
|
||||
def check_internet_connectivity(self, timeout: int = 5) -> bool:
|
||||
"""Public wrapper around _check_internet_connectivity for use by the daemon."""
|
||||
return self._check_internet_connectivity(timeout=timeout)
|
||||
|
||||
def _has_ap_clients(self) -> bool:
|
||||
"""
|
||||
Return True if at least one client is associated with the AP.
|
||||
Uses 'iw dev <iface> station dump' which works for both hostapd and
|
||||
nmcli AP modes.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["iw", "dev", self._wifi_interface, "station", "dump"],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
return bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def scan_networks(self, allow_cached: bool = True) -> Tuple[List[WiFiNetwork], bool]:
|
||||
"""
|
||||
Scan for available WiFi networks.
|
||||
@@ -1293,12 +1598,27 @@ class WiFiManager:
|
||||
error_msg = result.stderr.strip() or result.stdout.strip()
|
||||
logger.error(f"Failed to connect to {ssid}: {error_msg}")
|
||||
self._show_led_message("Connection failed", duration=5)
|
||||
if self._is_wrong_password_error(error_msg):
|
||||
return False, f"wrong_password: {error_msg}"
|
||||
return False, error_msg
|
||||
except Exception as e:
|
||||
logger.error(f"Error connecting with nmcli: {e}")
|
||||
self._show_led_message("Connection error", duration=5)
|
||||
return False, str(e)
|
||||
|
||||
@staticmethod
|
||||
def _is_wrong_password_error(error_msg: str) -> bool:
|
||||
"""Return True when nmcli's error output indicates an authentication failure."""
|
||||
indicators = [
|
||||
"secrets were required",
|
||||
"no secret agent",
|
||||
"802-11-wireless-security.psk",
|
||||
"authentication rejected",
|
||||
"association rejected",
|
||||
]
|
||||
lower = error_msg.lower()
|
||||
return any(ind in lower for ind in indicators)
|
||||
|
||||
def _connect_wpa_supplicant(self, ssid: str, password: str) -> Tuple[bool, str]:
|
||||
"""Connect using wpa_supplicant (fallback)"""
|
||||
try:
|
||||
@@ -1570,13 +1890,17 @@ class WiFiManager:
|
||||
if self.has_hostapd and self.has_dnsmasq:
|
||||
result = self._enable_ap_mode_hostapd()
|
||||
if result[0]:
|
||||
self._ap_enabled_at = time.time()
|
||||
return result
|
||||
|
||||
# Fallback to nmcli hotspot (simpler, no captive portal)
|
||||
if self.has_nmcli:
|
||||
logger.info("hostapd/dnsmasq failed or unavailable, trying nmcli hotspot fallback...")
|
||||
self._show_led_message("Setup Mode", duration=5)
|
||||
return self._enable_ap_mode_nmcli_hotspot()
|
||||
result = self._enable_ap_mode_nmcli_hotspot()
|
||||
if result[0]:
|
||||
self._ap_enabled_at = time.time()
|
||||
return result
|
||||
|
||||
return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)"
|
||||
except Exception as e:
|
||||
@@ -1649,63 +1973,21 @@ class WiFiManager:
|
||||
subprocess.run(["sudo", "systemctl", "stop", HOSTAPD_SERVICE], timeout=5)
|
||||
return False, f"Failed to start dnsmasq: {result.stderr}"
|
||||
|
||||
# Set up iptables port forwarding: redirect port 80 to 5000
|
||||
# This makes the captive portal work on standard HTTP port
|
||||
try:
|
||||
# Check if iptables is available
|
||||
iptables_check = subprocess.run(
|
||||
["which", "iptables"],
|
||||
capture_output=True,
|
||||
timeout=2
|
||||
)
|
||||
|
||||
if iptables_check.returncode == 0:
|
||||
# Enable IP forwarding (needed for NAT)
|
||||
subprocess.run(
|
||||
["sudo", "sysctl", "-w", "net.ipv4.ip_forward=1"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
# Add NAT rule to redirect port 80 to 5000 on WiFi interface
|
||||
# First check if rule already exists
|
||||
check_result = subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-C", "PREROUTING", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
if check_result.returncode != 0:
|
||||
# Rule doesn't exist, add it
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-A", "PREROUTING", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
logger.info("Added iptables rule to redirect port 80 to 5000")
|
||||
|
||||
# Also allow incoming connections on port 80
|
||||
check_input = subprocess.run(
|
||||
["sudo", "iptables", "-C", "INPUT", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "ACCEPT"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
if check_input.returncode != 0:
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-A", "INPUT", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "ACCEPT"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
else:
|
||||
logger.debug("iptables not available, port forwarding not set up")
|
||||
logger.info("Note: Port 80 forwarding requires iptables. Users will need to access port 5000 directly.")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not set up iptables port forwarding: {e}")
|
||||
# Continue anyway - port 5000 will still work
|
||||
# Set up iptables port forwarding (port 80 → 5000) and save ip_forward state
|
||||
if not self._setup_iptables_redirect():
|
||||
logger.error("Captive-portal redirect setup failed; stopping AP services")
|
||||
subprocess.run(["sudo", "systemctl", "stop", HOSTAPD_SERVICE],
|
||||
capture_output=True, timeout=10)
|
||||
subprocess.run(["sudo", "systemctl", "stop", DNSMASQ_SERVICE],
|
||||
capture_output=True, timeout=10)
|
||||
return False, "AP started but captive-portal redirect setup failed"
|
||||
|
||||
logger.info("AP mode enabled successfully")
|
||||
self._show_led_message("Setup Mode Active", duration=5)
|
||||
# Use the validated SSID so the displayed name matches what hostapd broadcast
|
||||
ap_ssid, _ = self._validate_ap_config()
|
||||
self._show_led_message(
|
||||
f"WiFi Setup\n{ap_ssid}\nNo password\n192.168.4.1:5000", duration=10
|
||||
)
|
||||
return True, "AP mode enabled"
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting AP services: {e}")
|
||||
@@ -1716,245 +1998,120 @@ class WiFiManager:
|
||||
|
||||
def _enable_ap_mode_nmcli_hotspot(self) -> Tuple[bool, str]:
|
||||
"""
|
||||
Enable AP mode using nmcli hotspot.
|
||||
Enable AP mode using nmcli as an open (passwordless) access point.
|
||||
|
||||
This method is optimized for both Bookworm and Trixie:
|
||||
- Trixie: Uses Netplan, connections stored in /run/NetworkManager/system-connections
|
||||
- Bookworm: Traditional NetworkManager, connections in /etc/NetworkManager/system-connections
|
||||
Uses 'nmcli connection add type wifi 802-11-wireless.mode ap' instead of
|
||||
'nmcli device wifi hotspot' because the hotspot subcommand always creates a
|
||||
WPA2-protected network on Bookworm/Trixie and silently ignores attempts to
|
||||
strip security after creation.
|
||||
|
||||
On Trixie, we also disable PMF (Protected Management Frames) which can cause
|
||||
connection issues with certain WiFi adapters and clients.
|
||||
Tested for both Bookworm and Trixie (Netplan-based NetworkManager).
|
||||
"""
|
||||
try:
|
||||
# Stop any existing connection
|
||||
self.disconnect_from_network()
|
||||
time.sleep(1)
|
||||
|
||||
# Delete any existing hotspot connections (more thorough cleanup)
|
||||
# First, list all connections to find any with the same SSID or hotspot-related ones
|
||||
ap_ssid = self.config.get("ap_ssid", DEFAULT_AP_SSID)
|
||||
result = subprocess.run(
|
||||
["nmcli", "-t", "-f", "NAME,TYPE,802-11-wireless.ssid", "connection", "show"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10
|
||||
)
|
||||
if result.returncode == 0:
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
if ':' in line:
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2:
|
||||
conn_name = parts[0].strip()
|
||||
conn_type = parts[1].strip().lower() if len(parts) > 1 else ""
|
||||
conn_ssid = parts[2].strip() if len(parts) > 2 else ""
|
||||
ap_ssid, ap_channel = self._validate_ap_config()
|
||||
|
||||
# Delete if:
|
||||
# 1. It's a hotspot type
|
||||
# 2. It has the same SSID as our AP
|
||||
# 3. It matches our known connection names
|
||||
should_delete = (
|
||||
'hotspot' in conn_type or
|
||||
conn_ssid == ap_ssid or
|
||||
'hotspot' in conn_name.lower() or
|
||||
conn_name in ["Hotspot", "LEDMatrix-Setup-AP", "TickerSetup-AP"]
|
||||
)
|
||||
|
||||
if should_delete:
|
||||
logger.info(f"Deleting existing connection: {conn_name} (type: {conn_type}, SSID: {conn_ssid})")
|
||||
# First disconnect it if active
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "down", conn_name],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
# Then delete it
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "delete", conn_name],
|
||||
capture_output=True,
|
||||
timeout=10
|
||||
)
|
||||
|
||||
# Also explicitly delete known connection names (in case they weren't caught above)
|
||||
# Delete only the specific application-managed AP profiles by name.
|
||||
# Never delete by SSID — that would destroy a user's saved home network.
|
||||
for conn_name in ["Hotspot", "LEDMatrix-Setup-AP", "TickerSetup-AP"]:
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "down", conn_name],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "delete", conn_name],
|
||||
capture_output=True,
|
||||
timeout=10
|
||||
)
|
||||
subprocess.run(["nmcli", "connection", "down", conn_name],
|
||||
capture_output=True, timeout=5)
|
||||
subprocess.run(["nmcli", "connection", "delete", conn_name],
|
||||
capture_output=True, timeout=10)
|
||||
|
||||
# Wait a moment for deletions to complete
|
||||
time.sleep(1)
|
||||
|
||||
# Get AP settings from config
|
||||
ap_ssid = self.config.get("ap_ssid", DEFAULT_AP_SSID)
|
||||
ap_channel = self.config.get("ap_channel", DEFAULT_AP_CHANNEL)
|
||||
|
||||
# Use nmcli hotspot command (simpler, works with Broadcom chips)
|
||||
# Open network (no password) for easy setup access
|
||||
logger.info(f"Creating open hotspot with nmcli: {ap_ssid} on {self._wifi_interface} (no password)")
|
||||
|
||||
# Note: Some NetworkManager versions add a default password to hotspots
|
||||
# We'll create it and then immediately remove all security settings
|
||||
# Create an open AP connection profile from scratch.
|
||||
# Using 'connection add' instead of 'device wifi hotspot' because the
|
||||
# hotspot subcommand always attaches a WPA2 PSK on Bookworm/Trixie and
|
||||
# ignores post-creation security modifications.
|
||||
logger.info(f"Creating open AP with nmcli connection add: {ap_ssid} on "
|
||||
f"{self._wifi_interface} (no password)")
|
||||
cmd = [
|
||||
"nmcli", "device", "wifi", "hotspot",
|
||||
"ifname", self._wifi_interface,
|
||||
"nmcli", "connection", "add",
|
||||
"type", "wifi",
|
||||
"con-name", "LEDMatrix-Setup-AP",
|
||||
"ssid", ap_ssid,
|
||||
"band", "bg", # 2.4GHz for maximum compatibility
|
||||
"channel", str(ap_channel),
|
||||
# Don't pass password parameter - we'll remove security after creation
|
||||
]
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if result.returncode == 0:
|
||||
# Always explicitly remove all security settings to ensure open network
|
||||
# NetworkManager sometimes adds default security even when not specified
|
||||
logger.info("Ensuring hotspot is open (no password)...")
|
||||
time.sleep(2) # Give it a moment to create
|
||||
|
||||
# Remove all possible security settings
|
||||
security_settings = [
|
||||
("802-11-wireless-security.key-mgmt", "none"),
|
||||
("802-11-wireless-security.psk", ""),
|
||||
("802-11-wireless-security.wep-key", ""),
|
||||
("802-11-wireless-security.wep-key-type", ""),
|
||||
("802-11-wireless-security.auth-alg", "open"),
|
||||
]
|
||||
|
||||
# On Trixie, also disable PMF (Protected Management Frames)
|
||||
# This can cause connection issues with certain WiFi adapters and clients
|
||||
if self._is_trixie:
|
||||
security_settings.append(("802-11-wireless-security.pmf", "disable"))
|
||||
logger.info("Trixie detected: disabling PMF for better client compatibility")
|
||||
|
||||
for setting, value in security_settings:
|
||||
result_modify = subprocess.run(
|
||||
["nmcli", "connection", "modify", "LEDMatrix-Setup-AP", setting, str(value)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
if result_modify.returncode != 0:
|
||||
logger.debug(f"Could not set {setting} to {value}: {result_modify.stderr}")
|
||||
|
||||
# On Trixie, set static IP address for the hotspot (default is 10.42.0.1)
|
||||
# We want 192.168.4.1 for consistency
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "modify", "LEDMatrix-Setup-AP",
|
||||
"ipv4.addresses", "192.168.4.1/24",
|
||||
"ipv4.method", "shared"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
# Verify it's open
|
||||
verify_result = subprocess.run(
|
||||
["nmcli", "-t", "-f", "802-11-wireless-security.key-mgmt,802-11-wireless-security.psk", "connection", "show", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
if verify_result.returncode == 0:
|
||||
output = verify_result.stdout.strip()
|
||||
key_mgmt = ""
|
||||
psk = ""
|
||||
for line in output.split('\n'):
|
||||
if 'key-mgmt:' in line:
|
||||
key_mgmt = line.split(':', 1)[1].strip() if ':' in line else ""
|
||||
elif 'psk:' in line:
|
||||
psk = line.split(':', 1)[1].strip() if ':' in line else ""
|
||||
|
||||
if key_mgmt != "none" or (psk and psk != ""):
|
||||
logger.warning(f"Hotspot still has security (key-mgmt={key_mgmt}, psk={'set' if psk else 'empty'}), deleting and recreating...")
|
||||
# Delete and recreate as last resort
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "down", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
time.sleep(1)
|
||||
# Recreate without any password parameters
|
||||
cmd_recreate = [
|
||||
"nmcli", "device", "wifi", "hotspot",
|
||||
"ifname", self._wifi_interface,
|
||||
"con-name", "LEDMatrix-Setup-AP",
|
||||
"ssid", ap_ssid,
|
||||
"band", "bg",
|
||||
"channel", str(ap_channel),
|
||||
]
|
||||
subprocess.run(cmd_recreate, capture_output=True, timeout=30)
|
||||
# Set IP address for consistency
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "modify", "LEDMatrix-Setup-AP",
|
||||
"802-11-wireless.mode", "ap",
|
||||
"802-11-wireless.band", "bg", # 2.4 GHz for maximum compatibility
|
||||
"802-11-wireless.channel", str(ap_channel),
|
||||
"ipv4.method", "shared",
|
||||
"ipv4.addresses", "192.168.4.1/24",
|
||||
"ipv4.method", "shared"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
# Disable PMF on Trixie
|
||||
if self._is_trixie:
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "modify", "LEDMatrix-Setup-AP",
|
||||
"802-11-wireless-security.pmf", "disable"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
logger.info("Recreated hotspot as open network")
|
||||
else:
|
||||
logger.info("Hotspot verified as open (no password)")
|
||||
# No 802-11-wireless-security section → open network
|
||||
]
|
||||
|
||||
# Restart the connection to apply all changes
|
||||
subprocess.run(
|
||||
["nmcli", "connection", "down", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
time.sleep(1)
|
||||
subprocess.run(
|
||||
# PMF (Protected Management Frames) is only meaningful for WPA2/WPA3.
|
||||
# An open AP has no security section, so adding 802-11-wireless-security.pmf
|
||||
# would cause NM to require key-mgmt too, breaking the connection add on
|
||||
# Trixie NM 1.52+. Leave PMF untouched — open APs have no frame protection.
|
||||
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
|
||||
if result.returncode != 0:
|
||||
error_msg = result.stderr.strip() or result.stdout.strip()
|
||||
logger.error(f"Failed to create AP connection profile: {error_msg}")
|
||||
self._show_led_message("AP mode failed", duration=5)
|
||||
return False, f"Failed to create AP profile: {error_msg}"
|
||||
|
||||
# Write the NM dnsmasq-shared.d captive-portal config BEFORE bringing up
|
||||
# the connection so NM's dnsmasq picks it up at start time.
|
||||
# This causes every hostname DNS query from a connected device to resolve
|
||||
# to 192.168.4.1, automatically triggering the OS captive-portal popup.
|
||||
self._write_nm_dnsmasq_captive_conf()
|
||||
|
||||
logger.info("AP connection profile created, bringing it up...")
|
||||
up_result = subprocess.run(
|
||||
["nmcli", "connection", "up", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True,
|
||||
timeout=10
|
||||
capture_output=True, text=True, timeout=20
|
||||
)
|
||||
logger.info("Hotspot restarted with open network settings")
|
||||
logger.info(f"AP mode started via nmcli hotspot: {ap_ssid}")
|
||||
if up_result.returncode != 0:
|
||||
error_msg = up_result.stderr.strip() or up_result.stdout.strip()
|
||||
logger.error(f"Failed to bring up AP connection: {error_msg}")
|
||||
self._remove_nm_dnsmasq_captive_conf()
|
||||
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True, timeout=10)
|
||||
self._show_led_message("AP mode failed", duration=5)
|
||||
return False, f"Failed to start AP: {error_msg}"
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
# Verify hotspot is running
|
||||
# NM's ipv4.method=shared manages ip_forward automatically, so we only
|
||||
# need to add the iptables port-redirect rules for the captive portal.
|
||||
if not self._setup_iptables_redirect():
|
||||
logger.error("Captive-portal redirect setup failed; rolling back AP profile")
|
||||
self._remove_nm_dnsmasq_captive_conf()
|
||||
subprocess.run(["nmcli", "connection", "down", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True, timeout=10)
|
||||
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True, timeout=10)
|
||||
self._clear_led_message()
|
||||
return False, "AP started but captive-portal redirect setup failed"
|
||||
|
||||
# Verify the AP is actually running
|
||||
status = self._get_ap_status_nmcli()
|
||||
if status.get('active'):
|
||||
ip = status.get('ip', '192.168.4.1')
|
||||
logger.info(f"AP mode confirmed active at {ip}")
|
||||
self._show_led_message(f"Setup: {ip}", duration=5)
|
||||
return True, f"AP mode enabled (hotspot mode) - Access at {ip}:5000"
|
||||
logger.info(f"AP mode confirmed active at {ip} (open network, no password)")
|
||||
self._show_led_message(f"WiFi Setup\n{ap_ssid}\nNo password\n{ip}:5000", duration=10)
|
||||
return True, f"AP mode enabled (open network) - Access at {ip}:5000"
|
||||
else:
|
||||
logger.error("AP mode started but not verified")
|
||||
logger.error("AP mode started but not verified by status check — rolling back")
|
||||
self._teardown_iptables_redirect()
|
||||
self._remove_nm_dnsmasq_captive_conf()
|
||||
subprocess.run(["nmcli", "connection", "down", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True, timeout=10)
|
||||
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
|
||||
capture_output=True, timeout=10)
|
||||
self._clear_led_message()
|
||||
return False, "AP mode started but verification failed"
|
||||
else:
|
||||
error_msg = result.stderr.strip() or result.stdout.strip()
|
||||
logger.error(f"Failed to start AP mode via nmcli: {error_msg}")
|
||||
self._show_led_message("AP mode failed", duration=5)
|
||||
return False, f"Failed to start AP mode: {error_msg}"
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting AP mode with nmcli hotspot: {e}")
|
||||
logger.error(f"Error starting AP mode with nmcli: {e}")
|
||||
self._remove_nm_dnsmasq_captive_conf()
|
||||
self._show_led_message("Setup mode error", duration=5)
|
||||
return False, str(e)
|
||||
|
||||
@@ -1976,7 +2133,12 @@ class WiFiManager:
|
||||
|
||||
for line in result.stdout.strip().split('\n'):
|
||||
parts = line.split(':')
|
||||
if len(parts) >= 2 and 'hotspot' in parts[1].lower():
|
||||
if len(parts) < 2:
|
||||
continue
|
||||
conn_name = parts[0].strip()
|
||||
conn_type = parts[1].strip().lower()
|
||||
# Match our known AP profile name OR the legacy nmcli hotspot type
|
||||
if conn_name == "LEDMatrix-Setup-AP" or 'hotspot' in conn_type:
|
||||
# Get actual IP address (may be 192.168.4.1 or 10.42.0.1 depending on config)
|
||||
ip = '192.168.4.1'
|
||||
interface = parts[2] if len(parts) > 2 else self._wifi_interface
|
||||
@@ -2072,45 +2234,9 @@ class WiFiManager:
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not remove dnsmasq drop-in config: {e}")
|
||||
|
||||
# Remove iptables port forwarding rules and disable IP forwarding (only for hostapd mode)
|
||||
# Remove iptables redirect rules and restore ip_forward state (hostapd mode only)
|
||||
if hostapd_active:
|
||||
try:
|
||||
# Check if iptables is available
|
||||
iptables_check = subprocess.run(
|
||||
["which", "iptables"],
|
||||
capture_output=True,
|
||||
timeout=2
|
||||
)
|
||||
|
||||
if iptables_check.returncode == 0:
|
||||
# Remove NAT redirect rule
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-t", "nat", "-D", "PREROUTING", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "REDIRECT", "--to-port", "5000"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
# Remove INPUT rule
|
||||
subprocess.run(
|
||||
["sudo", "iptables", "-D", "INPUT", "-i", self._wifi_interface, "-p", "tcp", "--dport", "80", "-j", "ACCEPT"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
logger.info("Removed iptables port forwarding rules")
|
||||
else:
|
||||
logger.debug("iptables not available, skipping rule removal")
|
||||
|
||||
# Disable IP forwarding (restore to default client mode)
|
||||
subprocess.run(
|
||||
["sudo", "sysctl", "-w", "net.ipv4.ip_forward=0"],
|
||||
capture_output=True,
|
||||
timeout=5
|
||||
)
|
||||
logger.info("Disabled IP forwarding")
|
||||
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError) as e:
|
||||
logger.warning(f"Could not remove iptables rules or disable forwarding: {e}")
|
||||
# Continue anyway
|
||||
self._teardown_iptables_redirect()
|
||||
|
||||
# Clean up WiFi interface IP configuration
|
||||
subprocess.run(
|
||||
@@ -2153,14 +2279,17 @@ class WiFiManager:
|
||||
except Exception as e:
|
||||
logger.error(f"Final WiFi radio unblock attempt failed: {e}")
|
||||
else:
|
||||
# nmcli hotspot mode - restart not needed, just ensure WiFi radio is enabled
|
||||
logger.info("Skipping NetworkManager restart (nmcli hotspot mode, restart not needed)")
|
||||
# Still ensure WiFi radio is enabled (may have been disabled by nmcli operations)
|
||||
# Use retries for safety
|
||||
# nmcli AP mode — NM's ipv4.method=shared manages ip_forward automatically,
|
||||
# so we only need to remove the iptables redirect rules we added.
|
||||
logger.info("Skipping NetworkManager restart (nmcli AP mode, restart not needed)")
|
||||
self._teardown_iptables_redirect()
|
||||
self._remove_nm_dnsmasq_captive_conf()
|
||||
# Ensure WiFi radio is enabled after nmcli operations
|
||||
wifi_enabled = self._ensure_wifi_radio_enabled(max_retries=3)
|
||||
if not wifi_enabled:
|
||||
logger.warning("WiFi radio may be disabled after nmcli hotspot cleanup")
|
||||
logger.warning("WiFi radio may be disabled after nmcli AP cleanup")
|
||||
|
||||
self._ap_enabled_at = None
|
||||
logger.info("AP mode disabled successfully")
|
||||
return True, "AP mode disabled"
|
||||
except Exception as e:
|
||||
@@ -2176,8 +2305,10 @@ class WiFiManager:
|
||||
config_dir = HOSTAPD_CONFIG_PATH.parent
|
||||
config_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
ap_ssid = self.config.get("ap_ssid", DEFAULT_AP_SSID)
|
||||
ap_channel = self.config.get("ap_channel", DEFAULT_AP_CHANNEL)
|
||||
# Use validated values — strips invalid chars and ensures channel is an int.
|
||||
# Also strip newlines from SSID to prevent config-file injection.
|
||||
ap_ssid, ap_channel = self._validate_ap_config()
|
||||
ap_ssid = ap_ssid.replace('\n', '').replace('\r', '')
|
||||
|
||||
# Open network configuration (no password) for easy setup access
|
||||
config_content = f"""interface={self._wifi_interface}
|
||||
@@ -2305,12 +2436,11 @@ address=/detectportal.firefox.com/192.168.4.1
|
||||
f"Ethernet={ethernet_connected}, AP_active={ap_active}, "
|
||||
f"auto_enable={auto_enable}, disconnected_checks={self._disconnected_checks}")
|
||||
|
||||
# Determine if we should have AP mode active
|
||||
# AP mode should only be auto-enabled if:
|
||||
# - auto_enable_ap_mode is True AND
|
||||
# - WiFi is NOT connected AND
|
||||
# - Ethernet is NOT connected AND
|
||||
# - We've had multiple consecutive disconnected checks (grace period)
|
||||
# Determine if we should have AP mode active.
|
||||
# AP-enable uses only the nmcli association state (fast, no network calls).
|
||||
# This keeps the same reliable behaviour as before: momentary packet loss
|
||||
# while on working WiFi does NOT trigger AP mode. The internet-reachability
|
||||
# check is performed separately in the daemon watchdog for NM recovery.
|
||||
is_disconnected = not status.connected and not ethernet_connected
|
||||
|
||||
if is_disconnected:
|
||||
@@ -2318,9 +2448,9 @@ address=/detectportal.firefox.com/192.168.4.1
|
||||
self._disconnected_checks += 1
|
||||
logger.debug(f"Network disconnected (check {self._disconnected_checks}/{self._disconnected_checks_required})")
|
||||
else:
|
||||
# Reset counter if we're connected
|
||||
# Reset counter if we're associated
|
||||
if self._disconnected_checks > 0:
|
||||
logger.debug(f"Network connected, resetting disconnected check counter")
|
||||
logger.debug("Network connected, resetting disconnected check counter")
|
||||
self._disconnected_checks = 0
|
||||
|
||||
# Only enable AP if we've had enough consecutive disconnected checks
|
||||
@@ -2366,6 +2496,24 @@ address=/detectportal.firefox.com/192.168.4.1
|
||||
# Don't disable it automatically, let it stay active
|
||||
logger.debug("AP mode is active (manually enabled), keeping active")
|
||||
|
||||
# Idle-timeout check: disable AP if no client has connected within the window.
|
||||
# Only applies when AP is active and we haven't just decided to enable/disable it.
|
||||
if ap_active and self._ap_enabled_at is not None:
|
||||
try:
|
||||
idle_timeout_min = max(1, min(1440, int(self.config.get("ap_idle_timeout_minutes", 15))))
|
||||
except (TypeError, ValueError):
|
||||
idle_timeout_min = 15
|
||||
elapsed = time.time() - self._ap_enabled_at
|
||||
if elapsed > idle_timeout_min * 60 and not self._has_ap_clients():
|
||||
logger.info(
|
||||
f"AP idle timeout ({idle_timeout_min} min, no clients) — disabling AP"
|
||||
)
|
||||
success, message = self.disable_ap_mode()
|
||||
if success:
|
||||
return True
|
||||
else:
|
||||
logger.warning(f"Failed to disable AP on idle timeout: {message}")
|
||||
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking AP mode: {e}", exc_info=True)
|
||||
|
||||
@@ -7,7 +7,7 @@ Wants=network.target
|
||||
Type=simple
|
||||
User=root
|
||||
WorkingDirectory=__PROJECT_ROOT_DIR__
|
||||
ExecStart=/usr/bin/python3 /home/ledpi/LEDMatrix/scripts/utils/wifi_monitor_daemon.py --interval 30
|
||||
ExecStart=/usr/bin/python3 __PROJECT_ROOT_DIR__/scripts/utils/wifi_monitor_daemon.py --interval 30
|
||||
Restart=on-failure
|
||||
RestartSec=10
|
||||
StandardOutput=syslog
|
||||
|
||||
334
test/test_wifi_manager_ap.py
Normal file
334
test/test_wifi_manager_ap.py
Normal file
@@ -0,0 +1,334 @@
|
||||
"""
|
||||
Unit tests for WiFi AP mode — src.wifi_manager.
|
||||
|
||||
Each test exercises logic that can be verified through the subprocess calls the
|
||||
manager emits, without requiring root access, hardware, or a running Pi.
|
||||
|
||||
Scenarios covered:
|
||||
1. nmcli AP profile is created with no security parameters (open/passwordless).
|
||||
2. iptables PREROUTING and INPUT rules are added when the nmcli AP starts.
|
||||
3. iptables rules and ip_forward are reverted when the AP is torn down.
|
||||
4. LED matrix message includes the SSID, 'No password', and the setup URL.
|
||||
5. Known AP profile names are deleted before the new profile is created.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from src.wifi_manager import WiFiManager
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ok(stdout: str = "", stderr: str = "") -> MagicMock:
|
||||
r = MagicMock()
|
||||
r.returncode = 0
|
||||
r.stdout = stdout
|
||||
r.stderr = stderr
|
||||
return r
|
||||
|
||||
|
||||
def _fail(stdout: str = "", stderr: str = "error") -> MagicMock:
|
||||
r = MagicMock()
|
||||
r.returncode = 1
|
||||
r.stdout = stdout
|
||||
r.stderr = stderr
|
||||
return r
|
||||
|
||||
|
||||
def _find_path_side_effect(name: str) -> str:
|
||||
"""Deterministic fake for _find_command_path."""
|
||||
return {"iptables": "/usr/sbin/iptables", "sysctl": "/usr/sbin/sysctl"}.get(
|
||||
name, f"/usr/bin/{name}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture()
|
||||
def wifi_config(tmp_path: Path) -> Path:
|
||||
"""Minimal wifi_config.json in a temporary directory."""
|
||||
cfg_dir = tmp_path / "config"
|
||||
cfg_dir.mkdir()
|
||||
cfg = {
|
||||
"ap_ssid": "LEDMatrix-Setup",
|
||||
"ap_channel": 7,
|
||||
"auto_enable_ap_mode": True,
|
||||
"saved_networks": [],
|
||||
}
|
||||
p = cfg_dir / "wifi_config.json"
|
||||
p.write_text(json.dumps(cfg))
|
||||
return p
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def manager(wifi_config: Path, tmp_path: Path) -> WiFiManager:
|
||||
"""
|
||||
WiFiManager with all system calls stubbed out during construction and the
|
||||
ip_forward save file redirected to a per-test temporary path.
|
||||
"""
|
||||
with patch("src.wifi_manager.subprocess.run", return_value=_ok(stdout="wlan0\n")), \
|
||||
patch.object(WiFiManager, "_detect_trixie", return_value=False):
|
||||
mgr = WiFiManager(config_path=wifi_config)
|
||||
|
||||
# Force clean, deterministic state regardless of what __init__ inferred
|
||||
mgr._wifi_interface = "wlan0"
|
||||
mgr.has_nmcli = True
|
||||
mgr.has_hostapd = False
|
||||
mgr.has_dnsmasq = False
|
||||
mgr.has_iwlist = False
|
||||
mgr._is_trixie = False
|
||||
# Redirect the ip_forward save file to tmp so tests never share state
|
||||
mgr._IP_FORWARD_SAVE_PATH = tmp_path / "ip_fwd_saved"
|
||||
return mgr
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. AP profile is open (no password)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_nmcli_ap_profile_has_no_security_params(manager: WiFiManager) -> None:
|
||||
"""
|
||||
The 'nmcli connection add' command must not include key-mgmt, psk, or any
|
||||
WPA-related parameter. On Bookworm/Trixie, NM creates a WPA2-protected
|
||||
hotspot even when those values are set to 'none'/empty via a later
|
||||
'connection modify', so the profile must be created without a security
|
||||
section from the start.
|
||||
"""
|
||||
captured: list[list[str]] = []
|
||||
|
||||
def _run(cmd, **kw):
|
||||
captured.append(list(cmd))
|
||||
return _ok()
|
||||
|
||||
with patch("src.wifi_manager.subprocess.run", side_effect=_run), \
|
||||
patch.object(manager, "disconnect_from_network", return_value=(True, "ok")), \
|
||||
patch.object(manager, "_setup_iptables_redirect", return_value=True), \
|
||||
patch.object(manager, "_get_ap_status_nmcli",
|
||||
return_value={"active": True, "ip": "192.168.4.1"}), \
|
||||
patch.object(manager, "_show_led_message"):
|
||||
|
||||
success, _ = manager._enable_ap_mode_nmcli_hotspot()
|
||||
|
||||
assert success, "AP enable should report success"
|
||||
|
||||
add_calls = [c for c in captured if "nmcli" in c and "connection" in c and "add" in c]
|
||||
assert add_calls, "Expected at least one 'nmcli connection add' invocation"
|
||||
|
||||
add_str = " ".join(add_calls[0])
|
||||
assert "key-mgmt" not in add_str, "AP profile must not set key-mgmt"
|
||||
assert "psk" not in add_str, "AP profile must not include a PSK/password"
|
||||
assert "wpa" not in add_str.lower(), "AP profile must not reference WPA"
|
||||
assert "802-11-wireless.mode" in add_str, "AP profile must declare wireless mode"
|
||||
# Verify the value for 802-11-wireless.mode is exactly "ap" — check the element
|
||||
# that immediately follows the key in the command list, not a loose substring match.
|
||||
cmd = add_calls[0]
|
||||
try:
|
||||
mode_idx = cmd.index("802-11-wireless.mode")
|
||||
assert cmd[mode_idx + 1] == "ap", \
|
||||
f"802-11-wireless.mode value must be exactly 'ap', got {cmd[mode_idx + 1]!r}"
|
||||
except ValueError:
|
||||
pytest.fail("802-11-wireless.mode not found as a list element in nmcli command")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. iptables NAT rules are added when the AP starts
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_iptables_nat_rules_added_on_ap_start(manager: WiFiManager) -> None:
|
||||
"""
|
||||
_setup_iptables_redirect must add:
|
||||
- a PREROUTING REDIRECT rule that maps incoming TCP port 80 to port 5000, and
|
||||
- an INPUT ACCEPT rule for port 5000 (the post-redirect destination port,
|
||||
NOT port 80 which never hits the INPUT chain after PREROUTING rewrites it).
|
||||
"""
|
||||
captured: list[list[str]] = []
|
||||
|
||||
def _run(cmd, **kw):
|
||||
captured.append(list(cmd))
|
||||
# iptables -C (check) → rc=1 so the -A (add) branch executes
|
||||
if "iptables" in " ".join(str(x) for x in cmd) and "-C" in cmd:
|
||||
return _fail()
|
||||
return _ok()
|
||||
|
||||
# Patch Path.read_text so /proc/sys/net/ipv4/ip_forward is readable on any OS
|
||||
with patch("src.wifi_manager.subprocess.run", side_effect=_run), \
|
||||
patch.object(manager, "_find_command_path", side_effect=_find_path_side_effect), \
|
||||
patch("pathlib.Path.read_text", return_value="0\n"):
|
||||
|
||||
result = manager._setup_iptables_redirect()
|
||||
|
||||
assert result, "_setup_iptables_redirect must return True on success"
|
||||
|
||||
prerouting_adds = [c for c in captured if "iptables" in " ".join(c) and "-A" in c and "PREROUTING" in c]
|
||||
assert prerouting_adds, "Expected 'iptables -A PREROUTING' invocation"
|
||||
pr_str = " ".join(prerouting_adds[0])
|
||||
assert "--dport" in pr_str and "80" in pr_str, "PREROUTING rule must match dport 80"
|
||||
assert "5000" in pr_str, "PREROUTING rule must redirect to port 5000"
|
||||
assert "REDIRECT" in pr_str, "PREROUTING rule must use REDIRECT target"
|
||||
|
||||
input_adds = [c for c in captured if "iptables" in " ".join(c) and "-A" in c and "INPUT" in c]
|
||||
assert input_adds, "Expected 'iptables -A INPUT' invocation"
|
||||
in_str = " ".join(input_adds[0])
|
||||
assert "5000" in in_str, "INPUT rule must accept port 5000 (post-PREROUTING destination)"
|
||||
assert "ACCEPT" in in_str, "INPUT rule must use ACCEPT target"
|
||||
|
||||
# Port 80 must NOT be used in the INPUT rule (it is already redirected by PREROUTING)
|
||||
input_80 = [c for c in captured if "iptables" in " ".join(c) and "INPUT" in c and "--dport" in c and "80" in c]
|
||||
assert not input_80, "INPUT rule must target port 5000, not port 80"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3a. iptables rules and ip_forward reverted on teardown
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_iptables_rules_and_ip_forward_reverted_on_teardown(manager: WiFiManager) -> None:
|
||||
"""
|
||||
_teardown_iptables_redirect must:
|
||||
- remove the PREROUTING and INPUT iptables rules, and
|
||||
- restore ip_forward to the exact value recorded in the save file.
|
||||
"""
|
||||
original_fwd = "0"
|
||||
manager._IP_FORWARD_SAVE_PATH.write_text(original_fwd)
|
||||
# Teardown dispatches on the backend recorded during setup
|
||||
manager._redirect_backend = "iptables"
|
||||
|
||||
captured: list[list[str]] = []
|
||||
|
||||
with patch("src.wifi_manager.subprocess.run",
|
||||
side_effect=lambda cmd, **kw: (captured.append(list(cmd)) or _ok())), \
|
||||
patch.object(manager, "_find_command_path", side_effect=_find_path_side_effect):
|
||||
|
||||
manager._teardown_iptables_redirect()
|
||||
|
||||
assert [c for c in captured if "iptables" in " ".join(c) and "-D" in c and "PREROUTING" in c], \
|
||||
"Expected 'iptables -D PREROUTING' invocation"
|
||||
|
||||
assert [c for c in captured if "iptables" in " ".join(c) and "-D" in c and "INPUT" in c], \
|
||||
"Expected 'iptables -D INPUT' invocation"
|
||||
|
||||
restore_calls = [
|
||||
c for c in captured
|
||||
if "sysctl" in " ".join(c) and f"ip_forward={original_fwd}" in " ".join(c)
|
||||
]
|
||||
assert restore_calls, f"Expected sysctl to restore ip_forward to {original_fwd!r}"
|
||||
|
||||
assert not manager._IP_FORWARD_SAVE_PATH.exists(), \
|
||||
"Save file must be removed after successful teardown"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3b. ip_forward untouched when no save file exists
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_ip_forward_not_restored_when_save_file_absent(manager: WiFiManager) -> None:
|
||||
"""
|
||||
When the save file is missing (setup never wrote it, e.g. because /proc was
|
||||
unreadable or the write failed), teardown must NOT call sysctl so it does not
|
||||
accidentally clobber ip_forward state owned by a VPN or NetworkManager.
|
||||
"""
|
||||
assert not manager._IP_FORWARD_SAVE_PATH.exists()
|
||||
|
||||
captured: list[list[str]] = []
|
||||
|
||||
with patch("src.wifi_manager.subprocess.run",
|
||||
side_effect=lambda cmd, **kw: (captured.append(list(cmd)) or _ok())), \
|
||||
patch.object(manager, "_find_command_path", side_effect=_find_path_side_effect):
|
||||
|
||||
manager._teardown_iptables_redirect()
|
||||
|
||||
sysctl_calls = [
|
||||
c for c in captured
|
||||
if "sysctl" in " ".join(c) and "ip_forward" in " ".join(c)
|
||||
]
|
||||
assert not sysctl_calls, \
|
||||
"sysctl must not be called when no ip_forward save file exists"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. LED message content
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_led_message_shows_ssid_no_password_and_url(manager: WiFiManager) -> None:
|
||||
"""
|
||||
When the nmcli AP activates, the LED message must include:
|
||||
- the AP SSID ('LEDMatrix-Setup')
|
||||
- the string 'No password'
|
||||
- the AP IP address (192.168.4.1) and Flask port (5000)
|
||||
"""
|
||||
led_messages: list[str] = []
|
||||
|
||||
with patch("src.wifi_manager.subprocess.run", return_value=_ok()), \
|
||||
patch.object(manager, "disconnect_from_network", return_value=(True, "ok")), \
|
||||
patch.object(manager, "_setup_iptables_redirect", return_value=True), \
|
||||
patch.object(manager, "_get_ap_status_nmcli",
|
||||
return_value={"active": True, "ip": "192.168.4.1"}), \
|
||||
patch.object(manager, "_show_led_message",
|
||||
side_effect=lambda msg, **kw: led_messages.append(msg)):
|
||||
|
||||
success, _ = manager._enable_ap_mode_nmcli_hotspot()
|
||||
|
||||
assert success, "AP enable should report success"
|
||||
assert led_messages, "Expected at least one _show_led_message call"
|
||||
|
||||
combined = "\n".join(led_messages)
|
||||
assert "No password" in combined, "LED message must say 'No password'"
|
||||
assert "LEDMatrix-Setup" in combined, "LED message must include the AP SSID"
|
||||
assert "192.168.4.1" in combined, "LED message must include the AP IP address"
|
||||
assert "5000" in combined, "LED message must include the Flask port"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 5. Stale AP profiles deleted before the new one is created
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_existing_ap_profiles_deleted_before_new_profile_created(manager: WiFiManager) -> None:
|
||||
"""
|
||||
Before 'nmcli connection add', the manager must issue
|
||||
'nmcli connection down/delete' for every known AP profile name so stale
|
||||
profiles (from a previous crash or partial setup) cannot block the new one.
|
||||
"""
|
||||
captured: list[list[str]] = []
|
||||
|
||||
def _run(cmd, **kw):
|
||||
captured.append(list(cmd))
|
||||
return _ok()
|
||||
|
||||
with patch("src.wifi_manager.subprocess.run", side_effect=_run), \
|
||||
patch.object(manager, "disconnect_from_network", return_value=(True, "ok")), \
|
||||
patch.object(manager, "_setup_iptables_redirect", return_value=True), \
|
||||
patch.object(manager, "_get_ap_status_nmcli",
|
||||
return_value={"active": True, "ip": "192.168.4.1"}), \
|
||||
patch.object(manager, "_show_led_message"):
|
||||
|
||||
success, _ = manager._enable_ap_mode_nmcli_hotspot()
|
||||
|
||||
assert success
|
||||
|
||||
cmd_strs = [" ".join(c) for c in captured]
|
||||
|
||||
for profile in ("LEDMatrix-Setup-AP", "Hotspot", "TickerSetup-AP"):
|
||||
assert any("connection delete" in s and profile in s for s in cmd_strs), \
|
||||
f"Expected 'nmcli connection delete {profile}' before creating the new profile"
|
||||
|
||||
add_indices = [i for i, s in enumerate(cmd_strs) if "connection add" in s]
|
||||
del_indices = [i for i, s in enumerate(cmd_strs) if "connection delete" in s]
|
||||
|
||||
assert add_indices, "Expected 'nmcli connection add' call"
|
||||
assert del_indices, "Expected 'nmcli connection delete' calls"
|
||||
assert max(del_indices) < min(add_indices), \
|
||||
"All connection deletions must complete before the new profile is created"
|
||||
@@ -226,13 +226,24 @@ def serve_plugin_asset(plugin_id, filename):
|
||||
'message': 'Internal server error'
|
||||
}), 500
|
||||
|
||||
# Prime psutil CPU measurement once at startup so interval=None returns a real value
|
||||
try:
|
||||
import psutil as _psutil_prime
|
||||
_psutil_prime.cpu_percent(interval=None)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Cached AP mode check — avoids creating a WiFiManager per request
|
||||
_ap_mode_cache = {'value': False, 'timestamp': 0}
|
||||
_AP_MODE_CACHE_TTL = 5 # seconds
|
||||
_AP_MODE_CACHE_TTL = 30 # seconds — AP mode is user-initiated; 30s is fine
|
||||
|
||||
# Cached ledmatrix service status for SSE stats stream
|
||||
_ledmatrix_service_cache = {'active': False, 'timestamp': 0}
|
||||
_LEDMATRIX_SERVICE_CACHE_TTL = 15 # seconds
|
||||
|
||||
def is_ap_mode_active():
|
||||
"""
|
||||
Check if access point mode is currently active (cached, 5s TTL).
|
||||
Check if access point mode is currently active (cached, 30s TTL).
|
||||
Uses a direct systemctl check instead of instantiating WiFiManager.
|
||||
"""
|
||||
now = time.time()
|
||||
@@ -444,7 +455,8 @@ def system_status_generator():
|
||||
# Try to import psutil for system stats
|
||||
try:
|
||||
import psutil
|
||||
cpu_percent = round(psutil.cpu_percent(interval=1), 1)
|
||||
# interval=None is non-blocking; primed at module startup above
|
||||
cpu_percent = round(psutil.cpu_percent(interval=None), 1)
|
||||
memory = psutil.virtual_memory()
|
||||
memory_used_percent = round(memory.percent, 1)
|
||||
|
||||
@@ -461,14 +473,17 @@ def system_status_generator():
|
||||
memory_used_percent = 0
|
||||
cpu_temp = 0
|
||||
|
||||
# Check if display service is running
|
||||
service_active = False
|
||||
# Check if display service is running (cached to avoid per-client subprocess forks)
|
||||
now = time.time()
|
||||
if (now - _ledmatrix_service_cache['timestamp']) >= _LEDMATRIX_SERVICE_CACHE_TTL:
|
||||
try:
|
||||
result = subprocess.run(['systemctl', 'is-active', 'ledmatrix'],
|
||||
capture_output=True, text=True, timeout=2)
|
||||
service_active = result.stdout.strip() == 'active'
|
||||
_ledmatrix_service_cache['active'] = result.stdout.strip() == 'active'
|
||||
except (subprocess.SubprocessError, OSError):
|
||||
pass
|
||||
_ledmatrix_service_cache['timestamp'] = now
|
||||
service_active = _ledmatrix_service_cache['active']
|
||||
|
||||
status = {
|
||||
'timestamp': time.time(),
|
||||
@@ -546,7 +561,7 @@ def display_preview_generator():
|
||||
except Exception as e:
|
||||
yield {'error': str(e)}
|
||||
|
||||
time.sleep(0.5) # Check 2 times per second (reduced frequency for better performance)
|
||||
time.sleep(1.0) # Check once per second — halves PIL encode overhead vs 0.5s
|
||||
|
||||
# Logs generator for SSE
|
||||
def logs_generator():
|
||||
|
||||
@@ -2,6 +2,7 @@ from flask import Blueprint, request, jsonify, Response, send_from_directory
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import socket
|
||||
import sys
|
||||
import subprocess
|
||||
@@ -16,6 +17,11 @@ from typing import Optional, Tuple, Dict, Any, Type
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SUDO_BIN = shutil.which("sudo") or "/usr/bin/sudo"
|
||||
SYSTEMCTL_BIN = shutil.which("systemctl") or "/usr/bin/systemctl"
|
||||
REBOOT_BIN = shutil.which("reboot") or "/usr/sbin/reboot"
|
||||
POWEROFF_BIN = shutil.which("poweroff") or "/usr/sbin/poweroff"
|
||||
|
||||
# Import new infrastructure
|
||||
from src.web_interface.api_helpers import success_response, error_response, validate_request_json
|
||||
from src.web_interface.errors import ErrorCode
|
||||
@@ -218,7 +224,7 @@ def _ensure_display_service_running():
|
||||
if status.get('active'):
|
||||
status['started'] = False
|
||||
return status
|
||||
result = _run_systemctl_command(['sudo', 'systemctl', 'start', 'ledmatrix'])
|
||||
result = _run_systemctl_command([SUDO_BIN, SYSTEMCTL_BIN, 'start', 'ledmatrix.service'])
|
||||
service_status = _get_display_service_status()
|
||||
result['started'] = result.get('returncode') == 0
|
||||
result['active'] = service_status.get('active')
|
||||
@@ -227,7 +233,7 @@ def _ensure_display_service_running():
|
||||
|
||||
def _stop_display_service():
|
||||
"""Stop the ledmatrix display service."""
|
||||
result = _run_systemctl_command(['sudo', 'systemctl', 'stop', 'ledmatrix'])
|
||||
result = _run_systemctl_command([SUDO_BIN, SYSTEMCTL_BIN, 'stop', 'ledmatrix.service'])
|
||||
status = _get_display_service_status()
|
||||
result['active'] = status.get('active')
|
||||
result['status'] = status
|
||||
@@ -1716,33 +1722,34 @@ def execute_system_action():
|
||||
if mode:
|
||||
# For on-demand modes, we would need to integrate with the display controller
|
||||
# For now, just start the display service
|
||||
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'start', 'ledmatrix.service'],
|
||||
capture_output=True, text=True, timeout=15)
|
||||
return jsonify({
|
||||
'status': 'success' if result.returncode == 0 else 'error',
|
||||
'message': f'Started display in {mode} mode',
|
||||
'message': f'Started display in {mode} mode' if result.returncode == 0
|
||||
else f'Failed to start display in {mode} mode: {result.stderr.strip() or "check sudo systemctl status ledmatrix.service"}',
|
||||
'returncode': result.returncode,
|
||||
'stdout': result.stdout,
|
||||
'stderr': result.stderr
|
||||
})
|
||||
else:
|
||||
result = subprocess.run(['sudo', 'systemctl', 'start', 'ledmatrix'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'start', 'ledmatrix.service'],
|
||||
capture_output=True, text=True, timeout=15)
|
||||
elif action == 'stop_display':
|
||||
result = subprocess.run(['sudo', 'systemctl', 'stop', 'ledmatrix'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'stop', 'ledmatrix.service'],
|
||||
capture_output=True, text=True, timeout=15)
|
||||
elif action == 'enable_autostart':
|
||||
result = subprocess.run(['sudo', 'systemctl', 'enable', 'ledmatrix'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'enable', 'ledmatrix.service'],
|
||||
capture_output=True, text=True, timeout=15)
|
||||
elif action == 'disable_autostart':
|
||||
result = subprocess.run(['sudo', 'systemctl', 'disable', 'ledmatrix'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'disable', 'ledmatrix.service'],
|
||||
capture_output=True, text=True, timeout=15)
|
||||
elif action == 'reboot_system':
|
||||
result = subprocess.run(['sudo', 'reboot'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, REBOOT_BIN],
|
||||
capture_output=True, text=True, timeout=10)
|
||||
elif action == 'shutdown_system':
|
||||
result = subprocess.run(['sudo', 'poweroff'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, POWEROFF_BIN],
|
||||
capture_output=True, text=True, timeout=10)
|
||||
elif action == 'git_pull':
|
||||
# Use PROJECT_ROOT instead of hardcoded path
|
||||
project_dir = str(PROJECT_ROOT)
|
||||
@@ -1823,12 +1830,11 @@ def execute_system_action():
|
||||
'stderr': result.stderr
|
||||
})
|
||||
elif action == 'restart_display_service':
|
||||
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'restart', 'ledmatrix.service'],
|
||||
capture_output=True, text=True, timeout=15)
|
||||
elif action == 'restart_web_service':
|
||||
# Try to restart the web service (assuming it's ledmatrix-web.service)
|
||||
result = subprocess.run(['sudo', 'systemctl', 'restart', 'ledmatrix-web'],
|
||||
capture_output=True, text=True)
|
||||
result = subprocess.run([SUDO_BIN, SYSTEMCTL_BIN, 'restart', 'ledmatrix-web.service'],
|
||||
capture_output=True, text=True, timeout=15)
|
||||
else:
|
||||
return jsonify({'status': 'error', 'message': f'Unknown action: {action}'}), 400
|
||||
|
||||
@@ -1840,6 +1846,13 @@ def execute_system_action():
|
||||
'stderr': result.stderr
|
||||
})
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
if action == 'start_display' and mode:
|
||||
msg = f'Failed to start display in {mode} mode: timed out'
|
||||
else:
|
||||
msg = f'Action {action} timed out'
|
||||
logger.warning("[System] execute_system_action timed out: action=%s", action)
|
||||
return jsonify({'status': 'error', 'message': msg, 'returncode': -1, 'stdout': '', 'stderr': 'timeout'}), 500
|
||||
except Exception as e:
|
||||
logger.exception("[System] execute_system_action failed")
|
||||
return jsonify({'status': 'error', 'message': 'Failed to execute system action'}), 500
|
||||
@@ -4264,8 +4277,13 @@ def _set_missing_booleans_to_false(config, schema_props, form_keys, prefix='', c
|
||||
elif prop_type == 'object' and 'properties' in prop_schema:
|
||||
# Recurse into nested objects
|
||||
if config_node is not None:
|
||||
# Inside an array item — ensure nested dict exists in item
|
||||
if prop_name not in node or not isinstance(node[prop_name], dict):
|
||||
# Inside an array item — only recurse if the sub-object already exists.
|
||||
# Never create optional sub-objects that weren't submitted; doing so
|
||||
# produces e.g. logo:{} on feed items with no logo, which then fails
|
||||
# schema validation when the object has required fields (id, path).
|
||||
if prop_name not in node:
|
||||
continue
|
||||
if not isinstance(node[prop_name], dict):
|
||||
node[prop_name] = {}
|
||||
_set_missing_booleans_to_false(
|
||||
config, prop_schema['properties'], form_keys, full_path,
|
||||
@@ -4405,10 +4423,22 @@ def _filter_config_by_schema(config, schema, prefix=''):
|
||||
prop_schema = schema_props[key]
|
||||
|
||||
# Handle nested objects recursively
|
||||
item_prefix = f"{prefix}.{key}" if prefix else key
|
||||
if isinstance(value, dict) and prop_schema.get('type') == 'object' and 'properties' in prop_schema:
|
||||
filtered[key] = _filter_config_by_schema(value, prop_schema, f"{prefix}.{key}" if prefix else key)
|
||||
filtered[key] = _filter_config_by_schema(value, prop_schema, item_prefix)
|
||||
elif isinstance(value, list) and prop_schema.get('type') == 'array':
|
||||
items_schema = prop_schema.get('items', {})
|
||||
if isinstance(items_schema, dict) and items_schema.get('type') == 'object' and 'properties' in items_schema:
|
||||
# Filter each item in the array so extra fields are stripped before
|
||||
# schema validation (important when items has additionalProperties: false)
|
||||
filtered[key] = [
|
||||
_filter_config_by_schema(item, items_schema, item_prefix) if isinstance(item, dict) else item
|
||||
for item in value
|
||||
]
|
||||
else:
|
||||
# Keep the value as-is for non-object types
|
||||
filtered[key] = value
|
||||
else:
|
||||
# Keep the value as-is for non-object/non-array types
|
||||
filtered[key] = value
|
||||
|
||||
return filtered
|
||||
@@ -7133,9 +7163,14 @@ def connect_wifi():
|
||||
'message': message
|
||||
})
|
||||
else:
|
||||
# Propagate structured error type so the captive portal UI can show
|
||||
# "Wrong password — try again" instead of a generic failure message.
|
||||
error_type = "wrong_password" if (message or "").startswith("wrong_password:") else "connection_failed"
|
||||
clean_message = (message or "").removeprefix("wrong_password:").lstrip() or "Failed to connect to network"
|
||||
return jsonify({
|
||||
'status': 'error',
|
||||
'message': message or 'Failed to connect to network'
|
||||
'message': clean_message,
|
||||
'error_type': error_type
|
||||
}), 400
|
||||
except Exception as e:
|
||||
logger.exception("[WiFi] Failed connecting to WiFi network")
|
||||
@@ -7559,6 +7594,126 @@ _STARLARK_APPS_DIR = PROJECT_ROOT / 'starlark-apps'
|
||||
_STARLARK_MANIFEST_FILE = _STARLARK_APPS_DIR / 'manifest.json'
|
||||
|
||||
|
||||
def _find_pixlet_binary(explicit_path: Optional[str] = None) -> Optional[str]:
|
||||
"""Find pixlet binary: explicit path → bundled binary → system PATH."""
|
||||
import platform
|
||||
if explicit_path and os.path.isfile(explicit_path) and os.access(explicit_path, os.X_OK):
|
||||
return explicit_path
|
||||
bin_dir = PROJECT_ROOT / "bin" / "pixlet"
|
||||
system = platform.system().lower()
|
||||
machine = platform.machine().lower()
|
||||
if system == "linux":
|
||||
if "aarch64" in machine or "arm64" in machine:
|
||||
name = "pixlet-linux-arm64"
|
||||
elif "x86_64" in machine or "amd64" in machine:
|
||||
name = "pixlet-linux-amd64"
|
||||
else:
|
||||
name = None
|
||||
elif system == "darwin":
|
||||
name = "pixlet-darwin-arm64" if "arm64" in machine else "pixlet-darwin-amd64"
|
||||
else:
|
||||
name = None
|
||||
if name:
|
||||
bundled = bin_dir / name
|
||||
if bundled.is_file():
|
||||
if os.access(str(bundled), os.X_OK):
|
||||
return str(bundled)
|
||||
try:
|
||||
bundled.chmod(0o755)
|
||||
except OSError:
|
||||
logger.warning("Could not make pixlet bundled binary executable (%s); falling back to PATH", bundled)
|
||||
else:
|
||||
if os.access(str(bundled), os.X_OK):
|
||||
return str(bundled)
|
||||
logger.warning("Pixlet bundled binary still not executable after chmod (%s); falling back to PATH", bundled)
|
||||
return shutil.which("pixlet")
|
||||
|
||||
|
||||
def _standalone_render_starlark_app(app_id: str) -> Tuple[bool, int, Optional[str]]:
|
||||
"""Render a Starlark app via pixlet directly (no plugin required).
|
||||
|
||||
Reads the .star file and config from starlark-apps/{app_id}/, runs pixlet,
|
||||
and saves the output to cached_render.webp in the same directory.
|
||||
This is the web-service fallback when starlark-apps plugin is not loaded.
|
||||
|
||||
Returns (success, http_status_code, error_message).
|
||||
"""
|
||||
manifest = _read_starlark_manifest()
|
||||
if not isinstance(manifest, dict):
|
||||
return False, 400, "Invalid manifest shape: expected object with 'apps' mapping"
|
||||
apps = manifest.get('apps', {})
|
||||
if not isinstance(apps, dict):
|
||||
return False, 400, "Invalid manifest shape: expected object with 'apps' mapping"
|
||||
app_data = apps.get(app_id)
|
||||
if not app_data:
|
||||
return False, 404, f"App not found: {app_id}"
|
||||
|
||||
app_dir = _STARLARK_APPS_DIR / app_id
|
||||
star_file = app_dir / app_data.get('star_file', f'{app_id}.star')
|
||||
if not star_file.exists():
|
||||
return False, 404, f"Star file not found: {star_file}"
|
||||
|
||||
full_config = api_v3.config_manager.load_config() if api_v3.config_manager else {}
|
||||
plugin_config = full_config.get('starlark-apps', {})
|
||||
|
||||
pixlet_path = _find_pixlet_binary(plugin_config.get('pixlet_path'))
|
||||
if not pixlet_path:
|
||||
return False, 503, "Pixlet binary not found — install pixlet first"
|
||||
|
||||
magnify = plugin_config.get('magnify')
|
||||
if magnify is None:
|
||||
hw = full_config.get('display', {}).get('hardware', {})
|
||||
cols = hw.get('cols', 64)
|
||||
chain = hw.get('chain_length', 1)
|
||||
rows = hw.get('rows', 32)
|
||||
magnify = max(1, min(8, int(min((cols * chain) / 64, rows / 32))))
|
||||
else:
|
||||
try:
|
||||
magnify = max(1, min(8, int(magnify)))
|
||||
except (ValueError, TypeError):
|
||||
magnify = 1
|
||||
|
||||
config_file = app_dir / 'config.json'
|
||||
app_config: Dict[str, Any] = {}
|
||||
if config_file.exists():
|
||||
try:
|
||||
with open(config_file) as f:
|
||||
app_config = json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
return False, 400, f"Invalid config.json for {app_id} ({config_file}): {e}"
|
||||
except OSError as e:
|
||||
return False, 400, f"Cannot read config.json for {app_id} ({config_file}): {e}"
|
||||
if not isinstance(app_config, dict):
|
||||
return False, 400, (
|
||||
f"config.json for {app_id} must be a JSON object, "
|
||||
f"got {type(app_config).__name__}"
|
||||
)
|
||||
|
||||
INTERNAL_KEYS = {'render_interval', 'display_duration'}
|
||||
pixlet_config = {k: v for k, v in app_config.items() if k not in INTERNAL_KEYS}
|
||||
|
||||
output_path = str(app_dir / 'cached_render.webp')
|
||||
cmd = [pixlet_path, 'render', str(star_file)]
|
||||
for key, value in pixlet_config.items():
|
||||
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', key):
|
||||
continue
|
||||
value_str = 'true' if value is True else 'false' if value is False else str(value)
|
||||
if re.search(r'[`$|<>&;\x00]|\$\(', value_str):
|
||||
continue
|
||||
cmd.append(f'{key}={value_str}')
|
||||
cmd.extend(['-o', output_path, '-m', str(magnify)])
|
||||
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30, cwd=str(app_dir))
|
||||
if result.returncode == 0 and os.path.isfile(output_path):
|
||||
return True, 200, None
|
||||
return False, 502, f"Pixlet failed (exit {result.returncode}): {result.stderr.strip()}"
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, 504, "Render timed out after 30s"
|
||||
except Exception as e:
|
||||
return False, 500, f"Render error: {e}"
|
||||
|
||||
|
||||
def _read_starlark_manifest() -> Dict[str, Any]:
|
||||
"""Read the starlark-apps manifest.json directly from disk."""
|
||||
try:
|
||||
@@ -7678,24 +7833,11 @@ def get_starlark_status():
|
||||
'display_info': magnify_info
|
||||
})
|
||||
|
||||
# Plugin not loaded - check Pixlet availability directly
|
||||
import shutil
|
||||
import platform
|
||||
|
||||
system = platform.system().lower()
|
||||
machine = platform.machine().lower()
|
||||
bin_dir = PROJECT_ROOT / 'bin' / 'pixlet'
|
||||
|
||||
pixlet_binary = None
|
||||
if system == "linux":
|
||||
if "aarch64" in machine or "arm64" in machine:
|
||||
pixlet_binary = bin_dir / "pixlet-linux-arm64"
|
||||
elif "x86_64" in machine or "amd64" in machine:
|
||||
pixlet_binary = bin_dir / "pixlet-linux-amd64"
|
||||
elif system == "darwin":
|
||||
pixlet_binary = bin_dir / ("pixlet-darwin-arm64" if "arm64" in machine else "pixlet-darwin-amd64")
|
||||
|
||||
pixlet_available = (pixlet_binary and pixlet_binary.exists()) or shutil.which('pixlet') is not None
|
||||
# Plugin not loaded - check Pixlet availability via shared resolver
|
||||
# (respects user-configured pixlet_path, bundled binary, and system PATH)
|
||||
full_config = api_v3.config_manager.load_config() if api_v3.config_manager else {}
|
||||
pixlet_path = _find_pixlet_binary(full_config.get('starlark-apps', {}).get('pixlet_path'))
|
||||
pixlet_available = pixlet_path is not None
|
||||
|
||||
# Read app counts from manifest
|
||||
manifest = _read_starlark_manifest()
|
||||
@@ -8148,20 +8290,27 @@ def toggle_starlark_app(app_id):
|
||||
def render_starlark_app(app_id):
|
||||
"""Force render a Starlark app."""
|
||||
try:
|
||||
starlark_plugin = _get_starlark_plugin()
|
||||
if not starlark_plugin:
|
||||
return jsonify({'status': 'error', 'message': 'Rendering requires the main LEDMatrix service (plugin not loaded in web service)'}), 503
|
||||
is_valid, err = _validate_starlark_app_path(app_id)
|
||||
if not is_valid:
|
||||
return jsonify({'status': 'error', 'message': err}), 400
|
||||
|
||||
starlark_plugin = _get_starlark_plugin()
|
||||
if starlark_plugin:
|
||||
app = starlark_plugin.apps.get(app_id)
|
||||
if not app:
|
||||
return jsonify({'status': 'error', 'message': f'App not found: {app_id}'}), 404
|
||||
|
||||
success = starlark_plugin._render_app(app, force=True)
|
||||
if success:
|
||||
return jsonify({'status': 'success', 'message': 'App rendered', 'frame_count': len(app.frames) if app.frames else 0})
|
||||
else:
|
||||
return jsonify({'status': 'success', 'message': 'App rendered',
|
||||
'frame_count': len(app.frames) if app.frames else 0})
|
||||
return jsonify({'status': 'error', 'message': 'Failed to render app'}), 500
|
||||
|
||||
# Web-service context: plugin not loaded, call pixlet directly
|
||||
success, status_code, error = _standalone_render_starlark_app(app_id)
|
||||
if success:
|
||||
return jsonify({'status': 'success', 'message': 'App rendered successfully', 'frame_count': 0}), status_code
|
||||
return jsonify({'status': 'error', 'message': error or 'Render failed', 'frame_count': 0}), status_code
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("[Starlark] render_starlark_app failed")
|
||||
return jsonify({'status': 'error', 'message': 'Failed to render Starlark app'}), 500
|
||||
|
||||
@@ -898,6 +898,10 @@ window.currentPluginConfig = null;
|
||||
const CACHE_DURATION = 5 * 60 * 1000; // 5 minutes in milliseconds
|
||||
let storeFilteredList = [];
|
||||
|
||||
function storeCacheExpired() {
|
||||
return !cacheTimestamp || (Date.now() - cacheTimestamp >= CACHE_DURATION);
|
||||
}
|
||||
|
||||
// ── Plugin Store Filter State ───────────────────────────────────────────
|
||||
const storeFilterState = {
|
||||
sort: safeLocalStorage.getItem('storeSort') || 'a-z',
|
||||
@@ -1214,12 +1218,18 @@ function initializePlugins() {
|
||||
}
|
||||
|
||||
// Load both installed plugins and plugin store.
|
||||
// On HTMX re-swaps use cached store data (fetchCommitInfo=false) to avoid
|
||||
// re-hitting GitHub on every tab switch; only fetch fresh on first load.
|
||||
const isReswap = !!window.pluginManager._reswap;
|
||||
// On HTMX re-swaps with a still-warm cache, skip GitHub metadata to avoid
|
||||
// re-hitting the API on every tab switch. If the cache TTL has expired even
|
||||
// during a re-swap, fetch fresh data including GitHub commit/version info.
|
||||
const isReswapWarm = !!window.pluginManager._reswap && !storeCacheExpired();
|
||||
window.pluginManager._reswap = false;
|
||||
loadInstalledPlugins();
|
||||
searchPluginStore(!isReswap);
|
||||
// Await the installed-plugins fetch so window.installedPlugins is populated before
|
||||
// searchPluginStore renders Installed/Reinstall badges against it.
|
||||
loadInstalledPlugins().catch(err => {
|
||||
console.error('[PluginStore] loadInstalledPlugins failed:', err);
|
||||
}).finally(() => {
|
||||
searchPluginStore(!isReswapWarm);
|
||||
});
|
||||
|
||||
// Setup search functionality (with guard against duplicate listeners)
|
||||
const searchInput = document.getElementById('plugin-search');
|
||||
@@ -4843,9 +4853,9 @@ window.executePluginAction = function(actionId, actionIndex, pluginIdParam = nul
|
||||
showNotification(data.message || 'Action completed successfully!', 'success');
|
||||
}
|
||||
} else {
|
||||
statusDiv.innerHTML = `<div class="text-red-600"><i class="fas fa-exclamation-circle mr-2"></i>${data.message}</div>`;
|
||||
statusDiv.innerHTML = `<div class="text-red-600"><i class="fas fa-exclamation-circle mr-2"></i>${escapeHtml(data.message || 'Error')}</div>`;
|
||||
if (data.output) {
|
||||
statusDiv.innerHTML += `<pre class="mt-2 text-xs bg-red-50 p-2 rounded overflow-auto max-h-32">${data.output}</pre>`;
|
||||
statusDiv.innerHTML += `<pre class="mt-2 text-xs bg-red-50 p-2 rounded overflow-auto max-h-32">${escapeHtml(data.output)}</pre>`;
|
||||
}
|
||||
btn.innerHTML = originalText;
|
||||
btn.disabled = false;
|
||||
@@ -4889,8 +4899,8 @@ window.executePluginAction = function(actionId, actionIndex, pluginIdParam = nul
|
||||
</div>
|
||||
<div class="mb-3">
|
||||
<p class="text-sm text-blue-700 mb-2">1. Click the link below to authorize:</p>
|
||||
<a href="${data.auth_url}" target="_blank" class="text-blue-600 hover:text-blue-800 underline break-all">
|
||||
${data.auth_url}
|
||||
<a href="${data.auth_url && data.auth_url.startsWith('http') ? escapeHtml(data.auth_url) : '#'}" target="_blank" class="text-blue-600 hover:text-blue-800 underline break-all">
|
||||
${escapeHtml(data.auth_url || '')}
|
||||
</a>
|
||||
</div>
|
||||
<div class="mb-2">
|
||||
@@ -4912,7 +4922,7 @@ window.executePluginAction = function(actionId, actionIndex, pluginIdParam = nul
|
||||
<div class="text-green-900 font-medium mb-2">
|
||||
<i class="fas fa-check-circle mr-2"></i>${data.message || 'Action completed successfully'}
|
||||
</div>
|
||||
${data.output ? `<pre class="mt-2 text-xs bg-green-50 p-2 rounded overflow-auto max-h-32">${data.output}</pre>` : ''}
|
||||
${data.output ? `<pre class="mt-2 text-xs bg-green-50 p-2 rounded overflow-auto max-h-32">${escapeHtml(data.output)}</pre>` : ''}
|
||||
</div>
|
||||
`;
|
||||
btn.innerHTML = originalText;
|
||||
@@ -4925,9 +4935,9 @@ window.executePluginAction = function(actionId, actionIndex, pluginIdParam = nul
|
||||
statusDiv.innerHTML = `
|
||||
<div class="bg-red-50 border border-red-200 rounded p-3">
|
||||
<div class="text-red-900 font-medium mb-2">
|
||||
<i class="fas fa-exclamation-circle mr-2"></i>${data.message || 'Action failed'}
|
||||
<i class="fas fa-exclamation-circle mr-2"></i>${escapeHtml(data.message || 'Action failed')}
|
||||
</div>
|
||||
${data.output ? `<pre class="mt-2 text-xs bg-red-50 p-2 rounded overflow-auto max-h-32">${data.output}</pre>` : ''}
|
||||
${data.output ? `<pre class="mt-2 text-xs bg-red-50 p-2 rounded overflow-auto max-h-32">${escapeHtml(data.output)}</pre>` : ''}
|
||||
</div>
|
||||
`;
|
||||
btn.innerHTML = originalText;
|
||||
@@ -5133,10 +5143,13 @@ function refreshPlugins() {
|
||||
pluginStoreCache = null;
|
||||
cacheTimestamp = null;
|
||||
|
||||
refreshInstalledPlugins(); // invalidates cache before fetching
|
||||
// Fetch latest metadata from GitHub when refreshing
|
||||
// refreshInstalledPlugins() is async (returns a Promise via loadInstalledPlugins).
|
||||
// Only search the store and notify after window.installedPlugins is updated so
|
||||
// that Installed/Reinstall badges reflect the freshly fetched state.
|
||||
refreshInstalledPlugins().then(() => {
|
||||
searchPluginStore(true);
|
||||
showNotification('Plugins refreshed with latest metadata from GitHub', 'success');
|
||||
});
|
||||
}
|
||||
|
||||
function restartDisplay() {
|
||||
@@ -8054,4 +8067,3 @@ setTimeout(function() {
|
||||
}
|
||||
});
|
||||
})();
|
||||
|
||||
|
||||
@@ -191,7 +191,10 @@ function doConnect() {
|
||||
// Poll for the new IP
|
||||
setTimeout(function() { checkNewIP(ssid); }, 3000);
|
||||
} else {
|
||||
showMsg(data.message || 'Connection failed', 'err');
|
||||
var msg = data.error_type === 'wrong_password'
|
||||
? 'Incorrect password — please try again'
|
||||
: (data.message || 'Connection failed');
|
||||
showMsg(msg, 'err');
|
||||
connecting = false;
|
||||
btn.disabled = false;
|
||||
btn.innerHTML = 'Connect';
|
||||
|
||||
Reference in New Issue
Block a user