diff --git a/web_interface/app.py b/web_interface/app.py index 1bccd49c..97287119 100644 --- a/web_interface/app.py +++ b/web_interface/app.py @@ -2,8 +2,10 @@ from flask import Flask, request, redirect, url_for, jsonify, Response, send_fro import json import logging import os +import queue import sys import subprocess +import threading import time from pathlib import Path from datetime import datetime, timedelta @@ -413,13 +415,53 @@ def add_security_headers(response): return response -# SSE helper function -def sse_response(generator_func): - """Helper to create SSE responses""" - def generate(): - for data in generator_func(): - yield f"data: {json.dumps(data)}\n\n" - return Response(generate(), mimetype='text/event-stream') +class _StreamBroadcaster: + """Fan-out broadcaster: one background generator thread pushes to all SSE clients. + + This means N browser tabs share one generator instead of each running their own, + keeping PIL encodes / subprocess forks constant regardless of how many tabs are open. + """ + + def __init__(self, generator_factory): + self._generator_factory = generator_factory + self._clients: set = set() + self._lock = threading.Lock() + self._thread: threading.Thread | None = None + + def subscribe(self) -> queue.Queue: + q: queue.Queue = queue.Queue(maxsize=5) + with self._lock: + self._clients.add(q) + if not (self._thread and self._thread.is_alive()): + self._thread = threading.Thread(target=self._broadcast, daemon=True) + self._thread.start() + return q + + def unsubscribe(self, q: queue.Queue) -> None: + with self._lock: + self._clients.discard(q) + + def _broadcast(self): + for data in self._generator_factory(): + with self._lock: + if not self._clients: + # No subscribers — exit so the thread doesn't spin indefinitely. + # subscribe() will restart it when a new client arrives. + break + for q in self._clients: + try: + q.put_nowait(data) + except queue.Full: + # Client is reading too slowly; drop the oldest item and + # deliver the latest so the queue never stalls the client. + try: + q.get_nowait() + except queue.Empty: + pass + try: + q.put_nowait(data) + except queue.Full: + pass # System status generator for SSE def system_status_generator(): @@ -596,20 +638,50 @@ def logs_generator(): time.sleep(5) # Update every 5 seconds (reduced frequency for better performance) +# One broadcaster per stream — shared across all SSE clients +_stats_broadcaster = _StreamBroadcaster(system_status_generator) +_display_broadcaster = _StreamBroadcaster(display_preview_generator) +_logs_broadcaster = _StreamBroadcaster(logs_generator) + + +def _sse_stream(broadcaster: _StreamBroadcaster) -> Response: + """Return a streaming SSE response backed by a shared broadcaster.""" + q = broadcaster.subscribe() + + def generate(): + try: + while True: + try: + data = q.get(timeout=30) + yield f"data: {json.dumps(data)}\n\n" + except queue.Empty: + # Send an SSE comment heartbeat to keep the connection alive + # through proxies that close idle connections. + yield ": heartbeat\n\n" + except GeneratorExit: + pass + finally: + broadcaster.unsubscribe(q) + + return Response(generate(), mimetype='text/event-stream') + + # SSE endpoints @app.route('/api/v3/stream/stats') def stream_stats(): - return sse_response(system_status_generator) + return _sse_stream(_stats_broadcaster) @app.route('/api/v3/stream/display') def stream_display(): - return sse_response(display_preview_generator) + return _sse_stream(_display_broadcaster) @app.route('/api/v3/stream/logs') def stream_logs(): - return sse_response(logs_generator) + return _sse_stream(_logs_broadcaster) -# Exempt SSE streams from CSRF and add rate limiting +# Exempt SSE streams from CSRF and apply a generous rate limit. +# SSE connections are long-lived HTTP requests, not repeated API calls, so the +# tight "20 per minute" default would be exhausted quickly on reconnects. if csrf: csrf.exempt(stream_stats) csrf.exempt(stream_display) @@ -617,9 +689,9 @@ if csrf: # Note: api_v3 blueprint is exempted above after registration if limiter: - limiter.limit("20 per minute")(stream_stats) - limiter.limit("20 per minute")(stream_display) - limiter.limit("20 per minute")(stream_logs) + limiter.limit("200 per minute")(stream_stats) + limiter.limit("200 per minute")(stream_display) + limiter.limit("200 per minute")(stream_logs) # Main route - redirect to v3 interface as default @app.route('/') diff --git a/web_interface/static/v3/app.js b/web_interface/static/v3/app.js index 5fc310ca..01ed814a 100644 --- a/web_interface/static/v3/app.js +++ b/web_interface/static/v3/app.js @@ -1,4 +1,4 @@ -/* global showNotification, updateSystemStats, htmx */ +/* global showNotification, updateSystemStats, updateDisplayPreview, htmx */ // LED Matrix v3 JavaScript // Additional helpers for HTMX and Alpine.js integration @@ -51,7 +51,8 @@ document.body.addEventListener('htmx:afterRequest', function(event) { } }); -// SSE reconnection helper +// SSE reconnection helper — closes and reopens both SSE streams, +// reattaching the open/error handlers defined in base.html. window.reconnectSSE = function() { if (window.statsSource) { window.statsSource.close(); @@ -60,14 +61,18 @@ window.reconnectSSE = function() { const data = JSON.parse(event.data); if (typeof updateSystemStats === 'function') updateSystemStats(data); }; + if (window._statsOpenHandler) window.statsSource.addEventListener('open', window._statsOpenHandler); + if (window._statsErrorHandler) window.statsSource.addEventListener('error', window._statsErrorHandler); } if (window.displaySource) { window.displaySource.close(); window.displaySource = new EventSource('/api/v3/stream/display'); - window.displaySource.onmessage = function() { - // Handle display updates + window.displaySource.onmessage = function(event) { + const data = JSON.parse(event.data); + if (typeof updateDisplayPreview === 'function') updateDisplayPreview(data); }; + if (window._displayErrorHandler) window.displaySource.addEventListener('error', window._displayErrorHandler); } }; diff --git a/web_interface/templates/v3/base.html b/web_interface/templates/v3/base.html index 081d0870..bca338d1 100644 --- a/web_interface/templates/v3/base.html +++ b/web_interface/templates/v3/base.html @@ -1370,34 +1370,64 @@