mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-05-14 09:33:32 +00:00
- wifi_monitor_daemon: reset _consecutive_internet_failures = 0 in both NM-restart exception handlers; previously both left the counter at threshold, causing an immediate retry on the next iteration instead of waiting another full backoff period - api_v3: fix start_display failure message — when mode is set and systemctl returns non-zero, message now includes the failure reason and a hint rather than always reporting success phrasing - wifi_manager: move _redirect_backend from class variable to instance variable in __init__ alongside _ap_enabled_at; class-level default shadowed correctly in practice (single instance) but was misleading - wifi_manager: narrow broad except Exception in _check_internet_connectivity to (subprocess.SubprocessError, OSError) for ping and OSError for HTTP (urllib.error.URLError is an OSError subclass in Python 3) - wifi_manager: remove redundant local 'import re as _re' in _validate_ap_config; re is already imported at module level (line 37) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2546 lines
114 KiB
Python
2546 lines
114 KiB
Python
"""
|
|
WiFi Manager for Raspberry Pi LED Matrix
|
|
|
|
Handles WiFi connection management, access point mode, and network scanning.
|
|
Only enables AP mode when there is no active WiFi connection.
|
|
|
|
Tested and optimized for:
|
|
- Raspberry Pi OS Trixie (Debian 13) with NetworkManager/Netplan
|
|
- Raspberry Pi OS Bookworm (Debian 12) with NetworkManager
|
|
- Raspberry Pi 3B+, 4, 5 with built-in WiFi
|
|
|
|
Sudoers Requirements:
|
|
The following sudoers entries are required for passwordless operation.
|
|
Add to /etc/sudoers.d/ledmatrix_wifi:
|
|
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/nmcli
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/systemctl start hostapd
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/systemctl stop hostapd
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/systemctl start dnsmasq
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/systemctl stop dnsmasq
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/systemctl restart NetworkManager
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/sbin/ip
|
|
ledpi ALL=(ALL) NOPASSWD: /sbin/ip
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/sbin/rfkill
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/sbin/iptables
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/sbin/sysctl
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/cp /tmp/hostapd.conf /etc/hostapd/hostapd.conf
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/cp /tmp/dnsmasq.conf /etc/dnsmasq.d/ledmatrix-captive.conf
|
|
ledpi ALL=(ALL) NOPASSWD: /usr/bin/rm -f /etc/dnsmasq.d/ledmatrix-captive.conf
|
|
"""
|
|
|
|
import subprocess
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
from dataclasses import dataclass, field
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Path for storing WiFi configuration (will be set dynamically)
|
|
# Default location, can be overridden
|
|
def get_wifi_config_path():
|
|
"""Get the WiFi configuration file path dynamically"""
|
|
# Try to determine project root
|
|
project_root = os.environ.get('LEDMATRIX_ROOT')
|
|
if not project_root:
|
|
# Try to find project root by looking for config directory
|
|
current = Path(__file__).resolve().parent.parent
|
|
if (current / 'config').exists():
|
|
project_root = str(current)
|
|
else:
|
|
# Fallback to common location
|
|
project_root = "/home/ledpi/LEDMatrix"
|
|
|
|
return Path(project_root) / "config" / "wifi_config.json"
|
|
|
|
HOSTAPD_CONFIG_PATH = Path("/etc/hostapd/hostapd.conf")
|
|
DNSMASQ_CONFIG_PATH = Path("/etc/dnsmasq.d/ledmatrix-captive.conf")
|
|
# Drop-in config for NetworkManager's built-in dnsmasq (ipv4.method=shared).
|
|
# Writing address=/#/<ap_ip> here causes NM to resolve every hostname to the AP,
|
|
# triggering the OS captive-portal popup automatically on iOS/Android/Windows/macOS.
|
|
NM_DNSMASQ_SHARED_DIR = Path("/etc/NetworkManager/dnsmasq-shared.d")
|
|
NM_DNSMASQ_SHARED_CONF = NM_DNSMASQ_SHARED_DIR / "ledmatrix-captive.conf"
|
|
HOSTAPD_SERVICE = "hostapd"
|
|
DNSMASQ_SERVICE = "dnsmasq"
|
|
|
|
# Default AP settings
|
|
DEFAULT_AP_SSID = "LEDMatrix-Setup"
|
|
DEFAULT_AP_CHANNEL = 7
|
|
|
|
# LED status message file (for display_controller integration)
|
|
LED_STATUS_FILE = None # Will be set dynamically
|
|
|
|
# NetworkManager connection file locations (Trixie uses /run, Bookworm uses /etc)
|
|
NM_CONNECTIONS_PATHS = [
|
|
Path("/etc/NetworkManager/system-connections"),
|
|
Path("/run/NetworkManager/system-connections"), # Trixie with Netplan
|
|
]
|
|
|
|
|
|
@dataclass
|
|
class WiFiNetwork:
|
|
"""Represents a WiFi network"""
|
|
ssid: str
|
|
signal: int
|
|
security: str # 'open', 'wpa', 'wpa2', 'wpa3'
|
|
frequency: float = 0.0
|
|
bssid: str = ""
|
|
|
|
|
|
@dataclass
|
|
class WiFiStatus:
|
|
"""Current WiFi connection status"""
|
|
connected: bool
|
|
ssid: Optional[str] = None
|
|
ip_address: Optional[str] = None
|
|
signal: int = 0
|
|
ap_mode_active: bool = False
|
|
|
|
|
|
class WiFiManager:
|
|
"""Manages WiFi connections and access point mode"""
|
|
|
|
def __init__(self, config_path: Optional[Path] = None):
|
|
"""
|
|
Initialize WiFi Manager
|
|
|
|
Args:
|
|
config_path: Path to WiFi configuration file (defaults to project config directory)
|
|
"""
|
|
if config_path is None:
|
|
self.config_path = get_wifi_config_path()
|
|
else:
|
|
self.config_path = config_path
|
|
self.config_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self._load_config()
|
|
|
|
# Set LED status file path (for display_controller integration)
|
|
global LED_STATUS_FILE
|
|
if LED_STATUS_FILE is None:
|
|
project_root = self.config_path.parent.parent
|
|
LED_STATUS_FILE = project_root / "config" / "wifi_status.json"
|
|
|
|
# Check which tools are available
|
|
self.has_nmcli = self._check_command("nmcli")
|
|
self.has_iwlist = self._check_command("iwlist")
|
|
self.has_hostapd = self._check_command("hostapd")
|
|
self.has_dnsmasq = self._check_command("dnsmasq")
|
|
|
|
# Discover WiFi interface (don't hardcode wlan0)
|
|
self._wifi_interface = self._discover_wifi_interface()
|
|
|
|
# Detect if we're running on Trixie (Netplan-based NetworkManager)
|
|
self._is_trixie = self._detect_trixie()
|
|
|
|
# Initialize disconnected check counter for grace period
|
|
# This prevents AP mode from enabling on transient network hiccups
|
|
self._disconnected_checks = 0
|
|
self._disconnected_checks_required = 3 # Require 3 consecutive disconnected checks (90 seconds at 30s interval)
|
|
|
|
# Timestamp set when AP mode is enabled; used for the idle-timeout check
|
|
self._ap_enabled_at: Optional[float] = None
|
|
# Which redirect backend was used (iptables/nftables/None); set per-instance
|
|
self._redirect_backend: Optional[str] = None
|
|
|
|
logger.info(f"WiFi Manager initialized - nmcli: {self.has_nmcli}, iwlist: {self.has_iwlist}, "
|
|
f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, "
|
|
f"interface: {self._wifi_interface}, trixie: {self._is_trixie}")
|
|
|
|
def _show_led_message(self, message: str, duration: int = 5):
|
|
"""
|
|
Show a WiFi status message on the LED display.
|
|
Writes to a JSON file that display_controller can read.
|
|
|
|
Args:
|
|
message: Text to display
|
|
duration: How long to show message (seconds)
|
|
"""
|
|
try:
|
|
if LED_STATUS_FILE is None:
|
|
return
|
|
|
|
status = {
|
|
'message': message,
|
|
'timestamp': time.time(),
|
|
'duration': duration
|
|
}
|
|
LED_STATUS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(LED_STATUS_FILE, 'w') as f:
|
|
json.dump(status, f)
|
|
logger.info(f"LED message: {message}")
|
|
except Exception as e:
|
|
logger.debug(f"Could not write LED status message: {e}")
|
|
|
|
def _clear_led_message(self):
|
|
"""Clear any WiFi status message from LED display."""
|
|
try:
|
|
if LED_STATUS_FILE and LED_STATUS_FILE.exists():
|
|
LED_STATUS_FILE.unlink()
|
|
except Exception as e:
|
|
logger.debug(f"Could not clear LED status message: {e}")
|
|
|
|
def _check_command(self, command: str) -> bool:
|
|
"""Check if a command is available"""
|
|
try:
|
|
# First try 'which' command
|
|
result = subprocess.run(
|
|
["which", command],
|
|
capture_output=True,
|
|
timeout=2
|
|
)
|
|
if result.returncode == 0:
|
|
return True
|
|
|
|
# Check common sbin paths (not in standard user PATH)
|
|
sbin_paths = [
|
|
f"/usr/sbin/{command}",
|
|
f"/sbin/{command}",
|
|
f"/usr/local/sbin/{command}"
|
|
]
|
|
for path in sbin_paths:
|
|
if os.path.isfile(path) and os.access(path, os.X_OK):
|
|
return True
|
|
|
|
return False
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
|
|
return False
|
|
|
|
def _find_command_path(self, command: str) -> Optional[str]:
|
|
"""
|
|
Return the absolute path of a command, checking sbin locations that may not
|
|
be on PATH in restricted service environments. Returns None if not found.
|
|
"""
|
|
try:
|
|
result = subprocess.run(["which", command], capture_output=True,
|
|
text=True, timeout=2)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
return result.stdout.strip()
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
|
|
pass
|
|
for path in [f"/usr/sbin/{command}", f"/sbin/{command}",
|
|
f"/usr/local/sbin/{command}"]:
|
|
if os.path.isfile(path) and os.access(path, os.X_OK):
|
|
return path
|
|
return None
|
|
|
|
def _discover_wifi_interface(self) -> str:
|
|
"""
|
|
Discover the primary WiFi interface name dynamically.
|
|
|
|
Returns the first WiFi interface found, or 'wlan0' as fallback.
|
|
Supports various interface naming schemes:
|
|
- Traditional: wlan0, wlan1
|
|
- Predictable: wlp2s0, wlx<mac>
|
|
- USB adapters: wlan1, wlx*
|
|
"""
|
|
try:
|
|
if self.has_nmcli:
|
|
# Use nmcli to find WiFi devices (most reliable on NetworkManager systems)
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "DEVICE,TYPE", "device", "status"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if ':' in line:
|
|
parts = line.split(':')
|
|
if len(parts) >= 2 and parts[1].strip() == 'wifi':
|
|
interface = parts[0].strip()
|
|
logger.debug(f"Discovered WiFi interface via nmcli: {interface}")
|
|
return interface
|
|
|
|
# Fallback: Check /sys/class/net for wireless interfaces
|
|
net_path = Path("/sys/class/net")
|
|
if net_path.exists():
|
|
for iface in net_path.iterdir():
|
|
wireless_path = iface / "wireless"
|
|
if wireless_path.exists():
|
|
interface = iface.name
|
|
logger.debug(f"Discovered WiFi interface via /sys: {interface}")
|
|
return interface
|
|
|
|
# Last resort: Check common interface names
|
|
for iface in ["wlan0", "wlan1", "wlp2s0", "wlp3s0"]:
|
|
iface_path = Path(f"/sys/class/net/{iface}")
|
|
if iface_path.exists():
|
|
logger.debug(f"Found WiFi interface by name probe: {iface}")
|
|
return iface
|
|
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError) as e:
|
|
logger.warning(f"Error discovering WiFi interface: {e}")
|
|
|
|
logger.warning("Could not discover WiFi interface, defaulting to wlan0")
|
|
return "wlan0"
|
|
|
|
def _detect_trixie(self) -> bool:
|
|
"""
|
|
Detect if running on Raspberry Pi OS Trixie (Debian 13).
|
|
|
|
Trixie uses Netplan with NetworkManager, which changes behavior:
|
|
- Connection files are stored in /run/NetworkManager/system-connections
|
|
- nmcli hotspot requires different handling
|
|
- PMF (Protected Management Frames) may need to be disabled
|
|
"""
|
|
try:
|
|
# Check for Netplan (primary indicator of Trixie)
|
|
netplan_path = Path("/etc/netplan")
|
|
if netplan_path.exists() and any(netplan_path.glob("*.yaml")):
|
|
logger.debug("Detected Trixie: Netplan configuration found")
|
|
return True
|
|
|
|
# Check Debian version
|
|
os_release = Path("/etc/os-release")
|
|
if os_release.exists():
|
|
content = os_release.read_text()
|
|
if 'VERSION_CODENAME=trixie' in content or 'VERSION_ID="13"' in content:
|
|
logger.debug("Detected Trixie: os-release indicates Debian 13")
|
|
return True
|
|
|
|
# Check if NM connections are in /run (Trixie behavior)
|
|
# NM_CONNECTIONS_PATHS[0] = /etc/..., NM_CONNECTIONS_PATHS[1] = /run/...
|
|
etc_nm_path = NM_CONNECTIONS_PATHS[0] # Bookworm location
|
|
run_nm_path = NM_CONNECTIONS_PATHS[1] # Trixie location
|
|
if run_nm_path.exists() and any(run_nm_path.glob("*.nmconnection")):
|
|
if not etc_nm_path.exists() or not any(etc_nm_path.glob("*.nmconnection")):
|
|
logger.debug("Detected Trixie: NM connections in /run only")
|
|
return True
|
|
|
|
except (OSError, PermissionError) as e:
|
|
logger.debug(f"Could not detect Trixie: {e}")
|
|
|
|
return False
|
|
|
|
def _load_config(self):
|
|
"""Load WiFi configuration from file"""
|
|
if self.config_path.exists():
|
|
try:
|
|
with open(self.config_path, 'r') as f:
|
|
self.config = json.load(f)
|
|
logger.info(f"Loaded WiFi config from {self.config_path}")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load WiFi config: {e}")
|
|
self.config = {}
|
|
else:
|
|
self.config = {
|
|
"ap_ssid": DEFAULT_AP_SSID,
|
|
"ap_channel": DEFAULT_AP_CHANNEL,
|
|
"auto_enable_ap_mode": True, # Default: auto-enable when no network (safe due to grace period)
|
|
"saved_networks": []
|
|
}
|
|
self._save_config()
|
|
|
|
# Ensure auto_enable_ap_mode exists in config (for existing configs)
|
|
if "auto_enable_ap_mode" not in self.config:
|
|
self.config["auto_enable_ap_mode"] = True # Default: auto-enable when no network (safe due to grace period)
|
|
self._save_config()
|
|
|
|
def _save_config(self):
|
|
"""Save WiFi configuration to file"""
|
|
try:
|
|
with open(self.config_path, 'w') as f:
|
|
json.dump(self.config, f, indent=2)
|
|
logger.info(f"Saved WiFi config to {self.config_path}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to save WiFi config: {e}")
|
|
|
|
def get_wifi_status(self) -> WiFiStatus:
|
|
"""
|
|
Get current WiFi connection status
|
|
|
|
Returns:
|
|
WiFiStatus object with connection information
|
|
"""
|
|
try:
|
|
if self.has_nmcli:
|
|
return self._get_status_nmcli()
|
|
else:
|
|
return self._get_status_iwconfig()
|
|
except Exception as e:
|
|
logger.error(f"Error getting WiFi status: {e}")
|
|
return WiFiStatus(connected=False)
|
|
|
|
def _get_status_nmcli(self) -> WiFiStatus:
|
|
"""Get WiFi status using nmcli"""
|
|
try:
|
|
# Check if connected - use device status first (more reliable)
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "DEVICE,TYPE,STATE", "device", "status"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
logger.warning("nmcli device status failed, assuming disconnected")
|
|
return WiFiStatus(connected=False)
|
|
|
|
wifi_connected = False
|
|
ssid = None
|
|
ip_address = None
|
|
signal = 0
|
|
wlan_device = None
|
|
|
|
# Find WiFi device and check its state
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split(':')
|
|
if len(parts) >= 3:
|
|
device = parts[0].strip()
|
|
dev_type = parts[1].strip().lower()
|
|
state = parts[2].strip().lower()
|
|
|
|
# Check if it's a WiFi device
|
|
if dev_type == "wifi" or device.startswith("wlan"):
|
|
wlan_device = device
|
|
if state == "connected":
|
|
wifi_connected = True
|
|
break
|
|
elif state in ["disconnected", "unavailable", "unmanaged"]:
|
|
# Explicitly disconnected
|
|
wifi_connected = False
|
|
break
|
|
|
|
# Get actual SSID and signal strength from WiFi device if connected
|
|
# Use device show to get the real SSID and signal, not the connection name
|
|
if wifi_connected and wlan_device:
|
|
# Get both SSID and signal in one query for efficiency
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "802-11-wireless.ssid,WIFI.SIGNAL", "device", "show", wlan_device],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if '802-11-wireless.ssid:' in line:
|
|
ssid = line.split(':', 1)[1].strip()
|
|
if ssid:
|
|
continue
|
|
elif 'WIFI.SIGNAL:' in line:
|
|
try:
|
|
signal = int(line.split(':', 1)[1].strip())
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
# Fallback: Get SSID from active WiFi connection list if not found
|
|
if not ssid:
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "active,ssid", "device", "wifi"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
parts = line.split(':')
|
|
if len(parts) >= 2 and parts[0].strip() == "yes":
|
|
ssid = parts[1].strip()
|
|
if ssid:
|
|
break
|
|
|
|
# Fallback: Get signal strength if not already retrieved
|
|
if signal == 0 and wlan_device:
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "WIFI.SIGNAL", "device", "show", wlan_device],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if 'WIFI.SIGNAL:' in line:
|
|
try:
|
|
signal = int(line.split(':', 1)[1].strip())
|
|
break
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
# Get IP address if connected
|
|
if wifi_connected and wlan_device:
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "IP4.ADDRESS", "device", "show", wlan_device],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if '/' in line:
|
|
ip_address = line.split('/')[0].strip()
|
|
break
|
|
|
|
# Final fallback: Get signal strength by matching SSID in WiFi list
|
|
# (Only if we still don't have signal from device properties)
|
|
if signal == 0 and ssid:
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "SSID,SIGNAL", "device", "wifi"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
parts = line.split(':')
|
|
if len(parts) >= 2:
|
|
line_ssid = parts[0].strip()
|
|
if line_ssid == ssid:
|
|
try:
|
|
signal = int(parts[1].strip())
|
|
break
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
# Check if AP mode is active
|
|
ap_active = self._is_ap_mode_active()
|
|
|
|
return WiFiStatus(
|
|
connected=wifi_connected,
|
|
ssid=ssid,
|
|
ip_address=ip_address,
|
|
signal=signal,
|
|
ap_mode_active=ap_active
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error getting status with nmcli: {e}")
|
|
return WiFiStatus(connected=False)
|
|
|
|
def _get_status_iwconfig(self) -> WiFiStatus:
|
|
"""Get WiFi status using iwconfig (fallback)"""
|
|
try:
|
|
result = subprocess.run(
|
|
["iwconfig", self._wifi_interface],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
return WiFiStatus(connected=False)
|
|
|
|
output = result.stdout
|
|
connected = "ESSID:" in output and "not-associated" not in output
|
|
|
|
ssid = None
|
|
if connected:
|
|
match = re.search(r'ESSID:"([^"]+)"', output)
|
|
if match:
|
|
ssid = match.group(1)
|
|
|
|
# Get IP address
|
|
ip_address = None
|
|
if connected:
|
|
result = subprocess.run(
|
|
["hostname", "-I"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
ips = result.stdout.strip().split()
|
|
for ip in ips:
|
|
if not ip.startswith('192.168.4.1'): # Exclude AP IP
|
|
ip_address = ip
|
|
break
|
|
|
|
ap_active = self._is_ap_mode_active()
|
|
|
|
return WiFiStatus(
|
|
connected=connected,
|
|
ssid=ssid,
|
|
ip_address=ip_address,
|
|
ap_mode_active=ap_active
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error getting status with iwconfig: {e}")
|
|
return WiFiStatus(connected=False)
|
|
|
|
def _is_ethernet_connected(self) -> bool:
|
|
"""
|
|
Check if Ethernet connection is active
|
|
|
|
Returns:
|
|
True if Ethernet is connected and has an IP address
|
|
"""
|
|
try:
|
|
# Check for Ethernet interfaces (eth0, enp*, etc.)
|
|
# First try nmcli if available
|
|
if self.has_nmcli:
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "DEVICE,TYPE,STATE", "device", "status"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
parts = line.split(':')
|
|
if len(parts) >= 3:
|
|
device = parts[0].strip()
|
|
dev_type = parts[1].strip().lower()
|
|
state = parts[2].strip().lower()
|
|
|
|
# Check if it's an Ethernet interface and connected
|
|
if dev_type == "ethernet" and state == "connected":
|
|
# Verify it has an IP address
|
|
ip_result = subprocess.run(
|
|
["nmcli", "-t", "-f", "IP4.ADDRESS", "device", "show", device],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if ip_result.returncode == 0 and ip_result.stdout.strip():
|
|
return True
|
|
|
|
# Fallback: Check using ip command
|
|
result = subprocess.run(
|
|
["ip", "addr", "show"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
# Look for Ethernet interfaces (eth0, enp*, etc.)
|
|
lines = result.stdout.split('\n')
|
|
in_ethernet = False
|
|
for line in lines:
|
|
# Check if line starts interface name (e.g., "2: eth0:")
|
|
if re.match(r'^\d+:\s+(eth\d+|enp\d+s\d+|enx[0-9a-f]+):', line):
|
|
in_ethernet = True
|
|
elif in_ethernet and 'inet ' in line and not '127.0.0.1' in line:
|
|
# Found an IP address on Ethernet interface
|
|
return True
|
|
elif re.match(r'^\d+:', line) and in_ethernet:
|
|
# Moved to next interface
|
|
in_ethernet = False
|
|
|
|
return False
|
|
except Exception as e:
|
|
logger.debug(f"Error checking Ethernet connection: {e}")
|
|
return False
|
|
|
|
def _has_connectivity_safety(self) -> bool:
|
|
"""
|
|
Check if there's a safe fallback connectivity option available.
|
|
|
|
Returns True if either:
|
|
- Ethernet is connected, OR
|
|
- WiFi radio is enabled (even if not connected to a network)
|
|
|
|
This helps prevent lockout scenarios where we might disable WiFi
|
|
without having Ethernet as backup.
|
|
|
|
Returns:
|
|
True if there's a safe connectivity option available
|
|
"""
|
|
try:
|
|
# Check if Ethernet is connected (safest fallback)
|
|
if self._is_ethernet_connected():
|
|
return True
|
|
|
|
# Check if WiFi radio is enabled (at least WiFi is available)
|
|
result = subprocess.run(
|
|
["nmcli", "radio", "wifi"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
status = result.stdout.strip().lower()
|
|
if status == "enabled":
|
|
return True
|
|
|
|
return False
|
|
except Exception as e:
|
|
logger.debug(f"Error checking connectivity safety: {e}")
|
|
# If we can't determine, assume unsafe to be conservative
|
|
return False
|
|
|
|
def _is_ap_mode_active(self) -> bool:
|
|
"""Check if access point mode is currently active"""
|
|
try:
|
|
# Check if hostapd is running (captive portal mode)
|
|
result = subprocess.run(
|
|
["systemctl", "is-active", HOSTAPD_SERVICE],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=2
|
|
)
|
|
if result.stdout.strip() == "active":
|
|
return True
|
|
|
|
# Check if nmcli hotspot is active (fallback mode)
|
|
hotspot_status = self._get_ap_status_nmcli()
|
|
if hotspot_status.get('active'):
|
|
return True
|
|
|
|
return False
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
|
|
return False
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved")
|
|
|
|
def _validate_ap_config(self) -> Tuple[str, int]:
|
|
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
|
|
ssid = str(self.config.get("ap_ssid", DEFAULT_AP_SSID))
|
|
if not ssid or len(ssid) > 32 or not re.match(r'^[\x20-\x7E]+$', ssid):
|
|
logger.warning(f"AP SSID '{ssid}' is invalid, falling back to default")
|
|
ssid = DEFAULT_AP_SSID
|
|
try:
|
|
channel = int(self.config.get("ap_channel", DEFAULT_AP_CHANNEL))
|
|
if channel < 1 or channel > 14:
|
|
raise ValueError
|
|
except (TypeError, ValueError):
|
|
logger.warning("AP channel out of range, falling back to default")
|
|
channel = DEFAULT_AP_CHANNEL
|
|
return ssid, channel
|
|
|
|
def _setup_iptables_redirect(self) -> bool:
|
|
"""
|
|
Add port 80 → 5000 redirect rules for the captive portal.
|
|
|
|
Tries iptables first, falls back to nftables (used by Debian Trixie).
|
|
When neither tool is available, logs a warning and returns True — the AP
|
|
still works and DNS spoofing still triggers the OS popup; users just land
|
|
on port 5000 directly rather than being redirected from port 80.
|
|
|
|
Only returns False when a tool was found but the rule addition itself failed.
|
|
"""
|
|
try:
|
|
iptables = self._find_command_path("iptables")
|
|
nft = self._find_command_path("nft")
|
|
|
|
if not iptables and not nft:
|
|
logger.warning(
|
|
"Neither iptables nor nft found; captive portal port-80 redirect unavailable. "
|
|
"DNS spoofing will still trigger the OS popup but HTTP on port 80 won't reach Flask."
|
|
)
|
|
self._redirect_backend = None
|
|
return True # AP works; redirect is best-effort
|
|
|
|
if iptables:
|
|
return self._setup_iptables_redirect_iptables(iptables)
|
|
else:
|
|
return self._setup_iptables_redirect_nftables(nft)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Could not set up port redirect: {e}")
|
|
try:
|
|
self._teardown_iptables_redirect()
|
|
except Exception as cleanup_e:
|
|
logger.warning(f"Cleanup after redirect exception also failed: {cleanup_e}")
|
|
return False
|
|
|
|
def _setup_iptables_redirect_iptables(self, iptables: str) -> bool:
|
|
"""Set up port 80→5000 redirect using iptables."""
|
|
# Save ip_forward state before enabling
|
|
try:
|
|
current_fwd = Path("/proc/sys/net/ipv4/ip_forward").read_text().strip()
|
|
except OSError:
|
|
current_fwd = None
|
|
if current_fwd is not None:
|
|
try:
|
|
self._IP_FORWARD_SAVE_PATH.write_text(current_fwd)
|
|
except OSError:
|
|
current_fwd = None
|
|
logger.warning("Could not write ip_forward save file; state will not be restored")
|
|
|
|
if current_fwd != "1":
|
|
sysctl = self._find_command_path("sysctl")
|
|
sysctl_bin = sysctl if sysctl else "sysctl"
|
|
r = subprocess.run(["sudo", sysctl_bin, "-w", "net.ipv4.ip_forward=1"],
|
|
capture_output=True, text=True, timeout=5)
|
|
if r.returncode != 0:
|
|
logger.error(f"Failed to enable ip_forward: {r.stderr.strip()}")
|
|
self._teardown_iptables_redirect()
|
|
return False
|
|
|
|
if subprocess.run(
|
|
["sudo", iptables, "-t", "nat", "-C", "PREROUTING",
|
|
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
|
"-j", "REDIRECT", "--to-port", "5000"],
|
|
capture_output=True, timeout=5
|
|
).returncode != 0:
|
|
r = subprocess.run(
|
|
["sudo", iptables, "-t", "nat", "-A", "PREROUTING",
|
|
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
|
"-j", "REDIRECT", "--to-port", "5000"],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
if r.returncode != 0:
|
|
logger.error(f"Failed to add PREROUTING rule: {r.stderr.strip()}")
|
|
self._teardown_iptables_redirect()
|
|
return False
|
|
|
|
if subprocess.run(
|
|
["sudo", iptables, "-C", "INPUT",
|
|
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000", "-j", "ACCEPT"],
|
|
capture_output=True, timeout=5
|
|
).returncode != 0:
|
|
r = subprocess.run(
|
|
["sudo", iptables, "-A", "INPUT",
|
|
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000", "-j", "ACCEPT"],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
if r.returncode != 0:
|
|
logger.error(f"Failed to add INPUT rule: {r.stderr.strip()}")
|
|
self._teardown_iptables_redirect()
|
|
return False
|
|
|
|
self._redirect_backend = "iptables"
|
|
logger.info("iptables: port 80→5000 redirect rules added")
|
|
return True
|
|
|
|
def _setup_iptables_redirect_nftables(self, nft: str) -> bool:
|
|
"""Set up port 80→5000 redirect using nftables (Debian Trixie / modern systems)."""
|
|
# NM's ipv4.method=shared already enables ip_forward; no sysctl needed.
|
|
cmds = [
|
|
["sudo", nft, "add", "table", "ip", "ledmatrix"],
|
|
["sudo", nft, "add", "chain", "ip", "ledmatrix", "prerouting",
|
|
"{", "type", "nat", "hook", "prerouting", "priority", "-100", ";", "}"],
|
|
["sudo", nft, "add", "rule", "ip", "ledmatrix", "prerouting",
|
|
"iif", self._wifi_interface, "tcp", "dport", "80", "redirect", "to", ":5000"],
|
|
]
|
|
for cmd in cmds:
|
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
|
|
if r.returncode != 0:
|
|
# Table/chain may already exist — only fail on rule add
|
|
if "add rule" in " ".join(cmd):
|
|
logger.error(f"Failed to add nftables redirect rule: {r.stderr.strip()}")
|
|
self._teardown_iptables_redirect()
|
|
return False
|
|
logger.debug(f"nft cmd non-zero (may already exist): {r.stderr.strip()}")
|
|
|
|
self._redirect_backend = "nftables"
|
|
logger.info("nftables: port 80→5000 redirect rule added")
|
|
return True
|
|
|
|
def _teardown_iptables_redirect(self) -> None:
|
|
"""Remove the port 80→5000 redirect rules and restore ip_forward if saved."""
|
|
try:
|
|
backend = self._redirect_backend
|
|
self._redirect_backend = None
|
|
|
|
if backend == "iptables":
|
|
iptables = self._find_command_path("iptables")
|
|
if iptables:
|
|
subprocess.run(
|
|
["sudo", iptables, "-t", "nat", "-D", "PREROUTING",
|
|
"-i", self._wifi_interface, "-p", "tcp", "--dport", "80",
|
|
"-j", "REDIRECT", "--to-port", "5000"],
|
|
capture_output=True, timeout=5
|
|
)
|
|
subprocess.run(
|
|
["sudo", iptables, "-D", "INPUT",
|
|
"-i", self._wifi_interface, "-p", "tcp", "--dport", "5000",
|
|
"-j", "ACCEPT"],
|
|
capture_output=True, timeout=5
|
|
)
|
|
# Restore ip_forward only when we saved it
|
|
if self._IP_FORWARD_SAVE_PATH.exists():
|
|
try:
|
|
saved = self._IP_FORWARD_SAVE_PATH.read_text().strip()
|
|
self._IP_FORWARD_SAVE_PATH.unlink(missing_ok=True)
|
|
sysctl = self._find_command_path("sysctl")
|
|
sysctl_bin = sysctl if sysctl else "sysctl"
|
|
subprocess.run(["sudo", sysctl_bin, "-w", f"net.ipv4.ip_forward={saved}"],
|
|
capture_output=True, timeout=5)
|
|
logger.info(f"ip_forward restored to {saved}")
|
|
except OSError as e:
|
|
logger.warning(f"Could not restore ip_forward: {e}")
|
|
else:
|
|
logger.debug("ip_forward not modified by setup; leaving unchanged")
|
|
|
|
elif backend == "nftables":
|
|
nft = self._find_command_path("nft")
|
|
if nft:
|
|
subprocess.run(
|
|
["sudo", nft, "delete", "table", "ip", "ledmatrix"],
|
|
capture_output=True, timeout=5
|
|
)
|
|
logger.info("nftables ledmatrix table removed")
|
|
|
|
else:
|
|
# No redirect was set up (neither tool available); nothing to tear down
|
|
self._IP_FORWARD_SAVE_PATH.unlink(missing_ok=True)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Could not tear down port redirect: {e}")
|
|
|
|
def _write_nm_dnsmasq_captive_conf(self, ap_ip: str = "192.168.4.1") -> None:
|
|
"""
|
|
Write the NM dnsmasq-shared.d drop-in that makes NM's built-in dnsmasq
|
|
resolve every hostname to the AP IP. This triggers the OS captive-portal
|
|
popup automatically on iOS / Android / Windows / macOS as soon as the
|
|
device connects — no manual navigation required.
|
|
|
|
NetworkManager reads /etc/NetworkManager/dnsmasq-shared.d/*.conf when it
|
|
starts the dnsmasq instance for ipv4.method=shared connections.
|
|
"""
|
|
try:
|
|
content = f"# LEDMatrix captive portal: resolve all hostnames to AP\naddress=/#/{ap_ip}\n"
|
|
with open("/tmp/ledmatrix-nm-dnsmasq.conf", "w") as f:
|
|
f.write(content)
|
|
subprocess.run(
|
|
["sudo", "mkdir", "-p", str(NM_DNSMASQ_SHARED_DIR)],
|
|
capture_output=True, timeout=5
|
|
)
|
|
subprocess.run(
|
|
["sudo", "cp", "/tmp/ledmatrix-nm-dnsmasq.conf", str(NM_DNSMASQ_SHARED_CONF)],
|
|
capture_output=True, timeout=5
|
|
)
|
|
logger.info(f"Wrote NM dnsmasq captive-portal config: {NM_DNSMASQ_SHARED_CONF}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not write NM dnsmasq captive config: {e}")
|
|
|
|
def _remove_nm_dnsmasq_captive_conf(self) -> None:
|
|
"""Remove the NM dnsmasq-shared.d drop-in written by _write_nm_dnsmasq_captive_conf."""
|
|
try:
|
|
subprocess.run(
|
|
["sudo", "rm", "-f", str(NM_DNSMASQ_SHARED_CONF)],
|
|
capture_output=True, timeout=5
|
|
)
|
|
logger.info("Removed NM dnsmasq captive-portal config")
|
|
except Exception as e:
|
|
logger.warning(f"Could not remove NM dnsmasq captive config: {e}")
|
|
|
|
def _check_internet_connectivity(self, timeout: int = 5) -> bool:
|
|
"""
|
|
Test actual internet reachability — not just nmcli association state.
|
|
|
|
A device can be 'connected' in nmcli (associated with an AP) while the
|
|
router has no WAN link. This check catches that case so the daemon can
|
|
auto-enable AP mode even when nmcli reports a connection.
|
|
|
|
Returns True if at least one reachability method succeeds.
|
|
"""
|
|
try:
|
|
r = subprocess.run(
|
|
["ping", "-c", "1", "-W", str(timeout), "8.8.8.8"],
|
|
capture_output=True, timeout=timeout + 1
|
|
)
|
|
if r.returncode == 0:
|
|
logger.debug("Internet connectivity confirmed via ping 8.8.8.8")
|
|
return True
|
|
except (subprocess.SubprocessError, OSError):
|
|
pass
|
|
try:
|
|
import urllib.request as _ureq
|
|
_ureq.urlopen("http://connectivity-check.ubuntu.com/", timeout=timeout)
|
|
logger.debug("Internet connectivity confirmed via HTTP check")
|
|
return True
|
|
except OSError:
|
|
pass
|
|
logger.debug("Internet connectivity check failed (both ping and HTTP)")
|
|
return False
|
|
|
|
def check_internet_connectivity(self, timeout: int = 5) -> bool:
|
|
"""Public wrapper around _check_internet_connectivity for use by the daemon."""
|
|
return self._check_internet_connectivity(timeout=timeout)
|
|
|
|
def _has_ap_clients(self) -> bool:
|
|
"""
|
|
Return True if at least one client is associated with the AP.
|
|
Uses 'iw dev <iface> station dump' which works for both hostapd and
|
|
nmcli AP modes.
|
|
"""
|
|
try:
|
|
result = subprocess.run(
|
|
["iw", "dev", self._wifi_interface, "station", "dump"],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
return bool(result.stdout.strip())
|
|
except Exception:
|
|
return False
|
|
|
|
def scan_networks(self, allow_cached: bool = True) -> Tuple[List[WiFiNetwork], bool]:
|
|
"""
|
|
Scan for available WiFi networks.
|
|
|
|
When AP mode is active, returns cached scan results instead of
|
|
disabling AP (which would disconnect the user). Cached results
|
|
come from either nmcli's internal cache or a pre-scan file saved
|
|
before AP mode was enabled.
|
|
|
|
Returns:
|
|
Tuple of (list of WiFiNetwork objects, was_cached bool)
|
|
"""
|
|
try:
|
|
ap_active = self._is_ap_mode_active()
|
|
|
|
if ap_active:
|
|
# Don't disable AP — user would lose their connection.
|
|
# Try nmcli cached results first (no rescan trigger).
|
|
logger.info("AP mode active — returning cached scan results")
|
|
networks = self._scan_nmcli_cached()
|
|
if not networks and allow_cached:
|
|
networks = self._load_cached_scan()
|
|
return networks, True
|
|
|
|
# Normal scan (not in AP mode)
|
|
if self.has_nmcli:
|
|
networks = self._scan_nmcli()
|
|
elif self.has_iwlist:
|
|
networks = self._scan_iwlist()
|
|
else:
|
|
logger.error("No WiFi scanning tools available")
|
|
networks = []
|
|
|
|
# Save results for later use in AP mode
|
|
if networks:
|
|
self._save_cached_scan(networks)
|
|
|
|
return networks, False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error scanning networks: {e}")
|
|
return [], False
|
|
|
|
def _scan_nmcli_cached(self) -> List[WiFiNetwork]:
|
|
"""Return nmcli's cached WiFi list without triggering a rescan."""
|
|
networks = []
|
|
try:
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "SSID,SIGNAL,SECURITY,FREQ", "device", "wifi", "list"],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
if result.returncode != 0:
|
|
return []
|
|
|
|
seen_ssids = set()
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line or ':' not in line:
|
|
continue
|
|
parts = line.split(':')
|
|
if len(parts) >= 3:
|
|
ssid = parts[0].strip()
|
|
if not ssid or ssid in seen_ssids:
|
|
continue
|
|
seen_ssids.add(ssid)
|
|
try:
|
|
signal = int(parts[1].strip())
|
|
security = parts[2].strip() if len(parts) > 2 else "open"
|
|
frequency_str = parts[3].strip() if len(parts) > 3 else "0"
|
|
frequency_str = frequency_str.replace(" MHz", "").replace("MHz", "").strip()
|
|
frequency = float(frequency_str) if frequency_str else 0.0
|
|
if "WPA3" in security:
|
|
sec_type = "wpa3"
|
|
elif "WPA2" in security:
|
|
sec_type = "wpa2"
|
|
elif "WPA" in security:
|
|
sec_type = "wpa"
|
|
else:
|
|
sec_type = "open"
|
|
networks.append(WiFiNetwork(ssid=ssid, signal=signal, security=sec_type, frequency=frequency))
|
|
except (ValueError, IndexError):
|
|
continue
|
|
networks.sort(key=lambda x: x.signal, reverse=True)
|
|
except Exception as e:
|
|
logger.debug(f"nmcli cached list failed: {e}")
|
|
return networks
|
|
|
|
def _save_cached_scan(self, networks: List[WiFiNetwork]) -> None:
|
|
"""Save scan results to a cache file for use during AP mode."""
|
|
try:
|
|
cache_path = get_wifi_config_path().parent / "cached_networks.json"
|
|
data = [{"ssid": n.ssid, "signal": n.signal, "security": n.security, "frequency": n.frequency} for n in networks]
|
|
with open(cache_path, 'w') as f:
|
|
json.dump({"timestamp": time.time(), "networks": data}, f)
|
|
except Exception as e:
|
|
logger.debug(f"Failed to save cached scan: {e}")
|
|
|
|
def _load_cached_scan(self) -> List[WiFiNetwork]:
|
|
"""Load pre-cached scan results (saved before AP mode was enabled)."""
|
|
try:
|
|
cache_path = get_wifi_config_path().parent / "cached_networks.json"
|
|
if not cache_path.exists():
|
|
return []
|
|
with open(cache_path) as f:
|
|
data = json.load(f)
|
|
# Accept cache up to 10 minutes old
|
|
if time.time() - data.get("timestamp", 0) > 600:
|
|
return []
|
|
return [WiFiNetwork(ssid=n["ssid"], signal=n["signal"], security=n["security"], frequency=n.get("frequency", 0.0))
|
|
for n in data.get("networks", [])]
|
|
except Exception as e:
|
|
logger.debug(f"Failed to load cached scan: {e}")
|
|
return []
|
|
|
|
def _scan_nmcli(self) -> List[WiFiNetwork]:
|
|
"""Scan networks using nmcli"""
|
|
networks = []
|
|
try:
|
|
# Trigger scan
|
|
subprocess.run(
|
|
["nmcli", "device", "wifi", "rescan"],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
time.sleep(2) # Wait for scan to complete
|
|
|
|
# Get scan results
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "SSID,SIGNAL,SECURITY,FREQ", "device", "wifi", "list"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
return []
|
|
|
|
seen_ssids = set()
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line or ':' not in line:
|
|
continue
|
|
|
|
parts = line.split(':')
|
|
if len(parts) >= 3:
|
|
ssid = parts[0].strip()
|
|
if not ssid or ssid in seen_ssids:
|
|
continue
|
|
|
|
seen_ssids.add(ssid)
|
|
|
|
try:
|
|
signal = int(parts[1].strip())
|
|
security = parts[2].strip() if len(parts) > 2 else "open"
|
|
|
|
# Parse frequency - strip " MHz" if present
|
|
frequency_str = parts[3].strip() if len(parts) > 3 else "0"
|
|
frequency_str = frequency_str.replace(" MHz", "").replace("MHz", "").strip()
|
|
frequency = float(frequency_str) if frequency_str else 0.0
|
|
|
|
# Normalize security type
|
|
if "WPA3" in security:
|
|
sec_type = "wpa3"
|
|
elif "WPA2" in security:
|
|
sec_type = "wpa2"
|
|
elif "WPA" in security:
|
|
sec_type = "wpa"
|
|
else:
|
|
sec_type = "open"
|
|
|
|
networks.append(WiFiNetwork(
|
|
ssid=ssid,
|
|
signal=signal,
|
|
security=sec_type,
|
|
frequency=frequency
|
|
))
|
|
except (ValueError, IndexError) as e:
|
|
logger.debug(f"Skipping network line due to parsing error: {line[:50]}... Error: {e}")
|
|
continue
|
|
|
|
# Sort by signal strength
|
|
networks.sort(key=lambda x: x.signal, reverse=True)
|
|
return networks
|
|
except Exception as e:
|
|
logger.error(f"Error scanning with nmcli: {e}")
|
|
return []
|
|
|
|
def _scan_iwlist(self) -> List[WiFiNetwork]:
|
|
"""Scan networks using iwlist (fallback)"""
|
|
networks = []
|
|
try:
|
|
result = subprocess.run(
|
|
["iwlist", self._wifi_interface, "scan"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
return []
|
|
|
|
output = result.stdout
|
|
seen_ssids = set()
|
|
current_ssid = None
|
|
current_signal = 0
|
|
current_security = "open"
|
|
|
|
for line in output.split('\n'):
|
|
line = line.strip()
|
|
|
|
# Extract SSID
|
|
if 'ESSID:' in line:
|
|
match = re.search(r'ESSID:"([^"]+)"', line)
|
|
if match:
|
|
if current_ssid and current_ssid not in seen_ssids:
|
|
networks.append(WiFiNetwork(
|
|
ssid=current_ssid,
|
|
signal=current_signal,
|
|
security=current_security
|
|
))
|
|
seen_ssids.add(current_ssid)
|
|
current_ssid = match.group(1)
|
|
current_signal = 0
|
|
current_security = "open"
|
|
|
|
# Extract signal strength
|
|
elif 'Signal level=' in line:
|
|
match = re.search(r'Signal level=(-?\d+)', line)
|
|
if match:
|
|
# Convert to percentage (approximate)
|
|
dbm = int(match.group(1))
|
|
current_signal = max(0, min(100, (dbm + 100) * 2))
|
|
|
|
# Extract security
|
|
elif 'Encryption key:' in line:
|
|
if 'on' in line.lower():
|
|
current_security = "wpa" # Default, will check for WPA2/WPA3
|
|
elif 'WPA2' in line:
|
|
current_security = "wpa2"
|
|
elif 'WPA3' in line:
|
|
current_security = "wpa3"
|
|
|
|
# Add last network
|
|
if current_ssid and current_ssid not in seen_ssids:
|
|
networks.append(WiFiNetwork(
|
|
ssid=current_ssid,
|
|
signal=current_signal,
|
|
security=current_security
|
|
))
|
|
|
|
# Sort by signal strength
|
|
networks.sort(key=lambda x: x.signal, reverse=True)
|
|
return networks
|
|
except Exception as e:
|
|
logger.error(f"Error scanning with iwlist: {e}")
|
|
return []
|
|
|
|
def connect_to_network(self, ssid: str, password: str) -> Tuple[bool, str]:
|
|
"""
|
|
Connect to a WiFi network with failsafe to restore original connection on failure.
|
|
|
|
Args:
|
|
ssid: Network SSID
|
|
password: Network password (empty for open networks)
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
# Save current connection info for failsafe restoration
|
|
original_connection = None
|
|
original_ssid = None
|
|
try:
|
|
status = self.get_wifi_status()
|
|
if status.connected and status.ssid:
|
|
original_ssid = status.ssid
|
|
# Get the active connection name/UUID for WiFi interface
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "GENERAL.CONNECTION", "device", "show", self._wifi_interface],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if 'GENERAL.CONNECTION:' in line:
|
|
connection_name = line.split(':', 1)[1].strip()
|
|
if connection_name and connection_name != '--':
|
|
original_connection = connection_name
|
|
break
|
|
|
|
# Fallback: try to find connection by SSID
|
|
if not original_connection:
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "NAME", "connection", "show"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if original_ssid.lower() in line.lower():
|
|
original_connection = line.strip()
|
|
break
|
|
|
|
logger.info(f"Saving original connection for failsafe: {original_ssid} ({original_connection})")
|
|
except Exception as e:
|
|
logger.debug(f"Could not save original connection info: {e}")
|
|
|
|
try:
|
|
# Check if already connected to the target network
|
|
if original_ssid and original_ssid == ssid:
|
|
logger.info(f"Already connected to {ssid}, verifying connection...")
|
|
status = self.get_wifi_status()
|
|
if status.connected and status.ssid == ssid:
|
|
logger.info(f"Already connected to {ssid} with IP {status.ip_address}")
|
|
return True, f"Already connected to {ssid}"
|
|
else:
|
|
logger.warning(f"Status shows not connected to {ssid}, attempting reconnection...")
|
|
|
|
# First, disable AP mode if active
|
|
# This is critical - if AP mode is active, we must disable it before connecting
|
|
if self._is_ap_mode_active():
|
|
logger.info("AP mode is active, disabling before connecting to WiFi network...")
|
|
disable_success, disable_msg = self.disable_ap_mode()
|
|
if not disable_success:
|
|
error_msg = f"Failed to disable AP mode: {disable_msg}. Cannot connect to WiFi while AP mode is active."
|
|
logger.error(error_msg)
|
|
return False, error_msg
|
|
|
|
# Wait for NetworkManager to restart and stabilize (if it was restarted)
|
|
# NetworkManager restart can take 3-5 seconds, so wait a bit longer
|
|
logger.info("Waiting for NetworkManager to stabilize after AP mode disable...")
|
|
time.sleep(5)
|
|
|
|
# Verify AP mode is actually disabled
|
|
max_verify_attempts = 5
|
|
for attempt in range(max_verify_attempts):
|
|
if not self._is_ap_mode_active():
|
|
logger.info("AP mode successfully disabled, proceeding with connection")
|
|
break
|
|
if attempt < max_verify_attempts - 1:
|
|
logger.debug(f"AP mode still active, waiting... (attempt {attempt + 1}/{max_verify_attempts})")
|
|
time.sleep(2)
|
|
else:
|
|
error_msg = "AP mode disable reported success but AP mode is still active. Cannot connect to WiFi."
|
|
logger.error(error_msg)
|
|
return False, error_msg
|
|
|
|
# If we're currently connected to a different network, disconnect first
|
|
# This ensures a clean switch between networks
|
|
if original_ssid and original_ssid != ssid:
|
|
logger.info(f"Switching networks: disconnecting from {original_ssid} before connecting to {ssid}")
|
|
self._show_led_message(f"Switching networks...", duration=3)
|
|
# Skip AP mode check since we're about to connect to a new network
|
|
disconnect_success, disconnect_msg = self.disconnect_from_network(skip_ap_check=True)
|
|
if disconnect_success:
|
|
logger.info(f"Disconnected from {original_ssid}: {disconnect_msg}")
|
|
# Wait for device to be ready for new connection
|
|
# Check device state before proceeding
|
|
max_wait = 5
|
|
wait_count = 0
|
|
while wait_count < max_wait:
|
|
time.sleep(1)
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "STATE", "device", "status", self._wifi_interface],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
state = result.stdout.strip().split(':')[-1] if ':' in result.stdout else result.stdout.strip()
|
|
# Device is ready if it's disconnected or unavailable (not connecting/connected)
|
|
if state in ["disconnected", "unavailable", "unmanaged"]:
|
|
logger.info(f"Device ready for new connection (state: {state})")
|
|
break
|
|
wait_count += 1
|
|
|
|
if wait_count >= max_wait:
|
|
logger.warning("Device may not be ready, but proceeding with connection attempt")
|
|
else:
|
|
logger.warning(f"Failed to disconnect from {original_ssid}: {disconnect_msg}")
|
|
# Continue anyway - NetworkManager might handle it, but wait a bit
|
|
time.sleep(2)
|
|
|
|
# Ensure WiFi radio is enabled before attempting connection (safety measure)
|
|
if not self._ensure_wifi_radio_enabled():
|
|
logger.warning("WiFi radio enable check failed, but continuing with connection attempt")
|
|
|
|
if self.has_nmcli:
|
|
success, message = self._connect_nmcli(ssid, password)
|
|
|
|
# If connection failed, try to restore original connection
|
|
if not success and original_connection and original_ssid:
|
|
logger.warning(f"Connection to {ssid} failed, attempting to restore original connection: {original_ssid}")
|
|
self._show_led_message(f"Restoring {original_ssid}...", duration=5)
|
|
|
|
restore_success = self._restore_original_connection(original_connection, original_ssid)
|
|
if restore_success:
|
|
logger.info(f"Successfully restored original connection: {original_ssid}")
|
|
self._show_led_message("Restored!", duration=3)
|
|
return False, f"Failed to connect to {ssid}, restored {original_ssid}"
|
|
else:
|
|
logger.error(f"Failed to restore original connection: {original_ssid}")
|
|
# Trigger AP mode as last resort
|
|
self._show_led_message("Enabling AP mode...", duration=5)
|
|
ap_success, ap_msg = self.enable_ap_mode()
|
|
if ap_success:
|
|
logger.info("AP mode enabled as failsafe")
|
|
return False, f"Connection failed and restoration failed. AP mode enabled."
|
|
else:
|
|
logger.error(f"Failed to enable AP mode: {ap_msg}")
|
|
return False, f"Connection failed, restoration failed, and AP mode failed: {ap_msg}"
|
|
|
|
# If connection failed and no original connection to restore, enable AP mode
|
|
elif not success:
|
|
logger.warning(f"Connection to {ssid} failed and no original connection to restore")
|
|
self._show_led_message("Enabling AP mode...", duration=5)
|
|
ap_success, ap_msg = self.enable_ap_mode()
|
|
if ap_success:
|
|
logger.info("AP mode enabled as failsafe")
|
|
return False, f"Connection failed. AP mode enabled."
|
|
else:
|
|
return False, f"Connection failed and AP mode failed: {ap_msg}"
|
|
|
|
return success, message
|
|
else:
|
|
return self._connect_wpa_supplicant(ssid, password)
|
|
except Exception as e:
|
|
logger.error(f"Error connecting to network: {e}")
|
|
# Try to restore original connection on exception
|
|
if original_connection and original_ssid:
|
|
try:
|
|
logger.warning(f"Exception during connection, attempting to restore: {original_ssid}")
|
|
self._restore_original_connection(original_connection, original_ssid)
|
|
except Exception as restore_error:
|
|
logger.error(f"Failed to restore after exception: {restore_error}")
|
|
# Last resort: enable AP mode
|
|
try:
|
|
self.enable_ap_mode()
|
|
except Exception:
|
|
pass
|
|
return False, str(e)
|
|
|
|
def _restore_original_connection(self, connection_name: str, ssid: str) -> bool:
|
|
"""
|
|
Restore a previously active WiFi connection.
|
|
|
|
Args:
|
|
connection_name: NetworkManager connection name or UUID
|
|
ssid: SSID for verification
|
|
|
|
Returns:
|
|
True if restoration successful, False otherwise
|
|
"""
|
|
try:
|
|
logger.info(f"Attempting to restore connection: {connection_name} ({ssid})")
|
|
|
|
# Try to activate the connection
|
|
result = subprocess.run(
|
|
["nmcli", "connection", "up", connection_name],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# Wait for connection to stabilize
|
|
time.sleep(3)
|
|
|
|
# Verify connection
|
|
status = self.get_wifi_status()
|
|
if status.connected:
|
|
# Double-check SSID matches (if we can get it)
|
|
if status.ssid:
|
|
if status.ssid == ssid:
|
|
logger.info(f"Successfully restored connection to {ssid}")
|
|
return True
|
|
else:
|
|
logger.warning(f"Restored connection but SSID mismatch: expected {ssid}, got {status.ssid}")
|
|
# Still consider it success if we're connected
|
|
return True
|
|
else:
|
|
# Connected but can't verify SSID - assume success
|
|
logger.info("Restored connection (SSID verification unavailable)")
|
|
return True
|
|
else:
|
|
logger.warning("Connection activation succeeded but not connected")
|
|
return False
|
|
else:
|
|
error_msg = result.stderr.strip() or result.stdout.strip()
|
|
logger.error(f"Failed to restore connection {connection_name}: {error_msg}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error restoring connection: {e}")
|
|
return False
|
|
|
|
def _connect_nmcli(self, ssid: str, password: str) -> Tuple[bool, str]:
|
|
"""Connect using nmcli"""
|
|
try:
|
|
# Show LED message
|
|
self._show_led_message(f"Connecting to {ssid}...", duration=10)
|
|
|
|
# First, check if connection already exists and try to activate it
|
|
# NetworkManager connection names might not match SSID exactly, so search by SSID
|
|
check_result = subprocess.run(
|
|
["nmcli", "-t", "-f", "NAME,802-11-wireless.ssid", "connection", "show"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
existing_conn_name = None
|
|
if check_result.returncode == 0:
|
|
for line in check_result.stdout.strip().split('\n'):
|
|
if ':' in line:
|
|
parts = line.split(':')
|
|
if len(parts) >= 2:
|
|
conn_name = parts[0].strip()
|
|
conn_ssid = parts[1].strip() if len(parts) > 1 else ""
|
|
if conn_ssid == ssid:
|
|
existing_conn_name = conn_name
|
|
break
|
|
|
|
# Also try direct lookup by SSID (in case connection name matches SSID)
|
|
if not existing_conn_name:
|
|
direct_check = subprocess.run(
|
|
["nmcli", "connection", "show", ssid],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if direct_check.returncode == 0:
|
|
existing_conn_name = ssid
|
|
|
|
if existing_conn_name:
|
|
# Connection exists, try to activate it first (faster and more reliable)
|
|
logger.info(f"Found existing connection for {ssid}, activating...")
|
|
|
|
# Ensure device is ready before activating
|
|
# Wait for device to be in disconnected/unavailable state
|
|
max_wait = 3
|
|
for wait_attempt in range(max_wait):
|
|
device_result = subprocess.run(
|
|
["nmcli", "-t", "-f", "STATE", "device", "status", self._wifi_interface],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if device_result.returncode == 0:
|
|
state = device_result.stdout.strip().split(':')[-1] if ':' in device_result.stdout else device_result.stdout.strip()
|
|
if state in ["disconnected", "unavailable", "unmanaged"]:
|
|
break
|
|
if wait_attempt < max_wait - 1:
|
|
time.sleep(1)
|
|
|
|
result = subprocess.run(
|
|
["nmcli", "connection", "up", existing_conn_name],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# Wait longer for connection to stabilize and verify multiple times
|
|
max_verification_attempts = 5
|
|
verification_delay = 2
|
|
connected = False
|
|
|
|
for attempt in range(max_verification_attempts):
|
|
time.sleep(verification_delay)
|
|
status = self.get_wifi_status()
|
|
if status.connected and status.ssid == ssid:
|
|
connected = True
|
|
break
|
|
|
|
if connected:
|
|
# Save network to config
|
|
self._save_network(ssid, password)
|
|
|
|
ip = status.ip_address or "Unknown"
|
|
self._show_led_message(f"Connected! {ip}", duration=5)
|
|
logger.info(f"Successfully connected to {ssid} with IP {ip}")
|
|
return True, f"Connected to {ssid}"
|
|
else:
|
|
logger.warning(f"Connection activation succeeded but verification failed for {ssid}")
|
|
self._show_led_message("Verification failed", duration=5)
|
|
return False, "Connection activated but verification failed"
|
|
|
|
# No existing connection or activation failed, create new connection
|
|
logger.info(f"Creating new connection for {ssid}...")
|
|
self._save_network(ssid, password)
|
|
|
|
# Connect using nmcli
|
|
if password:
|
|
cmd = ["nmcli", "device", "wifi", "connect", ssid, "password", password]
|
|
else:
|
|
cmd = ["nmcli", "device", "wifi", "connect", ssid]
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# Wait longer for connection to stabilize and verify multiple times
|
|
max_verification_attempts = 5
|
|
verification_delay = 2
|
|
connected = False
|
|
|
|
for attempt in range(max_verification_attempts):
|
|
time.sleep(verification_delay)
|
|
status = self.get_wifi_status()
|
|
if status.connected:
|
|
# Verify we're connected to the correct SSID
|
|
if status.ssid == ssid:
|
|
connected = True
|
|
break
|
|
elif status.ssid:
|
|
# Connected to different network - this is a failure
|
|
logger.warning(f"Connected to wrong network: {status.ssid} instead of {ssid}")
|
|
break
|
|
|
|
if connected:
|
|
ip = status.ip_address or "Unknown"
|
|
self._show_led_message(f"Connected! {ip}", duration=5)
|
|
logger.info(f"Successfully connected to {ssid} with IP {ip}")
|
|
return True, f"Connected to {ssid}"
|
|
else:
|
|
self._show_led_message("Connection failed", duration=5)
|
|
return False, "Connection command succeeded but verification failed"
|
|
else:
|
|
error_msg = result.stderr.strip() or result.stdout.strip()
|
|
logger.error(f"Failed to connect to {ssid}: {error_msg}")
|
|
self._show_led_message("Connection failed", duration=5)
|
|
if self._is_wrong_password_error(error_msg):
|
|
return False, f"wrong_password: {error_msg}"
|
|
return False, error_msg
|
|
except Exception as e:
|
|
logger.error(f"Error connecting with nmcli: {e}")
|
|
self._show_led_message("Connection error", duration=5)
|
|
return False, str(e)
|
|
|
|
@staticmethod
|
|
def _is_wrong_password_error(error_msg: str) -> bool:
|
|
"""Return True when nmcli's error output indicates an authentication failure."""
|
|
indicators = [
|
|
"secrets were required",
|
|
"no secret agent",
|
|
"802-11-wireless-security.psk",
|
|
"authentication rejected",
|
|
"association rejected",
|
|
]
|
|
lower = error_msg.lower()
|
|
return any(ind in lower for ind in indicators)
|
|
|
|
def _connect_wpa_supplicant(self, ssid: str, password: str) -> Tuple[bool, str]:
|
|
"""Connect using wpa_supplicant (fallback)"""
|
|
try:
|
|
self._save_network(ssid, password)
|
|
|
|
# This would require modifying /etc/wpa_supplicant/wpa_supplicant.conf
|
|
# For now, return not implemented
|
|
return False, "wpa_supplicant connection not yet implemented. Please use NetworkManager (nmcli)."
|
|
except Exception as e:
|
|
logger.error(f"Error connecting with wpa_supplicant: {e}")
|
|
return False, str(e)
|
|
|
|
def disconnect_from_network(self, skip_ap_check: bool = False) -> Tuple[bool, str]:
|
|
"""
|
|
Disconnect from the current WiFi network
|
|
|
|
Args:
|
|
skip_ap_check: If True, skip auto-enabling AP mode after disconnect
|
|
(useful when switching networks)
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
try:
|
|
# Check if WiFi is connected
|
|
status = self.get_wifi_status()
|
|
if not status.connected:
|
|
return True, "Not connected to any WiFi network"
|
|
|
|
# Disconnect using nmcli
|
|
if self.has_nmcli:
|
|
# Try to disconnect the specific connection first (more reliable)
|
|
if status.ssid:
|
|
# Find the connection name for this SSID
|
|
conn_result = subprocess.run(
|
|
["nmcli", "-t", "-f", "NAME,802-11-wireless.ssid", "connection", "show"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if conn_result.returncode == 0:
|
|
for line in conn_result.stdout.strip().split('\n'):
|
|
if ':' in line:
|
|
parts = line.split(':')
|
|
if len(parts) >= 2:
|
|
conn_name = parts[0].strip()
|
|
conn_ssid = parts[1].strip() if len(parts) > 1 else ""
|
|
if conn_ssid == status.ssid:
|
|
# Disconnect this specific connection
|
|
subprocess.run(
|
|
["nmcli", "connection", "down", conn_name],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
logger.info(f"Disconnected connection {conn_name} for {status.ssid}")
|
|
break
|
|
|
|
# Also disconnect the device to ensure clean state
|
|
result = subprocess.run(
|
|
["nmcli", "device", "disconnect", self._wifi_interface],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info("Successfully disconnected from WiFi network")
|
|
# Wait longer for the disconnect to fully complete
|
|
time.sleep(2)
|
|
|
|
# Check if AP mode should be auto-enabled
|
|
# Skip if we're switching networks (skip_ap_check=True)
|
|
if not skip_ap_check:
|
|
auto_enable = self.config.get("auto_enable_ap_mode", True)
|
|
if auto_enable:
|
|
# Give it a moment, then check if we should enable AP mode
|
|
time.sleep(1)
|
|
self.check_and_manage_ap_mode()
|
|
else:
|
|
logger.debug("Skipping AP mode check (network switch in progress)")
|
|
|
|
return True, "Disconnected from WiFi network"
|
|
else:
|
|
error_msg = result.stderr.strip() or result.stdout.strip()
|
|
logger.error(f"Failed to disconnect from WiFi: {error_msg}")
|
|
return False, f"Failed to disconnect: {error_msg}"
|
|
else:
|
|
return False, "nmcli is required to disconnect from WiFi"
|
|
except Exception as e:
|
|
logger.error(f"Error disconnecting from WiFi: {e}")
|
|
return False, str(e)
|
|
|
|
def _save_network(self, ssid: str, password: str):
|
|
"""Save network credentials to config"""
|
|
# Remove existing entry for this SSID
|
|
self.config["saved_networks"] = [
|
|
n for n in self.config["saved_networks"]
|
|
if n.get("ssid") != ssid
|
|
]
|
|
|
|
# Add new entry
|
|
self.config["saved_networks"].append({
|
|
"ssid": ssid,
|
|
"password": password,
|
|
"saved_at": time.time()
|
|
})
|
|
|
|
self._save_config()
|
|
|
|
def _ensure_wifi_radio_enabled(self, max_retries: int = 3) -> bool:
|
|
"""
|
|
Ensure WiFi radio is enabled (not soft-blocked) with retry logic and verification.
|
|
|
|
Args:
|
|
max_retries: Maximum number of retry attempts to enable WiFi radio
|
|
|
|
Returns:
|
|
True if WiFi is enabled or was successfully enabled, False otherwise
|
|
"""
|
|
for attempt in range(max_retries):
|
|
try:
|
|
# Check if WiFi radio is enabled
|
|
result = subprocess.run(
|
|
["nmcli", "radio", "wifi"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
status = result.stdout.strip().lower()
|
|
if status == "enabled":
|
|
# Verify with rfkill as well
|
|
rfkill_result = subprocess.run(
|
|
["rfkill", "list", "wifi"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if "Soft blocked: yes" not in rfkill_result.stdout:
|
|
logger.debug(f"WiFi radio confirmed enabled (attempt {attempt + 1})")
|
|
return True
|
|
# If soft-blocked, continue to unblock logic below
|
|
|
|
if status == "disabled" or attempt > 0:
|
|
# Try to enable WiFi radio
|
|
if attempt == 0:
|
|
logger.info("WiFi radio is disabled, attempting to enable...")
|
|
else:
|
|
logger.info(f"WiFi radio still disabled, retry {attempt + 1}/{max_retries}...")
|
|
|
|
enable_result = subprocess.run(
|
|
["sudo", "nmcli", "radio", "wifi", "on"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
if enable_result.returncode == 0:
|
|
# Also unblock via rfkill in case it's soft-blocked
|
|
subprocess.run(
|
|
["sudo", "rfkill", "unblock", "wifi"],
|
|
capture_output=True,
|
|
timeout=5
|
|
)
|
|
# Wait longer for it to actually enable
|
|
time.sleep(2)
|
|
|
|
# Verify it's actually enabled now
|
|
verify_result = subprocess.run(
|
|
["nmcli", "radio", "wifi"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if verify_result.returncode == 0 and verify_result.stdout.strip().lower() == "enabled":
|
|
logger.info("WiFi radio enabled and verified successfully")
|
|
return True
|
|
elif attempt < max_retries - 1:
|
|
logger.warning(f"WiFi radio enable command succeeded but not verified, will retry...")
|
|
time.sleep(1)
|
|
continue
|
|
else:
|
|
logger.warning(f"Failed to enable WiFi radio: {enable_result.stderr}")
|
|
if attempt < max_retries - 1:
|
|
time.sleep(1)
|
|
continue
|
|
return False
|
|
|
|
# Fallback: try rfkill
|
|
rfkill_result = subprocess.run(
|
|
["rfkill", "list", "wifi"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if "Soft blocked: yes" in rfkill_result.stdout:
|
|
logger.info("WiFi is soft-blocked, unblocking via rfkill...")
|
|
subprocess.run(
|
|
["sudo", "rfkill", "unblock", "wifi"],
|
|
capture_output=True,
|
|
timeout=5
|
|
)
|
|
time.sleep(2)
|
|
# Verify unblock worked
|
|
verify_rfkill = subprocess.run(
|
|
["rfkill", "list", "wifi"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if "Soft blocked: yes" not in verify_rfkill.stdout:
|
|
logger.info("WiFi unblocked via rfkill and verified")
|
|
return True
|
|
elif attempt < max_retries - 1:
|
|
time.sleep(1)
|
|
continue
|
|
|
|
# If we get here and haven't returned, assume enabled if we can't determine
|
|
if attempt == 0:
|
|
logger.debug("Could not determine WiFi radio status, assuming enabled")
|
|
return True
|
|
else:
|
|
time.sleep(1)
|
|
continue
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Could not check/enable WiFi radio (attempt {attempt + 1}): {e}")
|
|
if attempt < max_retries - 1:
|
|
time.sleep(1)
|
|
continue
|
|
# On last attempt, assume enabled to avoid blocking operations
|
|
return True
|
|
|
|
logger.warning(f"Failed to enable WiFi radio after {max_retries} attempts")
|
|
return False
|
|
|
|
def enable_ap_mode(self) -> Tuple[bool, str]:
|
|
"""
|
|
Enable access point mode
|
|
|
|
Only enables AP mode if:
|
|
- WiFi is NOT connected AND
|
|
- Ethernet is NOT connected
|
|
|
|
Tries hostapd/dnsmasq first (captive portal), falls back to nmcli hotspot if that fails.
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
try:
|
|
# Check if already in AP mode
|
|
if self._is_ap_mode_active():
|
|
return True, "AP mode already active"
|
|
|
|
# Ensure WiFi radio is enabled
|
|
if not self._ensure_wifi_radio_enabled():
|
|
return False, "WiFi radio is disabled and could not be enabled"
|
|
|
|
# Check if WiFi is connected
|
|
status = self.get_wifi_status()
|
|
if status.connected:
|
|
return False, "Cannot enable AP mode while WiFi is connected"
|
|
|
|
# Check if Ethernet is connected
|
|
if self._is_ethernet_connected():
|
|
return False, "Cannot enable AP mode while Ethernet is connected"
|
|
|
|
# Try hostapd/dnsmasq first (captive portal mode)
|
|
if self.has_hostapd and self.has_dnsmasq:
|
|
result = self._enable_ap_mode_hostapd()
|
|
if result[0]:
|
|
self._ap_enabled_at = time.time()
|
|
return result
|
|
|
|
# Fallback to nmcli hotspot (simpler, no captive portal)
|
|
if self.has_nmcli:
|
|
logger.info("hostapd/dnsmasq failed or unavailable, trying nmcli hotspot fallback...")
|
|
self._show_led_message("Setup Mode", duration=5)
|
|
result = self._enable_ap_mode_nmcli_hotspot()
|
|
if result[0]:
|
|
self._ap_enabled_at = time.time()
|
|
return result
|
|
|
|
return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)"
|
|
except Exception as e:
|
|
logger.error(f"Error in enable_ap_mode: {e}")
|
|
return False, str(e)
|
|
|
|
def _enable_ap_mode_hostapd(self) -> Tuple[bool, str]:
|
|
"""Enable AP mode using hostapd and dnsmasq (captive portal)"""
|
|
try:
|
|
|
|
# Create hostapd config
|
|
self._create_hostapd_config()
|
|
|
|
# Create dnsmasq config
|
|
self._create_dnsmasq_config()
|
|
|
|
# Set up WiFi interface for AP mode
|
|
try:
|
|
# Disconnect from any existing WiFi network
|
|
subprocess.run(
|
|
["sudo", "nmcli", "device", "disconnect", self._wifi_interface],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
# Set static IP for AP mode
|
|
subprocess.run(
|
|
["sudo", "ip", "addr", "flush", "dev", self._wifi_interface],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
subprocess.run(
|
|
["sudo", "ip", "addr", "add", "192.168.4.1/24", "dev", self._wifi_interface],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
subprocess.run(
|
|
["sudo", "ip", "link", "set", self._wifi_interface, "up"],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
logger.info(f"Configured {self._wifi_interface} with IP 192.168.4.1 for AP mode")
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError) as e:
|
|
logger.warning(f"Error setting up {self._wifi_interface} IP: {e}")
|
|
|
|
# Start services
|
|
try:
|
|
# Start hostapd first (it sets up the AP)
|
|
result = subprocess.run(
|
|
["sudo", "systemctl", "start", HOSTAPD_SERVICE],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=15
|
|
)
|
|
if result.returncode != 0:
|
|
return False, f"Failed to start hostapd: {result.stderr}"
|
|
|
|
# Give hostapd time to initialize
|
|
time.sleep(1)
|
|
|
|
# Start dnsmasq
|
|
result = subprocess.run(
|
|
["sudo", "systemctl", "start", DNSMASQ_SERVICE],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
if result.returncode != 0:
|
|
# Stop hostapd if dnsmasq failed
|
|
subprocess.run(["sudo", "systemctl", "stop", HOSTAPD_SERVICE], timeout=5)
|
|
return False, f"Failed to start dnsmasq: {result.stderr}"
|
|
|
|
# Set up iptables port forwarding (port 80 → 5000) and save ip_forward state
|
|
if not self._setup_iptables_redirect():
|
|
logger.error("Captive-portal redirect setup failed; stopping AP services")
|
|
subprocess.run(["sudo", "systemctl", "stop", HOSTAPD_SERVICE],
|
|
capture_output=True, timeout=10)
|
|
subprocess.run(["sudo", "systemctl", "stop", DNSMASQ_SERVICE],
|
|
capture_output=True, timeout=10)
|
|
return False, "AP started but captive-portal redirect setup failed"
|
|
|
|
logger.info("AP mode enabled successfully")
|
|
# Use the validated SSID so the displayed name matches what hostapd broadcast
|
|
ap_ssid, _ = self._validate_ap_config()
|
|
self._show_led_message(
|
|
f"WiFi Setup\n{ap_ssid}\nNo password\n192.168.4.1:5000", duration=10
|
|
)
|
|
return True, "AP mode enabled"
|
|
except Exception as e:
|
|
logger.error(f"Error starting AP services: {e}")
|
|
return False, str(e)
|
|
except Exception as e:
|
|
logger.error(f"Error enabling AP mode: {e}")
|
|
return False, str(e)
|
|
|
|
def _enable_ap_mode_nmcli_hotspot(self) -> Tuple[bool, str]:
|
|
"""
|
|
Enable AP mode using nmcli as an open (passwordless) access point.
|
|
|
|
Uses 'nmcli connection add type wifi 802-11-wireless.mode ap' instead of
|
|
'nmcli device wifi hotspot' because the hotspot subcommand always creates a
|
|
WPA2-protected network on Bookworm/Trixie and silently ignores attempts to
|
|
strip security after creation.
|
|
|
|
Tested for both Bookworm and Trixie (Netplan-based NetworkManager).
|
|
"""
|
|
try:
|
|
# Stop any existing connection
|
|
self.disconnect_from_network()
|
|
time.sleep(1)
|
|
|
|
ap_ssid, ap_channel = self._validate_ap_config()
|
|
|
|
# Delete only the specific application-managed AP profiles by name.
|
|
# Never delete by SSID — that would destroy a user's saved home network.
|
|
for conn_name in ["Hotspot", "LEDMatrix-Setup-AP", "TickerSetup-AP"]:
|
|
subprocess.run(["nmcli", "connection", "down", conn_name],
|
|
capture_output=True, timeout=5)
|
|
subprocess.run(["nmcli", "connection", "delete", conn_name],
|
|
capture_output=True, timeout=10)
|
|
|
|
time.sleep(1)
|
|
|
|
# Create an open AP connection profile from scratch.
|
|
# Using 'connection add' instead of 'device wifi hotspot' because the
|
|
# hotspot subcommand always attaches a WPA2 PSK on Bookworm/Trixie and
|
|
# ignores post-creation security modifications.
|
|
logger.info(f"Creating open AP with nmcli connection add: {ap_ssid} on "
|
|
f"{self._wifi_interface} (no password)")
|
|
cmd = [
|
|
"nmcli", "connection", "add",
|
|
"type", "wifi",
|
|
"con-name", "LEDMatrix-Setup-AP",
|
|
"ifname", self._wifi_interface,
|
|
"ssid", ap_ssid,
|
|
"802-11-wireless.mode", "ap",
|
|
"802-11-wireless.band", "bg", # 2.4 GHz for maximum compatibility
|
|
"802-11-wireless.channel", str(ap_channel),
|
|
"ipv4.method", "shared",
|
|
"ipv4.addresses", "192.168.4.1/24",
|
|
# No 802-11-wireless-security section → open network
|
|
]
|
|
|
|
# PMF (Protected Management Frames) is only meaningful for WPA2/WPA3.
|
|
# An open AP has no security section, so adding 802-11-wireless-security.pmf
|
|
# would cause NM to require key-mgmt too, breaking the connection add on
|
|
# Trixie NM 1.52+. Leave PMF untouched — open APs have no frame protection.
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
|
|
if result.returncode != 0:
|
|
error_msg = result.stderr.strip() or result.stdout.strip()
|
|
logger.error(f"Failed to create AP connection profile: {error_msg}")
|
|
self._show_led_message("AP mode failed", duration=5)
|
|
return False, f"Failed to create AP profile: {error_msg}"
|
|
|
|
# Write the NM dnsmasq-shared.d captive-portal config BEFORE bringing up
|
|
# the connection so NM's dnsmasq picks it up at start time.
|
|
# This causes every hostname DNS query from a connected device to resolve
|
|
# to 192.168.4.1, automatically triggering the OS captive-portal popup.
|
|
self._write_nm_dnsmasq_captive_conf()
|
|
|
|
logger.info("AP connection profile created, bringing it up...")
|
|
up_result = subprocess.run(
|
|
["nmcli", "connection", "up", "LEDMatrix-Setup-AP"],
|
|
capture_output=True, text=True, timeout=20
|
|
)
|
|
if up_result.returncode != 0:
|
|
error_msg = up_result.stderr.strip() or up_result.stdout.strip()
|
|
logger.error(f"Failed to bring up AP connection: {error_msg}")
|
|
self._remove_nm_dnsmasq_captive_conf()
|
|
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
|
|
capture_output=True, timeout=10)
|
|
self._show_led_message("AP mode failed", duration=5)
|
|
return False, f"Failed to start AP: {error_msg}"
|
|
|
|
time.sleep(2)
|
|
|
|
# NM's ipv4.method=shared manages ip_forward automatically, so we only
|
|
# need to add the iptables port-redirect rules for the captive portal.
|
|
if not self._setup_iptables_redirect():
|
|
logger.error("Captive-portal redirect setup failed; rolling back AP profile")
|
|
self._remove_nm_dnsmasq_captive_conf()
|
|
subprocess.run(["nmcli", "connection", "down", "LEDMatrix-Setup-AP"],
|
|
capture_output=True, timeout=10)
|
|
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
|
|
capture_output=True, timeout=10)
|
|
self._clear_led_message()
|
|
return False, "AP started but captive-portal redirect setup failed"
|
|
|
|
# Verify the AP is actually running
|
|
status = self._get_ap_status_nmcli()
|
|
if status.get('active'):
|
|
ip = status.get('ip', '192.168.4.1')
|
|
logger.info(f"AP mode confirmed active at {ip} (open network, no password)")
|
|
self._show_led_message(f"WiFi Setup\n{ap_ssid}\nNo password\n{ip}:5000", duration=10)
|
|
return True, f"AP mode enabled (open network) - Access at {ip}:5000"
|
|
else:
|
|
logger.error("AP mode started but not verified by status check — rolling back")
|
|
self._teardown_iptables_redirect()
|
|
self._remove_nm_dnsmasq_captive_conf()
|
|
subprocess.run(["nmcli", "connection", "down", "LEDMatrix-Setup-AP"],
|
|
capture_output=True, timeout=10)
|
|
subprocess.run(["nmcli", "connection", "delete", "LEDMatrix-Setup-AP"],
|
|
capture_output=True, timeout=10)
|
|
self._clear_led_message()
|
|
return False, "AP mode started but verification failed"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting AP mode with nmcli: {e}")
|
|
self._remove_nm_dnsmasq_captive_conf()
|
|
self._show_led_message("Setup mode error", duration=5)
|
|
return False, str(e)
|
|
|
|
def _get_ap_status_nmcli(self) -> Dict:
|
|
"""
|
|
Get AP status using nmcli (for hotspot mode).
|
|
|
|
Returns:
|
|
Dict with AP status info including active state, SSID, IP, and interface
|
|
"""
|
|
try:
|
|
# Check if hotspot connection is active
|
|
result = subprocess.run(
|
|
["nmcli", "-t", "-f", "NAME,TYPE,DEVICE", "connection", "show", "--active"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
for line in result.stdout.strip().split('\n'):
|
|
parts = line.split(':')
|
|
if len(parts) < 2:
|
|
continue
|
|
conn_name = parts[0].strip()
|
|
conn_type = parts[1].strip().lower()
|
|
# Match our known AP profile name OR the legacy nmcli hotspot type
|
|
if conn_name == "LEDMatrix-Setup-AP" or 'hotspot' in conn_type:
|
|
# Get actual IP address (may be 192.168.4.1 or 10.42.0.1 depending on config)
|
|
ip = '192.168.4.1'
|
|
interface = parts[2] if len(parts) > 2 else self._wifi_interface
|
|
try:
|
|
ip_result = subprocess.run(
|
|
["nmcli", "-t", "-f", "IP4.ADDRESS", "device", "show", interface],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if ip_result.returncode == 0:
|
|
for ip_line in ip_result.stdout.strip().split('\n'):
|
|
if '/' in ip_line:
|
|
ip = ip_line.split('/')[0].split(':')[-1].strip()
|
|
break
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
|
|
pass
|
|
|
|
return {
|
|
'active': True,
|
|
'ssid': self.config.get("ap_ssid", DEFAULT_AP_SSID),
|
|
'ip': ip,
|
|
'interface': interface
|
|
}
|
|
|
|
return {'active': False}
|
|
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError) as e:
|
|
logger.error(f"Error getting AP status with nmcli: {e}")
|
|
return {'active': False}
|
|
|
|
def disable_ap_mode(self) -> Tuple[bool, str]:
|
|
"""
|
|
Disable access point mode
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
try:
|
|
if not self._is_ap_mode_active():
|
|
return True, "AP mode not active"
|
|
|
|
# Check which AP mode is active and disable accordingly
|
|
# First check if hostapd is running (captive portal mode)
|
|
hostapd_active = False
|
|
try:
|
|
result = subprocess.run(
|
|
["systemctl", "is-active", HOSTAPD_SERVICE],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=2
|
|
)
|
|
hostapd_active = result.stdout.strip() == "active"
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError):
|
|
pass
|
|
|
|
# Stop services
|
|
try:
|
|
if hostapd_active:
|
|
# Disable hostapd/dnsmasq mode (captive portal)
|
|
subprocess.run(
|
|
["sudo", "systemctl", "stop", HOSTAPD_SERVICE],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
subprocess.run(
|
|
["sudo", "systemctl", "stop", DNSMASQ_SERVICE],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
else:
|
|
# Disable nmcli hotspot mode (fallback)
|
|
for conn_name in ["LEDMatrix-Setup-AP", "Hotspot", "TickerSetup-AP"]:
|
|
subprocess.run(
|
|
["nmcli", "connection", "down", conn_name],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
subprocess.run(
|
|
["nmcli", "connection", "delete", conn_name],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
# Remove the drop-in captive portal config (only for hostapd mode)
|
|
if hostapd_active and DNSMASQ_CONFIG_PATH.exists():
|
|
try:
|
|
subprocess.run(
|
|
["sudo", "rm", "-f", str(DNSMASQ_CONFIG_PATH)],
|
|
capture_output=True, timeout=5
|
|
)
|
|
logger.info(f"Removed captive portal dnsmasq config: {DNSMASQ_CONFIG_PATH}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not remove dnsmasq drop-in config: {e}")
|
|
|
|
# Remove iptables redirect rules and restore ip_forward state (hostapd mode only)
|
|
if hostapd_active:
|
|
self._teardown_iptables_redirect()
|
|
|
|
# Clean up WiFi interface IP configuration
|
|
subprocess.run(
|
|
["sudo", "ip", "addr", "del", "192.168.4.1/24", "dev", self._wifi_interface],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
# Only restart NetworkManager if hostapd was active (needed for hostapd/dnsmasq cleanup)
|
|
# Before restarting, ensure we have connectivity safety (Ethernet or WiFi enabled)
|
|
connectivity_safe = self._has_connectivity_safety()
|
|
|
|
if not connectivity_safe:
|
|
# Ensure WiFi radio is enabled before restart to maintain connectivity option
|
|
logger.warning("No connectivity safety detected (no Ethernet, WiFi may be disabled), ensuring WiFi radio enabled before restart")
|
|
self._ensure_wifi_radio_enabled()
|
|
|
|
logger.info("Restarting NetworkManager to restore normal WiFi operation after hostapd cleanup")
|
|
subprocess.run(
|
|
["sudo", "systemctl", "restart", "NetworkManager"],
|
|
capture_output=True,
|
|
timeout=15
|
|
)
|
|
# Give NetworkManager time to restart
|
|
time.sleep(2)
|
|
|
|
# Explicitly ensure WiFi radio is enabled after restart (with retries for safety)
|
|
wifi_enabled = self._ensure_wifi_radio_enabled(max_retries=5)
|
|
if not wifi_enabled:
|
|
logger.warning("WiFi radio may be disabled after NetworkManager restart - this could cause lockout if Ethernet not connected")
|
|
# Try one more time with rfkill as last resort
|
|
try:
|
|
subprocess.run(
|
|
["sudo", "rfkill", "unblock", "wifi"],
|
|
capture_output=True,
|
|
timeout=5
|
|
)
|
|
time.sleep(1)
|
|
logger.info("Attempted final WiFi radio unblock via rfkill")
|
|
except Exception as e:
|
|
logger.error(f"Final WiFi radio unblock attempt failed: {e}")
|
|
else:
|
|
# nmcli AP mode — NM's ipv4.method=shared manages ip_forward automatically,
|
|
# so we only need to remove the iptables redirect rules we added.
|
|
logger.info("Skipping NetworkManager restart (nmcli AP mode, restart not needed)")
|
|
self._teardown_iptables_redirect()
|
|
self._remove_nm_dnsmasq_captive_conf()
|
|
# Ensure WiFi radio is enabled after nmcli operations
|
|
wifi_enabled = self._ensure_wifi_radio_enabled(max_retries=3)
|
|
if not wifi_enabled:
|
|
logger.warning("WiFi radio may be disabled after nmcli AP cleanup")
|
|
|
|
self._ap_enabled_at = None
|
|
logger.info("AP mode disabled successfully")
|
|
return True, "AP mode disabled"
|
|
except Exception as e:
|
|
logger.error(f"Error stopping AP services: {e}")
|
|
return False, str(e)
|
|
except Exception as e:
|
|
logger.error(f"Error disabling AP mode: {e}")
|
|
return False, str(e)
|
|
|
|
def _create_hostapd_config(self):
|
|
"""Create hostapd configuration file"""
|
|
try:
|
|
config_dir = HOSTAPD_CONFIG_PATH.parent
|
|
config_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Use validated values — strips invalid chars and ensures channel is an int.
|
|
# Also strip newlines from SSID to prevent config-file injection.
|
|
ap_ssid, ap_channel = self._validate_ap_config()
|
|
ap_ssid = ap_ssid.replace('\n', '').replace('\r', '')
|
|
|
|
# Open network configuration (no password) for easy setup access
|
|
config_content = f"""interface={self._wifi_interface}
|
|
driver=nl80211
|
|
ssid={ap_ssid}
|
|
hw_mode=g
|
|
channel={ap_channel}
|
|
wmm_enabled=0
|
|
macaddr_acl=0
|
|
auth_algs=1
|
|
ignore_broadcast_ssid=0
|
|
# Open network - no WPA/WPA2 encryption
|
|
"""
|
|
|
|
# Write config (requires sudo)
|
|
with open("/tmp/hostapd.conf", 'w') as f:
|
|
f.write(config_content)
|
|
|
|
# Copy to final location with sudo
|
|
subprocess.run(
|
|
["sudo", "cp", "/tmp/hostapd.conf", str(HOSTAPD_CONFIG_PATH)],
|
|
timeout=10
|
|
)
|
|
|
|
logger.info(f"Created hostapd config at {HOSTAPD_CONFIG_PATH} for {self._wifi_interface}")
|
|
except (OSError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
|
|
logger.error(f"Error creating hostapd config: {e}")
|
|
raise
|
|
|
|
def _check_dnsmasq_conflict(self) -> Tuple[bool, str]:
|
|
"""
|
|
Check if dnsmasq is already in use for other purposes (e.g., Pi-hole).
|
|
|
|
Returns:
|
|
Tuple of (conflict_detected, description)
|
|
"""
|
|
try:
|
|
# Check if dnsmasq service is active
|
|
result = subprocess.run(
|
|
["systemctl", "is-active", "dnsmasq"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.stdout.strip() == "active":
|
|
# Check if it's configured for something other than our AP
|
|
if DNSMASQ_CONFIG_PATH.exists():
|
|
try:
|
|
content = DNSMASQ_CONFIG_PATH.read_text()
|
|
# Check for Pi-hole or other common dnsmasq uses
|
|
if 'pihole' in content.lower() or 'pi-hole' in content.lower():
|
|
return True, "Pi-hole detected - dnsmasq is in use"
|
|
if 'server=' in content and self._wifi_interface not in content:
|
|
return True, "dnsmasq appears to be configured for DNS forwarding"
|
|
except (OSError, PermissionError):
|
|
pass
|
|
|
|
return False, ""
|
|
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
|
|
return False, ""
|
|
|
|
def _create_dnsmasq_config(self):
|
|
"""
|
|
Create dnsmasq drop-in configuration for captive portal DNS redirection.
|
|
|
|
Writes to /etc/dnsmasq.d/ledmatrix-captive.conf so we don't overwrite
|
|
the main /etc/dnsmasq.conf (preserves Pi-hole, etc.).
|
|
"""
|
|
try:
|
|
# Using a drop-in file in /etc/dnsmasq.d/ to avoid overwriting the
|
|
# main /etc/dnsmasq.conf (which may belong to Pi-hole or other services).
|
|
config_content = f"""interface={self._wifi_interface}
|
|
dhcp-range=192.168.4.2,192.168.4.20,255.255.255.0,24h
|
|
|
|
# Captive portal: Redirect all DNS queries to Pi
|
|
address=/#/192.168.4.1
|
|
|
|
# Captive portal detection endpoints
|
|
address=/captive.apple.com/192.168.4.1
|
|
address=/connectivitycheck.gstatic.com/192.168.4.1
|
|
address=/www.msftconnecttest.com/192.168.4.1
|
|
address=/detectportal.firefox.com/192.168.4.1
|
|
"""
|
|
|
|
# Write config (requires sudo)
|
|
with open("/tmp/dnsmasq.conf", 'w') as f:
|
|
f.write(config_content)
|
|
|
|
# Copy to final location with sudo
|
|
subprocess.run(
|
|
["sudo", "cp", "/tmp/dnsmasq.conf", str(DNSMASQ_CONFIG_PATH)],
|
|
timeout=10
|
|
)
|
|
|
|
logger.info(f"Created dnsmasq config at {DNSMASQ_CONFIG_PATH} for {self._wifi_interface}")
|
|
except (OSError, subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
|
|
logger.error(f"Error creating dnsmasq config: {e}")
|
|
raise
|
|
|
|
def check_and_manage_ap_mode(self) -> bool:
|
|
"""
|
|
Check WiFi and Ethernet connection status and enable/disable AP mode accordingly.
|
|
Only auto-enables AP mode if:
|
|
- auto_enable_ap_mode is enabled in config AND
|
|
- WiFi is NOT connected AND
|
|
- Ethernet is NOT connected AND
|
|
- Multiple consecutive disconnected checks (grace period to avoid false positives)
|
|
|
|
Always auto-disables AP mode when WiFi or Ethernet connects.
|
|
|
|
This should be called periodically by a background service.
|
|
|
|
Returns:
|
|
True if AP mode state changed, False otherwise
|
|
"""
|
|
try:
|
|
# Get status with retry for more reliable detection
|
|
status = self._get_wifi_status_with_retry()
|
|
ethernet_connected = self._is_ethernet_connected()
|
|
ap_active = self._is_ap_mode_active()
|
|
auto_enable = self.config.get("auto_enable_ap_mode", True) # Default: True (safe due to grace period)
|
|
|
|
# Log current state for debugging
|
|
logger.debug(f"WiFi status: connected={status.connected}, SSID={status.ssid}, "
|
|
f"Ethernet={ethernet_connected}, AP_active={ap_active}, "
|
|
f"auto_enable={auto_enable}, disconnected_checks={self._disconnected_checks}")
|
|
|
|
# Determine if we should have AP mode active.
|
|
# AP-enable uses only the nmcli association state (fast, no network calls).
|
|
# This keeps the same reliable behaviour as before: momentary packet loss
|
|
# while on working WiFi does NOT trigger AP mode. The internet-reachability
|
|
# check is performed separately in the daemon watchdog for NM recovery.
|
|
is_disconnected = not status.connected and not ethernet_connected
|
|
|
|
if is_disconnected:
|
|
# Increment disconnected check counter
|
|
self._disconnected_checks += 1
|
|
logger.debug(f"Network disconnected (check {self._disconnected_checks}/{self._disconnected_checks_required})")
|
|
else:
|
|
# Reset counter if we're associated
|
|
if self._disconnected_checks > 0:
|
|
logger.debug("Network connected, resetting disconnected check counter")
|
|
self._disconnected_checks = 0
|
|
|
|
# Only enable AP if we've had enough consecutive disconnected checks
|
|
should_have_ap = (auto_enable and
|
|
is_disconnected and
|
|
self._disconnected_checks >= self._disconnected_checks_required)
|
|
|
|
if should_have_ap and not ap_active:
|
|
# Pre-cache a WiFi scan so the captive portal can show networks
|
|
try:
|
|
logger.info("Running pre-AP WiFi scan for captive portal cache...")
|
|
networks, _cached = self.scan_networks(allow_cached=False)
|
|
if networks:
|
|
self._save_cached_scan(networks)
|
|
logger.info(f"Cached {len(networks)} networks for captive portal")
|
|
except Exception as scan_err:
|
|
logger.debug(f"Pre-AP scan failed (non-critical): {scan_err}")
|
|
|
|
logger.info(f"Enabling AP mode after {self._disconnected_checks} consecutive disconnected checks")
|
|
success, message = self.enable_ap_mode()
|
|
if success:
|
|
logger.info("Auto-enabled AP mode (no WiFi or Ethernet connection after grace period)")
|
|
self._disconnected_checks = 0 # Reset counter after enabling
|
|
return True
|
|
else:
|
|
logger.warning(f"Failed to enable AP mode: {message}")
|
|
elif not should_have_ap and ap_active:
|
|
# Should not have AP but do - disable AP mode
|
|
# Always disable if WiFi or Ethernet connects, regardless of auto_enable setting
|
|
if status.connected or ethernet_connected:
|
|
success, message = self.disable_ap_mode()
|
|
if success:
|
|
if status.connected:
|
|
logger.info("Auto-disabled AP mode (WiFi connected)")
|
|
elif ethernet_connected:
|
|
logger.info("Auto-disabled AP mode (Ethernet connected)")
|
|
self._disconnected_checks = 0 # Reset counter
|
|
return True
|
|
else:
|
|
logger.warning(f"Failed to auto-disable AP mode: {message}")
|
|
elif not auto_enable:
|
|
# AP is active but auto_enable is disabled - this means it was manually enabled
|
|
# Don't disable it automatically, let it stay active
|
|
logger.debug("AP mode is active (manually enabled), keeping active")
|
|
|
|
# Idle-timeout check: disable AP if no client has connected within the window.
|
|
# Only applies when AP is active and we haven't just decided to enable/disable it.
|
|
if ap_active and self._ap_enabled_at is not None:
|
|
try:
|
|
idle_timeout_min = max(1, min(1440, int(self.config.get("ap_idle_timeout_minutes", 15))))
|
|
except (TypeError, ValueError):
|
|
idle_timeout_min = 15
|
|
elapsed = time.time() - self._ap_enabled_at
|
|
if elapsed > idle_timeout_min * 60 and not self._has_ap_clients():
|
|
logger.info(
|
|
f"AP idle timeout ({idle_timeout_min} min, no clients) — disabling AP"
|
|
)
|
|
success, message = self.disable_ap_mode()
|
|
if success:
|
|
return True
|
|
else:
|
|
logger.warning(f"Failed to disable AP on idle timeout: {message}")
|
|
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error checking AP mode: {e}", exc_info=True)
|
|
return False
|
|
|
|
def _get_wifi_status_with_retry(self, max_retries=2) -> WiFiStatus:
|
|
"""
|
|
Get WiFi status with retry logic to avoid false negatives.
|
|
|
|
Args:
|
|
max_retries: Number of retry attempts if first check fails
|
|
|
|
Returns:
|
|
WiFiStatus object
|
|
"""
|
|
for attempt in range(max_retries + 1):
|
|
status = self.get_wifi_status()
|
|
# If we get a connected status, trust it immediately
|
|
if status.connected:
|
|
return status
|
|
|
|
# If disconnected, wait a bit and retry (in case of transient issues)
|
|
if attempt < max_retries:
|
|
time.sleep(1)
|
|
logger.debug(f"WiFi status check attempt {attempt + 1}/{max_retries + 1}: disconnected, retrying...")
|
|
|
|
# Return the last status (disconnected)
|
|
return status
|
|
|