4 Commits

Author SHA1 Message Date
Chuck
3b763b613a fix(wifi): address four review findings in wifi_manager.py
IP parsing (line 476): use partition(':') so bare "ip/mask" lines
(no field-label prefix) are handled without IndexError; falls back to
the full string when no ':' is present before splitting on '/'.

AP-mode override comment (line 503): add one-line explanation above
the wifi_connected/ssid/ip_address clear so maintainers know why the
fields are reset while wlan0 reports as "connected".

Stale force-flag cleanup (__init__): remove a left-over
_FORCE_AP_FLAG_PATH from a prior crash on first instantiation per
process (guarded by class-level _startup_cleanup_done so the nmcli
AP-state check only runs once, not on every per-request instantiation).

Force-flag logging (enable_ap_mode): log at debug when force=True is
applied, log success at debug and failure with OSError details at
warning for both the hostapd and nmcli hotspot paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 15:08:15 -04:00
Chuck
f279980b44 fix(wifi): suppress false-positive Bandit B603/B607 on new nmcli calls
Both subprocess.run calls in the SSID connection lookup use fixed
arguments (no user input) or values derived from nmcli's own output —
not from user-controlled data. Add nosec B603 B607 annotations to
silence the Codacy/Bandit warnings, consistent with existing nosec
usage in the file.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 14:58:36 -04:00
Chuck
6313b9c25f fix(wifi): strict bool parsing for force; nosec annotation parity
- api_v3.py: replace bool(...) coercion for force with strict check —
  only actual boolean True or strings "true"/"1" (case-insensitive)
  pass; "false", integers, and other strings are treated as False so
  the Ethernet/WiFi guards and _FORCE_AP_FLAG_PATH cannot be bypassed
  by accident
- wifi_manager.py: add nosec B108 annotation to _IP_FORWARD_SAVE_PATH
  to match the identical annotation already on _FORCE_AP_FLAG_PATH

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 14:31:00 -04:00
Chuck
d81156d53e fix(wifi): fix AP mode, captive portal, and WiFi connect flow
- Fix scan API returning 500: scan_networks() returns a tuple but the
  endpoint was iterating it directly; unpack with _was_cached
- Fix IP address display showing 'IP4.ADDRESS[1]:x.x.x.x': nmcli -t
  output includes the field label; split on ':' before '/'
- Add force parameter to enable_ap_mode() to bypass WiFi/Ethernet
  guards; expose via force JSON body field in the AP enable endpoint
- Fix daemon auto-disabling forced AP: add _FORCE_AP_FLAG_PATH flag
  file written on force-enable and checked in check_and_manage_ap_mode
  before auto-disabling; disable_ap_mode() clears it
- Fix wifi_connected false positive in AP mode: _get_status_nmcli()
  was reporting wlan0 as 'connected' when it was running as AP;
  override wifi_connected=False when _is_ap_mode_active() is True
- Fix AP verification failure on async NM activation: retry
  _get_ap_status_nmcli() up to 5 times with 2s delay instead of
  single immediate check
- Fix WiFi connect ignoring existing NM connections: nmcli does not
  support 802-11-wireless.ssid as a column in 'connection show';
  replace with NAME,TYPE list then per-connection SSID query via -g
  (fixes 'netplan generate failed' error on Trixie / netplan systems)
- Fix failsafe AP re-enable blocked by Ethernet: all recovery-path
  enable_ap_mode() calls in connect_to_network() now pass force=True

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 14:22:26 -04:00
5 changed files with 134 additions and 163 deletions

View File

@@ -151,6 +151,18 @@ class WiFiManager:
f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, " f"hostapd: {self.has_hostapd}, dnsmasq: {self.has_dnsmasq}, "
f"interface: {self._wifi_interface}, trixie: {self._is_trixie}") f"interface: {self._wifi_interface}, trixie: {self._is_trixie}")
# Once per process: remove a stale force-AP flag left by a prior crash.
# Guard with a class-level flag so the nmcli AP-state check only runs
# once even though WiFiManager is instantiated per-request.
if not WiFiManager._startup_cleanup_done:
WiFiManager._startup_cleanup_done = True
if self._FORCE_AP_FLAG_PATH.exists() and not self._is_ap_mode_active():
try:
self._FORCE_AP_FLAG_PATH.unlink(missing_ok=True)
logger.debug("Removed stale force-AP flag on startup (AP not active)")
except OSError as exc:
logger.warning(f"Could not remove stale force-AP flag: {exc}")
def _show_led_message(self, message: str, duration: int = 5): def _show_led_message(self, message: str, duration: int = 5):
""" """
Show a WiFi status message on the LED display. Show a WiFi status message on the LED display.
@@ -474,7 +486,10 @@ class WiFiManager:
if result.returncode == 0: if result.returncode == 0:
for line in result.stdout.strip().split('\n'): for line in result.stdout.strip().split('\n'):
if '/' in line: if '/' in line:
ip_address = line.split('/')[0].strip() # nmcli -t output is "IP4.ADDRESS[1]:x.x.x.x/prefix";
# bare "x.x.x.x/prefix" is also accepted defensively.
_, sep, rest = line.partition(':')
ip_address = (rest if sep else line).split('/')[0].strip()
break break
# Final fallback: Get signal strength by matching SSID in WiFi list # Final fallback: Get signal strength by matching SSID in WiFi list
@@ -500,6 +515,13 @@ class WiFiManager:
# Check if AP mode is active # Check if AP mode is active
ap_active = self._is_ap_mode_active() ap_active = self._is_ap_mode_active()
# wlan0 shows as "connected" in AP mode; clear client-station fields so
# callers don't mistake the AP for an outbound WiFi connection.
if ap_active and wifi_connected:
wifi_connected = False
ssid = None
ip_address = None
logger.debug(f"{wlan_device} is in AP mode — overriding wifi_connected to False")
return WiFiStatus( return WiFiStatus(
connected=wifi_connected, connected=wifi_connected,
@@ -690,6 +712,10 @@ class WiFiManager:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved") # nosec B108 - process-specific named file; device is single-user RPi _IP_FORWARD_SAVE_PATH = Path("/tmp/ledmatrix_ip_forward_saved") # nosec B108 - process-specific named file; device is single-user RPi
# Written when AP mode is manually force-enabled; prevents daemon auto-disable
_FORCE_AP_FLAG_PATH = Path("/tmp/ledmatrix_force_ap_active") # nosec B108 - process-specific named file; device is single-user RPi
# Ensures the startup stale-flag cleanup runs once per process, not per instantiation
_startup_cleanup_done: bool = False
def _validate_ap_config(self) -> Tuple[str, int]: def _validate_ap_config(self) -> Tuple[str, int]:
"""Return a sanitized (ssid, channel) pair from config, falling back to defaults.""" """Return a sanitized (ssid, channel) pair from config, falling back to defaults."""
@@ -1367,7 +1393,7 @@ class WiFiManager:
logger.error(f"Failed to restore original connection: {original_ssid}") logger.error(f"Failed to restore original connection: {original_ssid}")
# Trigger AP mode as last resort # Trigger AP mode as last resort
self._show_led_message("Enabling AP mode...", duration=5) self._show_led_message("Enabling AP mode...", duration=5)
ap_success, ap_msg = self.enable_ap_mode() ap_success, ap_msg = self.enable_ap_mode(force=True)
if ap_success: if ap_success:
logger.info("AP mode enabled as failsafe") logger.info("AP mode enabled as failsafe")
return False, "Connection failed and restoration failed. AP mode enabled." return False, "Connection failed and restoration failed. AP mode enabled."
@@ -1379,7 +1405,7 @@ class WiFiManager:
elif not success: elif not success:
logger.warning(f"Connection to {ssid} failed and no original connection to restore") logger.warning(f"Connection to {ssid} failed and no original connection to restore")
self._show_led_message("Enabling AP mode...", duration=5) self._show_led_message("Enabling AP mode...", duration=5)
ap_success, ap_msg = self.enable_ap_mode() ap_success, ap_msg = self.enable_ap_mode(force=True)
if ap_success: if ap_success:
logger.info("AP mode enabled as failsafe") logger.info("AP mode enabled as failsafe")
return False, "Connection failed. AP mode enabled." return False, "Connection failed. AP mode enabled."
@@ -1400,7 +1426,7 @@ class WiFiManager:
logger.error(f"Failed to restore after exception: {restore_error}") logger.error(f"Failed to restore after exception: {restore_error}")
# Last resort: enable AP mode # Last resort: enable AP mode
try: try:
self.enable_ap_mode() self.enable_ap_mode(force=True)
except Exception as ap_error: # nosec B110 - last-resort; do not re-raise, but log for debugging except Exception as ap_error: # nosec B110 - last-resort; do not re-raise, but log for debugging
logger.error("Last-resort AP mode enable failed in recovery path: %s", ap_error, exc_info=True) logger.error("Last-resort AP mode enable failed in recovery path: %s", ap_error, exc_info=True)
return False, str(e) return False, str(e)
@@ -1464,26 +1490,29 @@ class WiFiManager:
# Show LED message # Show LED message
self._show_led_message(f"Connecting to {ssid}...", duration=10) self._show_led_message(f"Connecting to {ssid}...", duration=10)
# First, check if connection already exists and try to activate it # Find existing NM connection for this SSID.
# NetworkManager connection names might not match SSID exactly, so search by SSID # 802-11-wireless.ssid is not a valid column in 'nmcli connection show',
check_result = subprocess.run( # so list all wifi connections then query each one's SSID individually.
["nmcli", "-t", "-f", "NAME,802-11-wireless.ssid", "connection", "show"], list_result = subprocess.run( # nosec B603 B607 - fixed args, no user input
capture_output=True, ["nmcli", "-t", "-f", "NAME,TYPE", "connection", "show"],
text=True, capture_output=True, text=True, timeout=5
timeout=5
) )
existing_conn_name = None existing_conn_name = None
if check_result.returncode == 0: if list_result.returncode == 0:
for line in check_result.stdout.strip().split('\n'): for line in list_result.stdout.strip().split('\n'):
if ':' in line: if ':' not in line:
parts = line.split(':') continue
if len(parts) >= 2: parts = line.split(':')
conn_name = parts[0].strip() if len(parts) < 2 or parts[1].strip() != '802-11-wireless':
conn_ssid = parts[1].strip() if len(parts) > 1 else "" continue
if conn_ssid == ssid: conn_name = parts[0].strip()
existing_conn_name = conn_name ssid_r = subprocess.run( # nosec B603 B607 - conn_name from nmcli output, not user input
break ["nmcli", "-g", "802-11-wireless.ssid", "connection", "show", conn_name],
capture_output=True, text=True, timeout=5
)
if ssid_r.returncode == 0 and ssid_r.stdout.strip() == ssid:
existing_conn_name = conn_name
break
# Also try direct lookup by SSID (in case connection name matches SSID) # Also try direct lookup by SSID (in case connection name matches SSID)
if not existing_conn_name: if not existing_conn_name:
@@ -1855,7 +1884,7 @@ class WiFiManager:
logger.warning(f"Failed to enable WiFi radio after {max_retries} attempts") logger.warning(f"Failed to enable WiFi radio after {max_retries} attempts")
return False return False
def enable_ap_mode(self) -> Tuple[bool, str]: def enable_ap_mode(self, force: bool = False) -> Tuple[bool, str]:
""" """
Enable access point mode Enable access point mode
@@ -1877,20 +1906,29 @@ class WiFiManager:
if not self._ensure_wifi_radio_enabled(): if not self._ensure_wifi_radio_enabled():
return False, "WiFi radio is disabled and could not be enabled" return False, "WiFi radio is disabled and could not be enabled"
# Check if WiFi is connected # Check if WiFi is connected (skip when force=True)
status = self.get_wifi_status() status = self.get_wifi_status()
if status.connected: if not force and status.connected:
return False, "Cannot enable AP mode while WiFi is connected" return False, "Cannot enable AP mode while WiFi is connected"
# Check if Ethernet is connected # Check if Ethernet is connected (skip when force=True)
if self._is_ethernet_connected(): if not force and self._is_ethernet_connected():
return False, "Cannot enable AP mode while Ethernet is connected" return False, "Cannot enable AP mode while Ethernet is connected"
if force:
logger.debug(f"enable_ap_mode: force=True — WiFi/Ethernet guards bypassed; will create {self._FORCE_AP_FLAG_PATH}")
# Try hostapd/dnsmasq first (captive portal mode) # Try hostapd/dnsmasq first (captive portal mode)
if self.has_hostapd and self.has_dnsmasq: if self.has_hostapd and self.has_dnsmasq:
result = self._enable_ap_mode_hostapd() result = self._enable_ap_mode_hostapd()
if result[0]: if result[0]:
self._ap_enabled_at = time.time() self._ap_enabled_at = time.time()
if force:
try:
self._FORCE_AP_FLAG_PATH.touch()
logger.debug(f"Force-AP flag created: {self._FORCE_AP_FLAG_PATH}")
except OSError as exc:
logger.warning(f"Failed to create force-AP flag {self._FORCE_AP_FLAG_PATH}: {exc}")
return result return result
# Fallback to nmcli hotspot (simpler, no captive portal) # Fallback to nmcli hotspot (simpler, no captive portal)
@@ -1900,6 +1938,12 @@ class WiFiManager:
result = self._enable_ap_mode_nmcli_hotspot() result = self._enable_ap_mode_nmcli_hotspot()
if result[0]: if result[0]:
self._ap_enabled_at = time.time() self._ap_enabled_at = time.time()
if force:
try:
self._FORCE_AP_FLAG_PATH.touch()
logger.debug(f"Force-AP flag created: {self._FORCE_AP_FLAG_PATH}")
except OSError as exc:
logger.warning(f"Failed to create force-AP flag {self._FORCE_AP_FLAG_PATH}: {exc}")
return result return result
return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)" return False, "No WiFi tools available (nmcli, hostapd, or dnsmasq required)"
@@ -2091,8 +2135,14 @@ class WiFiManager:
self._clear_led_message() self._clear_led_message()
return False, "AP started but captive-portal redirect setup failed" return False, "AP started but captive-portal redirect setup failed"
# Verify the AP is actually running # Verify the AP is actually running (retry up to 5x with 2s delay for NM async activation)
status = self._get_ap_status_nmcli() status = {}
for _attempt in range(5):
status = self._get_ap_status_nmcli()
if status.get('active'):
break
logger.debug(f"AP verification attempt {_attempt + 1}/5 not yet active, waiting 2s")
time.sleep(2)
if status.get('active'): if status.get('active'):
ip = status.get('ip', '192.168.4.1') ip = status.get('ip', '192.168.4.1')
logger.info(f"AP mode confirmed active at {ip} (open network, no password)") logger.info(f"AP mode confirmed active at {ip} (open network, no password)")
@@ -2290,6 +2340,7 @@ class WiFiManager:
logger.warning("WiFi radio may be disabled after nmcli AP cleanup") logger.warning("WiFi radio may be disabled after nmcli AP cleanup")
self._ap_enabled_at = None self._ap_enabled_at = None
self._FORCE_AP_FLAG_PATH.unlink(missing_ok=True)
logger.info("AP mode disabled successfully") logger.info("AP mode disabled successfully")
return True, "AP mode disabled" return True, "AP mode disabled"
except Exception as e: except Exception as e:
@@ -2478,22 +2529,29 @@ address=/detectportal.firefox.com/192.168.4.1
else: else:
logger.warning(f"Failed to enable AP mode: {message}") logger.warning(f"Failed to enable AP mode: {message}")
elif not should_have_ap and ap_active: elif not should_have_ap and ap_active:
# Should not have AP but do - disable AP mode # Should not have AP but do - check if it was manually force-enabled
# Always disable if WiFi or Ethernet connects, regardless of auto_enable setting force_active = self._FORCE_AP_FLAG_PATH.exists()
if status.connected or ethernet_connected: if status.connected:
# WiFi connected: always disable AP (user successfully configured WiFi)
success, message = self.disable_ap_mode() success, message = self.disable_ap_mode()
if success: if success:
if status.connected: logger.info("Auto-disabled AP mode (WiFi connected)")
logger.info("Auto-disabled AP mode (WiFi connected)") self._disconnected_checks = 0
elif ethernet_connected:
logger.info("Auto-disabled AP mode (Ethernet connected)")
self._disconnected_checks = 0 # Reset counter
return True return True
else: else:
logger.warning(f"Failed to auto-disable AP mode: {message}") logger.warning(f"Failed to auto-disable AP mode: {message}")
elif ethernet_connected and not force_active:
# Ethernet connected, AP not manually forced: auto-disable
success, message = self.disable_ap_mode()
if success:
logger.info("Auto-disabled AP mode (Ethernet connected)")
self._disconnected_checks = 0
return True
else:
logger.warning(f"Failed to auto-disable AP mode: {message}")
elif ethernet_connected and force_active:
logger.debug("AP mode is force-active; Ethernet connected but auto-disable suppressed")
elif not auto_enable: elif not auto_enable:
# AP is active but auto_enable is disabled - this means it was manually enabled
# Don't disable it automatically, let it stay active
logger.debug("AP mode is active (manually enabled), keeping active") logger.debug("AP mode is active (manually enabled), keeping active")
# Idle-timeout check: disable AP if no client has connected within the window. # Idle-timeout check: disable AP if no client has connected within the window.

View File

@@ -2,10 +2,8 @@ from flask import Flask, request, redirect, url_for, jsonify, Response, send_fro
import json import json
import logging import logging
import os import os
import queue
import sys import sys
import subprocess import subprocess
import threading
import time import time
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -415,44 +413,13 @@ def add_security_headers(response):
return response return response
class _StreamBroadcaster: # SSE helper function
"""Fan-out broadcaster: one background generator thread pushes to all SSE clients. def sse_response(generator_func):
"""Helper to create SSE responses"""
This means N browser tabs share one generator instead of each running their own, def generate():
keeping PIL encodes / subprocess forks constant regardless of how many tabs are open. for data in generator_func():
""" yield f"data: {json.dumps(data)}\n\n"
return Response(generate(), mimetype='text/event-stream')
def __init__(self, generator_factory):
self._generator_factory = generator_factory
self._clients: set = set()
self._lock = threading.Lock()
self._thread: threading.Thread | None = None
def subscribe(self) -> queue.Queue:
q: queue.Queue = queue.Queue(maxsize=5)
with self._lock:
self._clients.add(q)
if not (self._thread and self._thread.is_alive()):
self._thread = threading.Thread(target=self._broadcast, daemon=True)
self._thread.start()
return q
def unsubscribe(self, q: queue.Queue) -> None:
with self._lock:
self._clients.discard(q)
def _broadcast(self):
for data in self._generator_factory():
with self._lock:
if not self._clients:
continue
dead = set()
for q in self._clients:
try:
q.put_nowait(data)
except queue.Full:
dead.add(q)
self._clients -= dead
# System status generator for SSE # System status generator for SSE
def system_status_generator(): def system_status_generator():
@@ -629,50 +596,20 @@ def logs_generator():
time.sleep(5) # Update every 5 seconds (reduced frequency for better performance) time.sleep(5) # Update every 5 seconds (reduced frequency for better performance)
# One broadcaster per stream — shared across all SSE clients
_stats_broadcaster = _StreamBroadcaster(system_status_generator)
_display_broadcaster = _StreamBroadcaster(display_preview_generator)
_logs_broadcaster = _StreamBroadcaster(logs_generator)
def _sse_stream(broadcaster: _StreamBroadcaster) -> Response:
"""Return a streaming SSE response backed by a shared broadcaster."""
q = broadcaster.subscribe()
def generate():
try:
while True:
try:
data = q.get(timeout=30)
yield f"data: {json.dumps(data)}\n\n"
except queue.Empty:
# Send an SSE comment heartbeat to keep the connection alive
# through proxies that close idle connections.
yield ": heartbeat\n\n"
except GeneratorExit:
pass
finally:
broadcaster.unsubscribe(q)
return Response(generate(), mimetype='text/event-stream')
# SSE endpoints # SSE endpoints
@app.route('/api/v3/stream/stats') @app.route('/api/v3/stream/stats')
def stream_stats(): def stream_stats():
return _sse_stream(_stats_broadcaster) return sse_response(system_status_generator)
@app.route('/api/v3/stream/display') @app.route('/api/v3/stream/display')
def stream_display(): def stream_display():
return _sse_stream(_display_broadcaster) return sse_response(display_preview_generator)
@app.route('/api/v3/stream/logs') @app.route('/api/v3/stream/logs')
def stream_logs(): def stream_logs():
return _sse_stream(_logs_broadcaster) return sse_response(logs_generator)
# Exempt SSE streams from CSRF and apply a generous rate limit. # Exempt SSE streams from CSRF and add rate limiting
# SSE connections are long-lived HTTP requests, not repeated API calls, so the
# tight "20 per minute" default would be exhausted quickly on reconnects.
if csrf: if csrf:
csrf.exempt(stream_stats) csrf.exempt(stream_stats)
csrf.exempt(stream_display) csrf.exempt(stream_display)
@@ -680,9 +617,9 @@ if csrf:
# Note: api_v3 blueprint is exempted above after registration # Note: api_v3 blueprint is exempted above after registration
if limiter: if limiter:
limiter.limit("200 per minute")(stream_stats) limiter.limit("20 per minute")(stream_stats)
limiter.limit("200 per minute")(stream_display) limiter.limit("20 per minute")(stream_display)
limiter.limit("200 per minute")(stream_logs) limiter.limit("20 per minute")(stream_logs)
# Main route - redirect to v3 interface as default # Main route - redirect to v3 interface as default
@app.route('/') @app.route('/')

View File

@@ -6542,7 +6542,7 @@ def scan_wifi_networks():
ap_was_active = wifi_manager._is_ap_mode_active() ap_was_active = wifi_manager._is_ap_mode_active()
# Perform the scan (this will handle AP mode disabling/enabling internally) # Perform the scan (this will handle AP mode disabling/enabling internally)
networks = wifi_manager.scan_networks() networks, _was_cached = wifi_manager.scan_networks()
# Convert to dict format # Convert to dict format
networks_data = [ networks_data = [
@@ -6680,7 +6680,9 @@ def enable_ap_mode():
from src.wifi_manager import WiFiManager from src.wifi_manager import WiFiManager
wifi_manager = WiFiManager() wifi_manager = WiFiManager()
success, message = wifi_manager.enable_ap_mode() _force_raw = (request.get_json(silent=True) or {}).get('force', False)
force = _force_raw is True or (isinstance(_force_raw, str) and _force_raw.lower() in ('true', '1'))
success, message = wifi_manager.enable_ap_mode(force=force)
if success: if success:
return jsonify({ return jsonify({

View File

@@ -51,7 +51,7 @@ document.body.addEventListener('htmx:afterRequest', function(event) {
} }
}); });
// SSE reconnection helper — closes and reopens both SSE streams. // SSE reconnection helper
window.reconnectSSE = function() { window.reconnectSSE = function() {
if (window.statsSource) { if (window.statsSource) {
window.statsSource.close(); window.statsSource.close();
@@ -65,9 +65,8 @@ window.reconnectSSE = function() {
if (window.displaySource) { if (window.displaySource) {
window.displaySource.close(); window.displaySource.close();
window.displaySource = new EventSource('/api/v3/stream/display'); window.displaySource = new EventSource('/api/v3/stream/display');
window.displaySource.onmessage = function(event) { window.displaySource.onmessage = function() {
const data = JSON.parse(event.data); // Handle display updates
if (typeof updateDisplayPreview === 'function') updateDisplayPreview(data);
}; };
} }
}; };

View File

@@ -1370,58 +1370,33 @@
<!-- SSE connection for real-time updates --> <!-- SSE connection for real-time updates -->
<script> <script>
// Assign to window so reconnectSSE() in app.js can reach them. // Connect to SSE streams
window.statsSource = new EventSource('/api/v3/stream/stats'); const statsSource = new EventSource('/api/v3/stream/stats');
window.displaySource = new EventSource('/api/v3/stream/display'); const displaySource = new EventSource('/api/v3/stream/display');
window.statsSource.onmessage = function(event) { statsSource.onmessage = function(event) {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
updateSystemStats(data); updateSystemStats(data);
}; };
window.displaySource.onmessage = function(event) { displaySource.onmessage = function(event) {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
updateDisplayPreview(data); updateDisplayPreview(data);
}; };
function _setConnectionStatus(connected, reconnecting) { // Connection status
const el = document.getElementById('connection-status'); statsSource.addEventListener('open', function() {
if (!el) return; document.getElementById('connection-status').innerHTML = `
if (connected) { <div class="w-2 h-2 bg-green-500 rounded-full"></div>
el.innerHTML = ` <span class="text-gray-600">Connected</span>
<div class="w-2 h-2 bg-green-500 rounded-full"></div> `;
<span class="text-gray-600">Connected</span>
`;
} else if (reconnecting) {
el.innerHTML = `
<div class="w-2 h-2 bg-yellow-500 rounded-full animate-pulse"></div>
<span class="text-gray-600">Reconnecting…</span>
`;
} else {
el.innerHTML = `
<div class="w-2 h-2 bg-red-500 rounded-full"></div>
<span class="text-gray-600" title="Connection lost — try refreshing the page">Disconnected</span>
`;
}
}
var _statsErrorCount = 0;
window.statsSource.addEventListener('open', function() {
_statsErrorCount = 0;
_setConnectionStatus(true, false);
}); });
window.statsSource.addEventListener('error', function() { statsSource.addEventListener('error', function() {
_statsErrorCount++; document.getElementById('connection-status').innerHTML = `
// EventSource readyState 0 = CONNECTING (auto-retrying), 2 = CLOSED <div class="w-2 h-2 bg-red-500 rounded-full"></div>
var reconnecting = window.statsSource.readyState === EventSource.CONNECTING; <span class="text-gray-600">Disconnected</span>
_setConnectionStatus(false, reconnecting && _statsErrorCount <= 3); `;
});
window.displaySource.addEventListener('error', function() {
// Display stream errors don't change the status badge but log to console
// so failures aren't completely silent.
console.warn('LEDMatrix: display preview stream error (readyState=' + window.displaySource.readyState + ')');
}); });
function updateSystemStats(data) { function updateSystemStats(data) {