mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-05-26 05:53:33 +00:00
fix(web-ui): support multiple browser tabs via SSE broadcaster (#349)
* fix(web-ui): support multiple browser tabs via SSE broadcaster pattern Each SSE stream (stats, display preview, logs) previously ran a separate generator per connected client, so two open tabs meant double the PIL image encodes per second and double the journalctl subprocesses. Under load or on reconnect storms the tight "20 per minute" rate limit was easily exhausted, silently breaking tabs without any user-facing explanation. - Replace per-client sse_response generators with _StreamBroadcaster: one background thread per stream type fans data to all subscribed client queues, keeping CPU/subprocess work constant regardless of how many tabs are open - Add 30-second SSE heartbeat comments to keep idle connections alive through proxies - Raise SSE rate limit from "20/min" to "200/min" to prevent reconnect storms from exhausting the limit - Assign statsSource/displaySource to window.* so reconnectSSE() in app.js can actually reach them (was dead code due to const scoping) - Add displaySource error handler so display preview failures are no longer completely silent - Improve connection status badge: shows "Reconnecting…" on first few errors, "Disconnected" with tooltip hint after persistent failure - Complete the empty displaySource.onmessage stub in reconnectSSE() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(web-ui): harden SSE broadcaster — drop-oldest on full queue, exit on no subscribers, reattach reconnect handlers - _broadcast: on queue.Full drop the oldest item and retry the put instead of removing the client from _clients — a slow tab now stays subscribed and receives the latest data rather than being silently ejected - _broadcast: break instead of continue when _clients is empty so the background generator thread exits rather than spinning indefinitely; subscribe() already restarts it on the next connection - base.html: expose _statsOpenHandler, _statsErrorHandler, and _displayErrorHandler as window properties so reconnectSSE() can reattach them after replacing the EventSource instances - app.js: reconnectSSE() now reattaches those handlers after creating each new EventSource so the status badge and display-stream console logging survive a manual reconnect Heartbeat path (~line 646) is a queue read (q.get), not a write; no queue.Full can occur there so no change needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(lint): declare updateDisplayPreview in ESLint global comment Codacy flagged 'updateDisplayPreview is not defined' at app.js:73. The function is defined in base.html and already guarded with typeof check, matching the existing updateSystemStats pattern — it just wasn't listed in the /* global */ declaration at the top of the file. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Chuck <chuck@example.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,8 +2,10 @@ from flask import Flask, request, redirect, url_for, jsonify, Response, send_fro
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import queue
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
@@ -413,13 +415,53 @@ def add_security_headers(response):
|
|||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
# SSE helper function
|
class _StreamBroadcaster:
|
||||||
def sse_response(generator_func):
|
"""Fan-out broadcaster: one background generator thread pushes to all SSE clients.
|
||||||
"""Helper to create SSE responses"""
|
|
||||||
def generate():
|
This means N browser tabs share one generator instead of each running their own,
|
||||||
for data in generator_func():
|
keeping PIL encodes / subprocess forks constant regardless of how many tabs are open.
|
||||||
yield f"data: {json.dumps(data)}\n\n"
|
"""
|
||||||
return Response(generate(), mimetype='text/event-stream')
|
|
||||||
|
def __init__(self, generator_factory):
|
||||||
|
self._generator_factory = generator_factory
|
||||||
|
self._clients: set = set()
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._thread: threading.Thread | None = None
|
||||||
|
|
||||||
|
def subscribe(self) -> queue.Queue:
|
||||||
|
q: queue.Queue = queue.Queue(maxsize=5)
|
||||||
|
with self._lock:
|
||||||
|
self._clients.add(q)
|
||||||
|
if not (self._thread and self._thread.is_alive()):
|
||||||
|
self._thread = threading.Thread(target=self._broadcast, daemon=True)
|
||||||
|
self._thread.start()
|
||||||
|
return q
|
||||||
|
|
||||||
|
def unsubscribe(self, q: queue.Queue) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._clients.discard(q)
|
||||||
|
|
||||||
|
def _broadcast(self):
|
||||||
|
for data in self._generator_factory():
|
||||||
|
with self._lock:
|
||||||
|
if not self._clients:
|
||||||
|
# No subscribers — exit so the thread doesn't spin indefinitely.
|
||||||
|
# subscribe() will restart it when a new client arrives.
|
||||||
|
break
|
||||||
|
for q in self._clients:
|
||||||
|
try:
|
||||||
|
q.put_nowait(data)
|
||||||
|
except queue.Full:
|
||||||
|
# Client is reading too slowly; drop the oldest item and
|
||||||
|
# deliver the latest so the queue never stalls the client.
|
||||||
|
try:
|
||||||
|
q.get_nowait()
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
q.put_nowait(data)
|
||||||
|
except queue.Full:
|
||||||
|
pass
|
||||||
|
|
||||||
# System status generator for SSE
|
# System status generator for SSE
|
||||||
def system_status_generator():
|
def system_status_generator():
|
||||||
@@ -596,20 +638,50 @@ def logs_generator():
|
|||||||
|
|
||||||
time.sleep(5) # Update every 5 seconds (reduced frequency for better performance)
|
time.sleep(5) # Update every 5 seconds (reduced frequency for better performance)
|
||||||
|
|
||||||
|
# One broadcaster per stream — shared across all SSE clients
|
||||||
|
_stats_broadcaster = _StreamBroadcaster(system_status_generator)
|
||||||
|
_display_broadcaster = _StreamBroadcaster(display_preview_generator)
|
||||||
|
_logs_broadcaster = _StreamBroadcaster(logs_generator)
|
||||||
|
|
||||||
|
|
||||||
|
def _sse_stream(broadcaster: _StreamBroadcaster) -> Response:
|
||||||
|
"""Return a streaming SSE response backed by a shared broadcaster."""
|
||||||
|
q = broadcaster.subscribe()
|
||||||
|
|
||||||
|
def generate():
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data = q.get(timeout=30)
|
||||||
|
yield f"data: {json.dumps(data)}\n\n"
|
||||||
|
except queue.Empty:
|
||||||
|
# Send an SSE comment heartbeat to keep the connection alive
|
||||||
|
# through proxies that close idle connections.
|
||||||
|
yield ": heartbeat\n\n"
|
||||||
|
except GeneratorExit:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
broadcaster.unsubscribe(q)
|
||||||
|
|
||||||
|
return Response(generate(), mimetype='text/event-stream')
|
||||||
|
|
||||||
|
|
||||||
# SSE endpoints
|
# SSE endpoints
|
||||||
@app.route('/api/v3/stream/stats')
|
@app.route('/api/v3/stream/stats')
|
||||||
def stream_stats():
|
def stream_stats():
|
||||||
return sse_response(system_status_generator)
|
return _sse_stream(_stats_broadcaster)
|
||||||
|
|
||||||
@app.route('/api/v3/stream/display')
|
@app.route('/api/v3/stream/display')
|
||||||
def stream_display():
|
def stream_display():
|
||||||
return sse_response(display_preview_generator)
|
return _sse_stream(_display_broadcaster)
|
||||||
|
|
||||||
@app.route('/api/v3/stream/logs')
|
@app.route('/api/v3/stream/logs')
|
||||||
def stream_logs():
|
def stream_logs():
|
||||||
return sse_response(logs_generator)
|
return _sse_stream(_logs_broadcaster)
|
||||||
|
|
||||||
# Exempt SSE streams from CSRF and add rate limiting
|
# Exempt SSE streams from CSRF and apply a generous rate limit.
|
||||||
|
# SSE connections are long-lived HTTP requests, not repeated API calls, so the
|
||||||
|
# tight "20 per minute" default would be exhausted quickly on reconnects.
|
||||||
if csrf:
|
if csrf:
|
||||||
csrf.exempt(stream_stats)
|
csrf.exempt(stream_stats)
|
||||||
csrf.exempt(stream_display)
|
csrf.exempt(stream_display)
|
||||||
@@ -617,9 +689,9 @@ if csrf:
|
|||||||
# Note: api_v3 blueprint is exempted above after registration
|
# Note: api_v3 blueprint is exempted above after registration
|
||||||
|
|
||||||
if limiter:
|
if limiter:
|
||||||
limiter.limit("20 per minute")(stream_stats)
|
limiter.limit("200 per minute")(stream_stats)
|
||||||
limiter.limit("20 per minute")(stream_display)
|
limiter.limit("200 per minute")(stream_display)
|
||||||
limiter.limit("20 per minute")(stream_logs)
|
limiter.limit("200 per minute")(stream_logs)
|
||||||
|
|
||||||
# Main route - redirect to v3 interface as default
|
# Main route - redirect to v3 interface as default
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
/* global showNotification, updateSystemStats, htmx */
|
/* global showNotification, updateSystemStats, updateDisplayPreview, htmx */
|
||||||
// LED Matrix v3 JavaScript
|
// LED Matrix v3 JavaScript
|
||||||
// Additional helpers for HTMX and Alpine.js integration
|
// Additional helpers for HTMX and Alpine.js integration
|
||||||
|
|
||||||
@@ -51,7 +51,8 @@ document.body.addEventListener('htmx:afterRequest', function(event) {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// SSE reconnection helper
|
// SSE reconnection helper — closes and reopens both SSE streams,
|
||||||
|
// reattaching the open/error handlers defined in base.html.
|
||||||
window.reconnectSSE = function() {
|
window.reconnectSSE = function() {
|
||||||
if (window.statsSource) {
|
if (window.statsSource) {
|
||||||
window.statsSource.close();
|
window.statsSource.close();
|
||||||
@@ -60,14 +61,18 @@ window.reconnectSSE = function() {
|
|||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
if (typeof updateSystemStats === 'function') updateSystemStats(data);
|
if (typeof updateSystemStats === 'function') updateSystemStats(data);
|
||||||
};
|
};
|
||||||
|
if (window._statsOpenHandler) window.statsSource.addEventListener('open', window._statsOpenHandler);
|
||||||
|
if (window._statsErrorHandler) window.statsSource.addEventListener('error', window._statsErrorHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (window.displaySource) {
|
if (window.displaySource) {
|
||||||
window.displaySource.close();
|
window.displaySource.close();
|
||||||
window.displaySource = new EventSource('/api/v3/stream/display');
|
window.displaySource = new EventSource('/api/v3/stream/display');
|
||||||
window.displaySource.onmessage = function() {
|
window.displaySource.onmessage = function(event) {
|
||||||
// Handle display updates
|
const data = JSON.parse(event.data);
|
||||||
|
if (typeof updateDisplayPreview === 'function') updateDisplayPreview(data);
|
||||||
};
|
};
|
||||||
|
if (window._displayErrorHandler) window.displaySource.addEventListener('error', window._displayErrorHandler);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -1370,34 +1370,64 @@
|
|||||||
|
|
||||||
<!-- SSE connection for real-time updates -->
|
<!-- SSE connection for real-time updates -->
|
||||||
<script>
|
<script>
|
||||||
// Connect to SSE streams
|
// Assign to window so reconnectSSE() in app.js can reach them.
|
||||||
const statsSource = new EventSource('/api/v3/stream/stats');
|
window.statsSource = new EventSource('/api/v3/stream/stats');
|
||||||
const displaySource = new EventSource('/api/v3/stream/display');
|
window.displaySource = new EventSource('/api/v3/stream/display');
|
||||||
|
|
||||||
statsSource.onmessage = function(event) {
|
window.statsSource.onmessage = function(event) {
|
||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
updateSystemStats(data);
|
updateSystemStats(data);
|
||||||
};
|
};
|
||||||
|
|
||||||
displaySource.onmessage = function(event) {
|
window.displaySource.onmessage = function(event) {
|
||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
updateDisplayPreview(data);
|
updateDisplayPreview(data);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Connection status
|
function _setConnectionStatus(connected, reconnecting) {
|
||||||
statsSource.addEventListener('open', function() {
|
const el = document.getElementById('connection-status');
|
||||||
document.getElementById('connection-status').innerHTML = `
|
if (!el) return;
|
||||||
|
if (connected) {
|
||||||
|
el.innerHTML = `
|
||||||
<div class="w-2 h-2 bg-green-500 rounded-full"></div>
|
<div class="w-2 h-2 bg-green-500 rounded-full"></div>
|
||||||
<span class="text-gray-600">Connected</span>
|
<span class="text-gray-600">Connected</span>
|
||||||
`;
|
`;
|
||||||
});
|
} else if (reconnecting) {
|
||||||
|
el.innerHTML = `
|
||||||
statsSource.addEventListener('error', function() {
|
<div class="w-2 h-2 bg-yellow-500 rounded-full animate-pulse"></div>
|
||||||
document.getElementById('connection-status').innerHTML = `
|
<span class="text-gray-600">Reconnecting…</span>
|
||||||
<div class="w-2 h-2 bg-red-500 rounded-full"></div>
|
|
||||||
<span class="text-gray-600">Disconnected</span>
|
|
||||||
`;
|
`;
|
||||||
});
|
} else {
|
||||||
|
el.innerHTML = `
|
||||||
|
<div class="w-2 h-2 bg-red-500 rounded-full"></div>
|
||||||
|
<span class="text-gray-600" title="Connection lost — try refreshing the page">Disconnected</span>
|
||||||
|
`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _statsErrorCount = 0;
|
||||||
|
|
||||||
|
// Named on window so reconnectSSE() in app.js can reattach them after
|
||||||
|
// replacing the EventSource instances.
|
||||||
|
window._statsOpenHandler = function() {
|
||||||
|
_statsErrorCount = 0;
|
||||||
|
_setConnectionStatus(true, false);
|
||||||
|
};
|
||||||
|
window._statsErrorHandler = function() {
|
||||||
|
_statsErrorCount++;
|
||||||
|
// EventSource readyState 0 = CONNECTING (auto-retrying), 2 = CLOSED
|
||||||
|
var reconnecting = window.statsSource.readyState === EventSource.CONNECTING;
|
||||||
|
_setConnectionStatus(false, reconnecting && _statsErrorCount <= 3);
|
||||||
|
};
|
||||||
|
window._displayErrorHandler = function() {
|
||||||
|
// Display stream errors don't change the status badge but log to console
|
||||||
|
// so failures aren't completely silent.
|
||||||
|
console.warn('LEDMatrix: display preview stream error (readyState=' + window.displaySource.readyState + ')');
|
||||||
|
};
|
||||||
|
|
||||||
|
window.statsSource.addEventListener('open', window._statsOpenHandler);
|
||||||
|
window.statsSource.addEventListener('error', window._statsErrorHandler);
|
||||||
|
window.displaySource.addEventListener('error', window._displayErrorHandler);
|
||||||
|
|
||||||
function updateSystemStats(data) {
|
function updateSystemStats(data) {
|
||||||
// Update CPU in header
|
// Update CPU in header
|
||||||
|
|||||||
Reference in New Issue
Block a user