mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-05-25 13:43:31 +00:00
fix(web-ui): harden SSE broadcaster — drop-oldest on full queue, exit on no subscribers, reattach reconnect handlers
- _broadcast: on queue.Full drop the oldest item and retry the put instead of removing the client from _clients — a slow tab now stays subscribed and receives the latest data rather than being silently ejected - _broadcast: break instead of continue when _clients is empty so the background generator thread exits rather than spinning indefinitely; subscribe() already restarts it on the next connection - base.html: expose _statsOpenHandler, _statsErrorHandler, and _displayErrorHandler as window properties so reconnectSSE() can reattach them after replacing the EventSource instances - app.js: reconnectSSE() now reattaches those handlers after creating each new EventSource so the status badge and display-stream console logging survive a manual reconnect Heartbeat path (~line 646) is a queue read (q.get), not a write; no queue.Full can occur there so no change needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -445,14 +445,23 @@ class _StreamBroadcaster:
|
|||||||
for data in self._generator_factory():
|
for data in self._generator_factory():
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if not self._clients:
|
if not self._clients:
|
||||||
continue
|
# No subscribers — exit so the thread doesn't spin indefinitely.
|
||||||
dead = set()
|
# subscribe() will restart it when a new client arrives.
|
||||||
|
break
|
||||||
for q in self._clients:
|
for q in self._clients:
|
||||||
try:
|
try:
|
||||||
q.put_nowait(data)
|
q.put_nowait(data)
|
||||||
except queue.Full:
|
except queue.Full:
|
||||||
dead.add(q)
|
# Client is reading too slowly; drop the oldest item and
|
||||||
self._clients -= dead
|
# deliver the latest so the queue never stalls the client.
|
||||||
|
try:
|
||||||
|
q.get_nowait()
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
q.put_nowait(data)
|
||||||
|
except queue.Full:
|
||||||
|
pass
|
||||||
|
|
||||||
# System status generator for SSE
|
# System status generator for SSE
|
||||||
def system_status_generator():
|
def system_status_generator():
|
||||||
|
|||||||
@@ -51,7 +51,8 @@ document.body.addEventListener('htmx:afterRequest', function(event) {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// SSE reconnection helper — closes and reopens both SSE streams.
|
// SSE reconnection helper — closes and reopens both SSE streams,
|
||||||
|
// reattaching the open/error handlers defined in base.html.
|
||||||
window.reconnectSSE = function() {
|
window.reconnectSSE = function() {
|
||||||
if (window.statsSource) {
|
if (window.statsSource) {
|
||||||
window.statsSource.close();
|
window.statsSource.close();
|
||||||
@@ -60,6 +61,8 @@ window.reconnectSSE = function() {
|
|||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
if (typeof updateSystemStats === 'function') updateSystemStats(data);
|
if (typeof updateSystemStats === 'function') updateSystemStats(data);
|
||||||
};
|
};
|
||||||
|
if (window._statsOpenHandler) window.statsSource.addEventListener('open', window._statsOpenHandler);
|
||||||
|
if (window._statsErrorHandler) window.statsSource.addEventListener('error', window._statsErrorHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (window.displaySource) {
|
if (window.displaySource) {
|
||||||
@@ -69,6 +72,7 @@ window.reconnectSSE = function() {
|
|||||||
const data = JSON.parse(event.data);
|
const data = JSON.parse(event.data);
|
||||||
if (typeof updateDisplayPreview === 'function') updateDisplayPreview(data);
|
if (typeof updateDisplayPreview === 'function') updateDisplayPreview(data);
|
||||||
};
|
};
|
||||||
|
if (window._displayErrorHandler) window.displaySource.addEventListener('error', window._displayErrorHandler);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -1406,23 +1406,28 @@
|
|||||||
}
|
}
|
||||||
|
|
||||||
var _statsErrorCount = 0;
|
var _statsErrorCount = 0;
|
||||||
window.statsSource.addEventListener('open', function() {
|
|
||||||
|
// Named on window so reconnectSSE() in app.js can reattach them after
|
||||||
|
// replacing the EventSource instances.
|
||||||
|
window._statsOpenHandler = function() {
|
||||||
_statsErrorCount = 0;
|
_statsErrorCount = 0;
|
||||||
_setConnectionStatus(true, false);
|
_setConnectionStatus(true, false);
|
||||||
});
|
};
|
||||||
|
window._statsErrorHandler = function() {
|
||||||
window.statsSource.addEventListener('error', function() {
|
|
||||||
_statsErrorCount++;
|
_statsErrorCount++;
|
||||||
// EventSource readyState 0 = CONNECTING (auto-retrying), 2 = CLOSED
|
// EventSource readyState 0 = CONNECTING (auto-retrying), 2 = CLOSED
|
||||||
var reconnecting = window.statsSource.readyState === EventSource.CONNECTING;
|
var reconnecting = window.statsSource.readyState === EventSource.CONNECTING;
|
||||||
_setConnectionStatus(false, reconnecting && _statsErrorCount <= 3);
|
_setConnectionStatus(false, reconnecting && _statsErrorCount <= 3);
|
||||||
});
|
};
|
||||||
|
window._displayErrorHandler = function() {
|
||||||
window.displaySource.addEventListener('error', function() {
|
|
||||||
// Display stream errors don't change the status badge but log to console
|
// Display stream errors don't change the status badge but log to console
|
||||||
// so failures aren't completely silent.
|
// so failures aren't completely silent.
|
||||||
console.warn('LEDMatrix: display preview stream error (readyState=' + window.displaySource.readyState + ')');
|
console.warn('LEDMatrix: display preview stream error (readyState=' + window.displaySource.readyState + ')');
|
||||||
});
|
};
|
||||||
|
|
||||||
|
window.statsSource.addEventListener('open', window._statsOpenHandler);
|
||||||
|
window.statsSource.addEventListener('error', window._statsErrorHandler);
|
||||||
|
window.displaySource.addEventListener('error', window._displayErrorHandler);
|
||||||
|
|
||||||
function updateSystemStats(data) {
|
function updateSystemStats(data) {
|
||||||
// Update CPU in header
|
// Update CPU in header
|
||||||
|
|||||||
Reference in New Issue
Block a user