mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-05-25 13:43:31 +00:00
fix(web-ui): support multiple browser tabs via SSE broadcaster pattern
Each SSE stream (stats, display preview, logs) previously ran a separate generator per connected client, so two open tabs meant double the PIL image encodes per second and double the journalctl subprocesses. Under load or on reconnect storms the tight "20 per minute" rate limit was easily exhausted, silently breaking tabs without any user-facing explanation. - Replace per-client sse_response generators with _StreamBroadcaster: one background thread per stream type fans data to all subscribed client queues, keeping CPU/subprocess work constant regardless of how many tabs are open - Add 30-second SSE heartbeat comments to keep idle connections alive through proxies - Raise SSE rate limit from "20/min" to "200/min" to prevent reconnect storms from exhausting the limit - Assign statsSource/displaySource to window.* so reconnectSSE() in app.js can actually reach them (was dead code due to const scoping) - Add displaySource error handler so display preview failures are no longer completely silent - Improve connection status badge: shows "Reconnecting…" on first few errors, "Disconnected" with tooltip hint after persistent failure - Complete the empty displaySource.onmessage stub in reconnectSSE() Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,8 +2,10 @@ from flask import Flask, request, redirect, url_for, jsonify, Response, send_fro
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import queue
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
@@ -413,13 +415,44 @@ def add_security_headers(response):
|
|||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
# SSE helper function
|
class _StreamBroadcaster:
|
||||||
def sse_response(generator_func):
|
"""Fan-out broadcaster: one background generator thread pushes to all SSE clients.
|
||||||
"""Helper to create SSE responses"""
|
|
||||||
def generate():
|
This means N browser tabs share one generator instead of each running their own,
|
||||||
for data in generator_func():
|
keeping PIL encodes / subprocess forks constant regardless of how many tabs are open.
|
||||||
yield f"data: {json.dumps(data)}\n\n"
|
"""
|
||||||
return Response(generate(), mimetype='text/event-stream')
|
|
||||||
|
def __init__(self, generator_factory):
|
||||||
|
self._generator_factory = generator_factory
|
||||||
|
self._clients: set = set()
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._thread: threading.Thread | None = None
|
||||||
|
|
||||||
|
def subscribe(self) -> queue.Queue:
|
||||||
|
q: queue.Queue = queue.Queue(maxsize=5)
|
||||||
|
with self._lock:
|
||||||
|
self._clients.add(q)
|
||||||
|
if not (self._thread and self._thread.is_alive()):
|
||||||
|
self._thread = threading.Thread(target=self._broadcast, daemon=True)
|
||||||
|
self._thread.start()
|
||||||
|
return q
|
||||||
|
|
||||||
|
def unsubscribe(self, q: queue.Queue) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._clients.discard(q)
|
||||||
|
|
||||||
|
def _broadcast(self):
|
||||||
|
for data in self._generator_factory():
|
||||||
|
with self._lock:
|
||||||
|
if not self._clients:
|
||||||
|
continue
|
||||||
|
dead = set()
|
||||||
|
for q in self._clients:
|
||||||
|
try:
|
||||||
|
q.put_nowait(data)
|
||||||
|
except queue.Full:
|
||||||
|
dead.add(q)
|
||||||
|
self._clients -= dead
|
||||||
|
|
||||||
# System status generator for SSE
|
# System status generator for SSE
|
||||||
def system_status_generator():
|
def system_status_generator():
|
||||||
@@ -596,20 +629,50 @@ def logs_generator():
|
|||||||
|
|
||||||
time.sleep(5) # Update every 5 seconds (reduced frequency for better performance)
|
time.sleep(5) # Update every 5 seconds (reduced frequency for better performance)
|
||||||
|
|
||||||
|
# One broadcaster per stream — shared across all SSE clients
|
||||||
|
_stats_broadcaster = _StreamBroadcaster(system_status_generator)
|
||||||
|
_display_broadcaster = _StreamBroadcaster(display_preview_generator)
|
||||||
|
_logs_broadcaster = _StreamBroadcaster(logs_generator)
|
||||||
|
|
||||||
|
|
||||||
|
def _sse_stream(broadcaster: _StreamBroadcaster) -> Response:
|
||||||
|
"""Return a streaming SSE response backed by a shared broadcaster."""
|
||||||
|
q = broadcaster.subscribe()
|
||||||
|
|
||||||
|
def generate():
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data = q.get(timeout=30)
|
||||||
|
yield f"data: {json.dumps(data)}\n\n"
|
||||||
|
except queue.Empty:
|
||||||
|
# Send an SSE comment heartbeat to keep the connection alive
|
||||||
|
# through proxies that close idle connections.
|
||||||
|
yield ": heartbeat\n\n"
|
||||||
|
except GeneratorExit:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
broadcaster.unsubscribe(q)
|
||||||
|
|
||||||
|
return Response(generate(), mimetype='text/event-stream')
|
||||||
|
|
||||||
|
|
||||||
# SSE endpoints
|
# SSE endpoints
|
||||||
@app.route('/api/v3/stream/stats')
|
@app.route('/api/v3/stream/stats')
|
||||||
def stream_stats():
|
def stream_stats():
|
||||||
return sse_response(system_status_generator)
|
return _sse_stream(_stats_broadcaster)
|
||||||
|
|
||||||
@app.route('/api/v3/stream/display')
|
@app.route('/api/v3/stream/display')
|
||||||
def stream_display():
|
def stream_display():
|
||||||
return sse_response(display_preview_generator)
|
return _sse_stream(_display_broadcaster)
|
||||||
|
|
||||||
@app.route('/api/v3/stream/logs')
|
@app.route('/api/v3/stream/logs')
|
||||||
def stream_logs():
|
def stream_logs():
|
||||||
return sse_response(logs_generator)
|
return _sse_stream(_logs_broadcaster)
|
||||||
|
|
||||||
# Exempt SSE streams from CSRF and add rate limiting
|
# Exempt SSE streams from CSRF and apply a generous rate limit.
|
||||||
|
# SSE connections are long-lived HTTP requests, not repeated API calls, so the
|
||||||
|
# tight "20 per minute" default would be exhausted quickly on reconnects.
|
||||||
if csrf:
|
if csrf:
|
||||||
csrf.exempt(stream_stats)
|
csrf.exempt(stream_stats)
|
||||||
csrf.exempt(stream_display)
|
csrf.exempt(stream_display)
|
||||||
@@ -617,9 +680,9 @@ if csrf:
|
|||||||
# Note: api_v3 blueprint is exempted above after registration
|
# Note: api_v3 blueprint is exempted above after registration
|
||||||
|
|
||||||
if limiter:
|
if limiter:
|
||||||
limiter.limit("20 per minute")(stream_stats)
|
limiter.limit("200 per minute")(stream_stats)
|
||||||
limiter.limit("20 per minute")(stream_display)
|
limiter.limit("200 per minute")(stream_display)
|
||||||
limiter.limit("20 per minute")(stream_logs)
|
limiter.limit("200 per minute")(stream_logs)
|
||||||
|
|
||||||
# Main route - redirect to v3 interface as default
|
# Main route - redirect to v3 interface as default
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ document.body.addEventListener('htmx:afterRequest', function(event) {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// SSE reconnection helper
|
// SSE reconnection helper — closes and reopens both SSE streams.
|
||||||
window.reconnectSSE = function() {
|
window.reconnectSSE = function() {
|
||||||
if (window.statsSource) {
|
if (window.statsSource) {
|
||||||
window.statsSource.close();
|
window.statsSource.close();
|
||||||
@@ -65,8 +65,9 @@ window.reconnectSSE = function() {
|
|||||||
if (window.displaySource) {
|
if (window.displaySource) {
|
||||||
window.displaySource.close();
|
window.displaySource.close();
|
||||||
window.displaySource = new EventSource('/api/v3/stream/display');
|
window.displaySource = new EventSource('/api/v3/stream/display');
|
||||||
window.displaySource.onmessage = function() {
|
window.displaySource.onmessage = function(event) {
|
||||||
// Handle display updates
|
const data = JSON.parse(event.data);
|
||||||
|
if (typeof updateDisplayPreview === 'function') updateDisplayPreview(data);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1370,33 +1370,58 @@
|
|||||||
|
|
||||||
<!-- SSE connection for real-time updates -->
|
<!-- SSE connection for real-time updates -->
|
||||||
<script>
|
<script>
|
||||||
// Connect to SSE streams
|
// Assign to window so reconnectSSE() in app.js can reach them.
|
||||||
const statsSource = new EventSource('/api/v3/stream/stats');
|
window.statsSource = new EventSource('/api/v3/stream/stats');
|
||||||
const displaySource = new EventSource('/api/v3/stream/display');
|
window.displaySource = new EventSource('/api/v3/stream/display');
|
||||||
|
|
||||||
statsSource.onmessage = function(event) {
|
window.statsSource.onmessage = function(event) {
|
||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
updateSystemStats(data);
|
updateSystemStats(data);
|
||||||
};
|
};
|
||||||
|
|
||||||
displaySource.onmessage = function(event) {
|
window.displaySource.onmessage = function(event) {
|
||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
updateDisplayPreview(data);
|
updateDisplayPreview(data);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Connection status
|
function _setConnectionStatus(connected, reconnecting) {
|
||||||
statsSource.addEventListener('open', function() {
|
const el = document.getElementById('connection-status');
|
||||||
document.getElementById('connection-status').innerHTML = `
|
if (!el) return;
|
||||||
<div class="w-2 h-2 bg-green-500 rounded-full"></div>
|
if (connected) {
|
||||||
<span class="text-gray-600">Connected</span>
|
el.innerHTML = `
|
||||||
`;
|
<div class="w-2 h-2 bg-green-500 rounded-full"></div>
|
||||||
|
<span class="text-gray-600">Connected</span>
|
||||||
|
`;
|
||||||
|
} else if (reconnecting) {
|
||||||
|
el.innerHTML = `
|
||||||
|
<div class="w-2 h-2 bg-yellow-500 rounded-full animate-pulse"></div>
|
||||||
|
<span class="text-gray-600">Reconnecting…</span>
|
||||||
|
`;
|
||||||
|
} else {
|
||||||
|
el.innerHTML = `
|
||||||
|
<div class="w-2 h-2 bg-red-500 rounded-full"></div>
|
||||||
|
<span class="text-gray-600" title="Connection lost — try refreshing the page">Disconnected</span>
|
||||||
|
`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _statsErrorCount = 0;
|
||||||
|
window.statsSource.addEventListener('open', function() {
|
||||||
|
_statsErrorCount = 0;
|
||||||
|
_setConnectionStatus(true, false);
|
||||||
});
|
});
|
||||||
|
|
||||||
statsSource.addEventListener('error', function() {
|
window.statsSource.addEventListener('error', function() {
|
||||||
document.getElementById('connection-status').innerHTML = `
|
_statsErrorCount++;
|
||||||
<div class="w-2 h-2 bg-red-500 rounded-full"></div>
|
// EventSource readyState 0 = CONNECTING (auto-retrying), 2 = CLOSED
|
||||||
<span class="text-gray-600">Disconnected</span>
|
var reconnecting = window.statsSource.readyState === EventSource.CONNECTING;
|
||||||
`;
|
_setConnectionStatus(false, reconnecting && _statsErrorCount <= 3);
|
||||||
|
});
|
||||||
|
|
||||||
|
window.displaySource.addEventListener('error', function() {
|
||||||
|
// Display stream errors don't change the status badge but log to console
|
||||||
|
// so failures aren't completely silent.
|
||||||
|
console.warn('LEDMatrix: display preview stream error (readyState=' + window.displaySource.readyState + ')');
|
||||||
});
|
});
|
||||||
|
|
||||||
function updateSystemStats(data) {
|
function updateSystemStats(data) {
|
||||||
|
|||||||
Reference in New Issue
Block a user