From 704e99f55c6de8bb26800ca99e12db4121b18c2f Mon Sep 17 00:00:00 2001 From: Chuck Date: Sun, 24 May 2026 17:16:47 -0400 Subject: [PATCH] fix(web-ui): support multiple browser tabs via SSE broadcaster pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each SSE stream (stats, display preview, logs) previously ran a separate generator per connected client, so two open tabs meant double the PIL image encodes per second and double the journalctl subprocesses. Under load or on reconnect storms the tight "20 per minute" rate limit was easily exhausted, silently breaking tabs without any user-facing explanation. - Replace per-client sse_response generators with _StreamBroadcaster: one background thread per stream type fans data to all subscribed client queues, keeping CPU/subprocess work constant regardless of how many tabs are open - Add 30-second SSE heartbeat comments to keep idle connections alive through proxies - Raise SSE rate limit from "20/min" to "200/min" to prevent reconnect storms from exhausting the limit - Assign statsSource/displaySource to window.* so reconnectSSE() in app.js can actually reach them (was dead code due to const scoping) - Add displaySource error handler so display preview failures are no longer completely silent - Improve connection status badge: shows "Reconnecting…" on first few errors, "Disconnected" with tooltip hint after persistent failure - Complete the empty displaySource.onmessage stub in reconnectSSE() Co-Authored-By: Claude Sonnet 4.6 --- web_interface/app.py | 91 +++++++++++++++++++++++----- web_interface/static/v3/app.js | 7 ++- web_interface/templates/v3/base.html | 57 ++++++++++++----- 3 files changed, 122 insertions(+), 33 deletions(-) diff --git a/web_interface/app.py b/web_interface/app.py index 1bccd49c..33321b93 100644 --- a/web_interface/app.py +++ b/web_interface/app.py @@ -2,8 +2,10 @@ from flask import Flask, request, redirect, url_for, jsonify, Response, send_fro import json import logging import os +import queue import sys import subprocess +import threading import time from pathlib import Path from datetime import datetime, timedelta @@ -413,13 +415,44 @@ def add_security_headers(response): return response -# SSE helper function -def sse_response(generator_func): - """Helper to create SSE responses""" - def generate(): - for data in generator_func(): - yield f"data: {json.dumps(data)}\n\n" - return Response(generate(), mimetype='text/event-stream') +class _StreamBroadcaster: + """Fan-out broadcaster: one background generator thread pushes to all SSE clients. + + This means N browser tabs share one generator instead of each running their own, + keeping PIL encodes / subprocess forks constant regardless of how many tabs are open. + """ + + def __init__(self, generator_factory): + self._generator_factory = generator_factory + self._clients: set = set() + self._lock = threading.Lock() + self._thread: threading.Thread | None = None + + def subscribe(self) -> queue.Queue: + q: queue.Queue = queue.Queue(maxsize=5) + with self._lock: + self._clients.add(q) + if not (self._thread and self._thread.is_alive()): + self._thread = threading.Thread(target=self._broadcast, daemon=True) + self._thread.start() + return q + + def unsubscribe(self, q: queue.Queue) -> None: + with self._lock: + self._clients.discard(q) + + def _broadcast(self): + for data in self._generator_factory(): + with self._lock: + if not self._clients: + continue + dead = set() + for q in self._clients: + try: + q.put_nowait(data) + except queue.Full: + dead.add(q) + self._clients -= dead # System status generator for SSE def system_status_generator(): @@ -596,20 +629,50 @@ def logs_generator(): time.sleep(5) # Update every 5 seconds (reduced frequency for better performance) +# One broadcaster per stream — shared across all SSE clients +_stats_broadcaster = _StreamBroadcaster(system_status_generator) +_display_broadcaster = _StreamBroadcaster(display_preview_generator) +_logs_broadcaster = _StreamBroadcaster(logs_generator) + + +def _sse_stream(broadcaster: _StreamBroadcaster) -> Response: + """Return a streaming SSE response backed by a shared broadcaster.""" + q = broadcaster.subscribe() + + def generate(): + try: + while True: + try: + data = q.get(timeout=30) + yield f"data: {json.dumps(data)}\n\n" + except queue.Empty: + # Send an SSE comment heartbeat to keep the connection alive + # through proxies that close idle connections. + yield ": heartbeat\n\n" + except GeneratorExit: + pass + finally: + broadcaster.unsubscribe(q) + + return Response(generate(), mimetype='text/event-stream') + + # SSE endpoints @app.route('/api/v3/stream/stats') def stream_stats(): - return sse_response(system_status_generator) + return _sse_stream(_stats_broadcaster) @app.route('/api/v3/stream/display') def stream_display(): - return sse_response(display_preview_generator) + return _sse_stream(_display_broadcaster) @app.route('/api/v3/stream/logs') def stream_logs(): - return sse_response(logs_generator) + return _sse_stream(_logs_broadcaster) -# Exempt SSE streams from CSRF and add rate limiting +# Exempt SSE streams from CSRF and apply a generous rate limit. +# SSE connections are long-lived HTTP requests, not repeated API calls, so the +# tight "20 per minute" default would be exhausted quickly on reconnects. if csrf: csrf.exempt(stream_stats) csrf.exempt(stream_display) @@ -617,9 +680,9 @@ if csrf: # Note: api_v3 blueprint is exempted above after registration if limiter: - limiter.limit("20 per minute")(stream_stats) - limiter.limit("20 per minute")(stream_display) - limiter.limit("20 per minute")(stream_logs) + limiter.limit("200 per minute")(stream_stats) + limiter.limit("200 per minute")(stream_display) + limiter.limit("200 per minute")(stream_logs) # Main route - redirect to v3 interface as default @app.route('/') diff --git a/web_interface/static/v3/app.js b/web_interface/static/v3/app.js index 5fc310ca..aa2f933a 100644 --- a/web_interface/static/v3/app.js +++ b/web_interface/static/v3/app.js @@ -51,7 +51,7 @@ document.body.addEventListener('htmx:afterRequest', function(event) { } }); -// SSE reconnection helper +// SSE reconnection helper — closes and reopens both SSE streams. window.reconnectSSE = function() { if (window.statsSource) { window.statsSource.close(); @@ -65,8 +65,9 @@ window.reconnectSSE = function() { if (window.displaySource) { window.displaySource.close(); window.displaySource = new EventSource('/api/v3/stream/display'); - window.displaySource.onmessage = function() { - // Handle display updates + window.displaySource.onmessage = function(event) { + const data = JSON.parse(event.data); + if (typeof updateDisplayPreview === 'function') updateDisplayPreview(data); }; } }; diff --git a/web_interface/templates/v3/base.html b/web_interface/templates/v3/base.html index 081d0870..b4b9cf47 100644 --- a/web_interface/templates/v3/base.html +++ b/web_interface/templates/v3/base.html @@ -1370,33 +1370,58 @@