Files
LEDMatrix/src/display_manager.py
2025-08-18 15:21:39 -05:00

743 lines
33 KiB
Python

from rgbmatrix import RGBMatrix, RGBMatrixOptions
from PIL import Image, ImageDraw, ImageFont
import time
from typing import Dict, Any, List, Tuple
import logging
import math
from .weather_icons import WeatherIcons
import os
import freetype
# Get logger without configuring
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) # Set to INFO level
class DisplayManager:
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(DisplayManager, cls).__new__(cls)
return cls._instance
def __init__(self, config: Dict[str, Any] = None, force_fallback: bool = False, suppress_test_pattern: bool = False):
start_time = time.time()
self.config = config or {}
self._force_fallback = force_fallback
self._suppress_test_pattern = suppress_test_pattern
# Snapshot settings for web preview integration (service writes, web reads)
self._snapshot_path = "/tmp/led_matrix_preview.png"
self._snapshot_min_interval_sec = 0.2 # max ~5 fps
self._last_snapshot_ts = 0.0
# Scrolling state tracking for graceful updates
self._scrolling_state = {
'is_scrolling': False,
'last_scroll_activity': 0,
'scroll_inactivity_threshold': 2.0, # seconds of inactivity before considering "not scrolling"
'deferred_updates': []
}
self._setup_matrix()
logger.info("Matrix setup completed in %.3f seconds", time.time() - start_time)
font_time = time.time()
self._load_fonts()
logger.info("Font loading completed in %.3f seconds", time.time() - font_time)
# Initialize managers
# Calendar manager is now initialized by DisplayController
def _setup_matrix(self):
"""Initialize the RGB matrix with configuration settings."""
setup_start = time.time()
try:
# Allow callers (e.g., web UI) to force non-hardware fallback mode
if getattr(self, '_force_fallback', False):
raise RuntimeError('Forced fallback mode requested')
options = RGBMatrixOptions()
# Hardware configuration
hardware_config = self.config.get('display', {}).get('hardware', {})
runtime_config = self.config.get('display', {}).get('runtime', {})
# Basic hardware settings
options.rows = hardware_config.get('rows', 32)
options.cols = hardware_config.get('cols', 64)
options.chain_length = hardware_config.get('chain_length', 2)
options.parallel = hardware_config.get('parallel', 1)
options.hardware_mapping = hardware_config.get('hardware_mapping', 'adafruit-hat-pwm')
# Performance and stability settings
options.brightness = hardware_config.get('brightness', 90)
options.pwm_bits = hardware_config.get('pwm_bits', 10)
options.pwm_lsb_nanoseconds = hardware_config.get('pwm_lsb_nanoseconds', 150)
options.led_rgb_sequence = hardware_config.get('led_rgb_sequence', 'RGB')
options.pixel_mapper_config = hardware_config.get('pixel_mapper_config', '')
options.row_address_type = hardware_config.get('row_address_type', 0)
options.multiplexing = hardware_config.get('multiplexing', 0)
options.disable_hardware_pulsing = hardware_config.get('disable_hardware_pulsing', False)
options.show_refresh_rate = hardware_config.get('show_refresh_rate', False)
options.limit_refresh_rate_hz = hardware_config.get('limit_refresh_rate_hz', 90)
options.gpio_slowdown = runtime_config.get('gpio_slowdown', 2)
# Additional settings from config
if 'scan_mode' in hardware_config:
options.scan_mode = hardware_config.get('scan_mode')
if 'pwm_dither_bits' in hardware_config:
options.pwm_dither_bits = hardware_config.get('pwm_dither_bits')
if 'inverse_colors' in hardware_config:
options.inverse_colors = hardware_config.get('inverse_colors')
logger.info(f"Initializing RGB Matrix with settings: rows={options.rows}, cols={options.cols}, chain_length={options.chain_length}, parallel={options.parallel}, hardware_mapping={options.hardware_mapping}")
# Initialize the matrix
self.matrix = RGBMatrix(options=options)
logger.info("RGB Matrix initialized successfully")
# Create double buffer for smooth updates
self.offscreen_canvas = self.matrix.CreateFrameCanvas()
self.current_canvas = self.matrix.CreateFrameCanvas()
logger.info("Frame canvases created successfully")
# Create image with full chain width
self.image = Image.new('RGB', (self.matrix.width, self.matrix.height))
self.draw = ImageDraw.Draw(self.image)
logger.info(f"Image canvas created with dimensions: {self.matrix.width}x{self.matrix.height}")
# Initialize font with Press Start 2P
try:
self.font = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
logger.info("Initial Press Start 2P font loaded successfully")
except Exception as e:
logger.error(f"Failed to load initial font: {e}")
self.font = ImageFont.load_default()
# Draw a test pattern unless caller suppressed it (e.g., web on-demand)
if not getattr(self, '_suppress_test_pattern', False):
self._draw_test_pattern()
except Exception as e:
logger.error(f"Failed to initialize RGB Matrix: {e}", exc_info=True)
# Create a fallback image for web preview using configured dimensions when available
self.matrix = None
try:
hardware_config = self.config.get('display', {}).get('hardware', {}) if self.config else {}
rows = int(hardware_config.get('rows', 32))
cols = int(hardware_config.get('cols', 64))
chain_length = int(hardware_config.get('chain_length', 2))
fallback_width = max(1, cols * chain_length)
fallback_height = max(1, rows)
except Exception:
fallback_width, fallback_height = 128, 32
self.image = Image.new('RGB', (fallback_width, fallback_height))
self.draw = ImageDraw.Draw(self.image)
# Simple fallback visualization so web UI shows a realistic canvas
try:
self.draw.rectangle([0, 0, fallback_width - 1, fallback_height - 1], outline=(255, 0, 0))
self.draw.line([0, 0, fallback_width - 1, fallback_height - 1], fill=(0, 255, 0))
self.draw.text((2, max(0, (fallback_height // 2) - 4)), "Simulation", fill=(0, 128, 255))
except Exception:
# Best-effort; ignore drawing errors in fallback
pass
logger.error(f"Matrix initialization failed, using fallback mode with size {fallback_width}x{fallback_height}. Error: {e}")
# Do not raise here; allow fallback mode so web preview and non-hardware environments work
@property
def width(self):
"""Get the display width."""
if hasattr(self, 'matrix') and self.matrix is not None:
return self.matrix.width
elif hasattr(self, 'image'):
return self.image.width
else:
return 128 # Default fallback width
@property
def height(self):
"""Get the display height."""
if hasattr(self, 'matrix') and self.matrix is not None:
return self.matrix.height
elif hasattr(self, 'image'):
return self.image.height
else:
return 32 # Default fallback height
def _draw_test_pattern(self):
"""Draw a test pattern to verify the display is working."""
try:
self.clear()
if self.matrix is None:
# Fallback mode - just draw on the image
self.draw.rectangle([0, 0, self.image.width-1, self.image.height-1], outline=(255, 0, 0))
self.draw.line([0, 0, self.image.width-1, self.image.height-1], fill=(0, 255, 0))
self.draw.text((10, 10), "Simulation", font=self.font, fill=(0, 0, 255))
logger.info("Drew test pattern in fallback mode")
return
# Draw a red rectangle border
self.draw.rectangle([0, 0, self.matrix.width-1, self.matrix.height-1], outline=(255, 0, 0))
# Draw a diagonal line
self.draw.line([0, 0, self.matrix.width-1, self.matrix.height-1], fill=(0, 255, 0))
# Draw some text - changed from "TEST" to "Initializing" with smaller font
self.draw.text((10, 10), "Initializing", font=self.font, fill=(0, 0, 255))
# Update the display once after everything is drawn
self.update_display()
time.sleep(0.5) # Reduced from 1 second to 0.5 seconds for faster animation
except Exception as e:
logger.error(f"Error drawing test pattern: {e}", exc_info=True)
def update_display(self):
"""Update the display using double buffering with proper sync."""
try:
if self.matrix is None:
# Fallback mode - no actual hardware to update
logger.debug("Update display called in fallback mode (no hardware)")
# Still write a snapshot so the web UI can preview
self._write_snapshot_if_due()
return
# Copy the current image to the offscreen canvas
self.offscreen_canvas.SetImage(self.image)
# Swap buffers immediately
self.matrix.SwapOnVSync(self.offscreen_canvas, False)
# Swap our canvas references
self.offscreen_canvas, self.current_canvas = self.current_canvas, self.offscreen_canvas
# Write a snapshot for the web preview (throttled)
self._write_snapshot_if_due()
except Exception as e:
logger.error(f"Error updating display: {e}")
def clear(self):
"""Clear the display completely."""
try:
if self.matrix is None:
# Fallback mode - just clear the image
self.image = Image.new('RGB', (self.image.width, self.image.height))
self.draw = ImageDraw.Draw(self.image)
logger.debug("Cleared display in fallback mode")
return
# Create a new black image
self.image = Image.new('RGB', (self.matrix.width, self.matrix.height))
self.draw = ImageDraw.Draw(self.image)
# Clear both canvases and the underlying matrix to ensure no artifacts
try:
self.offscreen_canvas.Clear()
except Exception:
pass
try:
self.current_canvas.Clear()
except Exception:
pass
try:
# Extra safety: clear the matrix front buffer as well
self.matrix.Clear()
except Exception:
pass
# Update the display to show the clear. Swap twice to flush any latent frame.
self.update_display()
time.sleep(0.01)
self.update_display()
except Exception as e:
logger.error(f"Error clearing display: {e}")
def _draw_bdf_text(self, text, x, y, color=(255, 255, 255), font=None):
"""Draw text using BDF font with proper bitmap handling."""
try:
# Use the passed font or fall back to calendar_font
face = font if font else self.calendar_font
# Compute baseline from font ascender so caller can pass top-left y
try:
ascender_px = face.size.ascender >> 6
except Exception:
ascender_px = 0
baseline_y = y + ascender_px
for char in text:
face.load_char(char)
bitmap = face.glyph.bitmap
# Get glyph metrics
glyph_left = face.glyph.bitmap_left
glyph_top = face.glyph.bitmap_top
# Draw the character
for i in range(bitmap.rows):
for j in range(bitmap.width):
byte_index = i * bitmap.pitch + (j // 8)
if byte_index < len(bitmap.buffer):
byte = bitmap.buffer[byte_index]
if byte & (1 << (7 - (j % 8))):
# Calculate actual pixel position
pixel_x = x + glyph_left + j
pixel_y = baseline_y - glyph_top + i
# Only draw if within bounds
if (0 <= pixel_x < self.width and 0 <= pixel_y < self.height):
self.draw.point((pixel_x, pixel_y), fill=color)
# Move to next character
x += face.glyph.advance.x >> 6
except Exception as e:
logger.error(f"Error drawing BDF text: {e}", exc_info=True)
def _load_fonts(self):
"""Load fonts with proper error handling."""
try:
# Load Press Start 2P font
self.regular_font = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
logger.info("Press Start 2P font loaded successfully")
# Use the same font for small text (currently same size; adjust size here if needed)
self.small_font = ImageFont.truetype("assets/fonts/PressStart2P-Regular.ttf", 8)
logger.info("Press Start 2P small font loaded successfully")
# Load 5x7 BDF font for calendar events
try:
self.calendar_font_path = "assets/fonts/5x7.bdf"
logger.info(f"Attempting to load 5x7 font from: {self.calendar_font_path}")
if not os.path.exists(self.calendar_font_path):
raise FileNotFoundError(f"Font file not found at {self.calendar_font_path}")
# Load with freetype for proper BDF handling
face = freetype.Face(self.calendar_font_path)
logger.info(f"5x7 calendar font loaded successfully from {self.calendar_font_path}")
logger.info(f"Calendar font size: {face.size.height >> 6} pixels")
# Store the face for later use
self.calendar_font = face
except Exception as font_err:
logger.error(f"Failed to load 5x7 font: {str(font_err)}", exc_info=True)
logger.error("Falling back to small font")
self.calendar_font = self.small_font
# Assign the loaded calendar_font (which should be 5x7 BDF or its fallback)
# to a new attribute for specific use, e.g., in MusicManager.
self.bdf_5x7_font = self.calendar_font
logger.info(f"Assigned calendar_font (type: {type(self.bdf_5x7_font).__name__}) to bdf_5x7_font.")
# Load 4x6 font as extra_small_font
try:
font_path = "assets/fonts/4x6-font.ttf"
logger.info(f"Attempting to load 4x6 TTF font from: {font_path} at size 6")
self.extra_small_font = ImageFont.truetype(font_path, 6)
logger.info(f"4x6 TTF extra small font loaded successfully from {font_path}")
except Exception as font_err:
logger.error(f"Failed to load 4x6 TTF font: {font_err}. Falling back.")
self.extra_small_font = self.small_font
except Exception as e:
logger.error(f"Error in font loading: {e}", exc_info=True)
# Fallback to default font
self.regular_font = ImageFont.load_default()
self.small_font = self.regular_font
self.calendar_font = self.regular_font
if not hasattr(self, 'extra_small_font'):
self.extra_small_font = self.regular_font
if not hasattr(self, 'bdf_5x7_font'): # Ensure bdf_5x7_font also gets a fallback
self.bdf_5x7_font = self.regular_font
def get_text_width(self, text, font):
"""Get the width of text when rendered with the given font."""
try:
if isinstance(font, freetype.Face):
# For FreeType faces, calculate width using freetype
width = 0
for char in text:
font.load_char(char)
width += font.glyph.advance.x >> 6
return width
else:
# For PIL fonts, use textbbox
bbox = self.draw.textbbox((0, 0), text, font=font)
return bbox[2] - bbox[0]
except Exception as e:
logger.error(f"Error getting text width: {e}")
return 0 # Return 0 as fallback
def get_font_height(self, font):
"""Get the height of the given font for line spacing purposes."""
try:
if isinstance(font, freetype.Face):
# For FreeType faces (BDF), the 'height' metric gives the recommended line spacing.
return font.size.height >> 6
else:
# For PIL TTF fonts, getmetrics() provides ascent and descent.
# The line height is the sum of ascent and descent.
ascent, descent = font.getmetrics()
return ascent + descent
except Exception as e:
logger.error(f"Error getting font height for font type {type(font).__name__}: {e}")
# Fallback for TTF font if getmetrics() fails, or for other font types.
if hasattr(font, 'size'):
return font.size
return 8 # A reasonable default for an 8px font.
def draw_text(self, text: str, x: int = None, y: int = None, color: tuple = (255, 255, 255),
small_font: bool = False, font: ImageFont = None):
"""Draw text on the canvas with optional font selection."""
try:
# Select font based on parameters
if font:
current_font = font
else:
current_font = self.small_font if small_font else self.regular_font
# Calculate x position if not provided (center text)
if x is None:
text_width = self.get_text_width(text, current_font)
x = (self.width - text_width) // 2
# Set default y position if not provided
if y is None:
y = 0 # Default to top of display
# Draw the text
if isinstance(current_font, freetype.Face):
# For BDF fonts, _draw_bdf_text will compute the baseline from the
# provided top-left y using the font ascender. Do not adjust here.
self._draw_bdf_text(text, x, y, color, current_font)
else:
# For TTF fonts, use PIL's text drawing which expects top-left.
self.draw.text((x, y), text, font=current_font, fill=color)
except Exception as e:
logger.error(f"Error drawing text: {e}", exc_info=True)
def draw_sun(self, x: int, y: int, size: int = 16):
"""Draw a sun icon using yellow circles and lines."""
center = (x + size//2, y + size//2)
radius = size//3
# Draw the center circle
self.draw.ellipse([center[0]-radius, center[1]-radius,
center[0]+radius, center[1]+radius],
fill=(255, 255, 0)) # Yellow
# Draw the rays
ray_length = size//4
for angle in range(0, 360, 45):
rad = math.radians(angle)
start_x = center[0] + (radius * math.cos(rad))
start_y = center[1] + (radius * math.sin(rad))
end_x = center[0] + ((radius + ray_length) * math.cos(rad))
end_y = center[1] + ((radius + ray_length) * math.sin(rad))
self.draw.line([start_x, start_y, end_x, end_y], fill=(255, 255, 0), width=2)
def draw_cloud(self, x: int, y: int, size: int = 16, color=(200, 200, 200)):
"""Draw a cloud icon."""
# Draw multiple circles to form a cloud shape
self.draw.ellipse([x+size//4, y+size//3, x+size//4+size//2, y+size//3+size//2], fill=color)
self.draw.ellipse([x+size//2, y+size//3, x+size//2+size//2, y+size//3+size//2], fill=color)
self.draw.ellipse([x+size//3, y+size//6, x+size//3+size//2, y+size//6+size//2], fill=color)
def draw_rain(self, x: int, y: int, size: int = 16):
"""Draw rain icon with cloud and droplets."""
# Draw cloud
self.draw_cloud(x, y, size)
# Draw rain drops
drop_color = (0, 0, 255) # Blue
drop_size = size//6
for i in range(3):
drop_x = x + size//4 + (i * size//3)
drop_y = y + size//2
self.draw.line([drop_x, drop_y, drop_x, drop_y+drop_size],
fill=drop_color, width=2)
def draw_snow(self, x: int, y: int, size: int = 16):
"""Draw snow icon with cloud and snowflakes."""
# Draw cloud
self.draw_cloud(x, y, size)
# Draw snowflakes
snow_color = (200, 200, 255) # Light blue
for i in range(3):
center_x = x + size//4 + (i * size//3)
center_y = y + size//2 + size//4
# Draw a small star shape
for angle in range(0, 360, 60):
rad = math.radians(angle)
end_x = center_x + (size//8 * math.cos(rad))
end_y = center_y + (size//8 * math.sin(rad))
self.draw.line([center_x, center_y, end_x, end_y],
fill=snow_color, width=1)
# Weather icon color constants
WEATHER_COLORS = {
'sun': (255, 200, 0), # Bright yellow
'cloud': (200, 200, 200), # Light gray
'rain': (0, 100, 255), # Light blue
'snow': (220, 220, 255), # Ice blue
'storm': (255, 255, 0) # Lightning yellow
}
def _draw_sun(self, x: int, y: int, size: int) -> None:
"""Draw a sun icon with rays."""
center_x, center_y = x + size//2, y + size//2
radius = size//4
ray_length = size//3
# Draw the main sun circle
self.draw.ellipse([center_x - radius, center_y - radius,
center_x + radius, center_y + radius],
fill=self.WEATHER_COLORS['sun'])
# Draw sun rays
for angle in range(0, 360, 45):
rad = math.radians(angle)
start_x = center_x + int((radius + 2) * math.cos(rad))
start_y = center_y + int((radius + 2) * math.sin(rad))
end_x = center_x + int((radius + ray_length) * math.cos(rad))
end_y = center_y + int((radius + ray_length) * math.sin(rad))
self.draw.line([start_x, start_y, end_x, end_y],
fill=self.WEATHER_COLORS['sun'], width=2)
def _draw_cloud(self, x: int, y: int, size: int) -> None:
"""Draw a cloud using multiple circles."""
cloud_color = self.WEATHER_COLORS['cloud']
base_y = y + size//2
# Draw main cloud body (3 overlapping circles)
circle_radius = size//4
positions = [
(x + size//3, base_y), # Left circle
(x + size//2, base_y - size//6), # Top circle
(x + 2*size//3, base_y) # Right circle
]
for cx, cy in positions:
self.draw.ellipse([cx - circle_radius, cy - circle_radius,
cx + circle_radius, cy + circle_radius],
fill=cloud_color)
def _draw_rain(self, x: int, y: int, size: int) -> None:
"""Draw rain drops falling from a cloud."""
self._draw_cloud(x, y, size)
rain_color = self.WEATHER_COLORS['rain']
# Draw rain drops at an angle
drop_size = size//8
drops = [
(x + size//4, y + 2*size//3),
(x + size//2, y + 3*size//4),
(x + 3*size//4, y + 2*size//3)
]
for dx, dy in drops:
# Draw angled rain drops
self.draw.line([dx, dy, dx - drop_size//2, dy + drop_size],
fill=rain_color, width=2)
def _draw_snow(self, x: int, y: int, size: int) -> None:
"""Draw snowflakes falling from a cloud."""
self._draw_cloud(x, y, size)
snow_color = self.WEATHER_COLORS['snow']
# Draw snowflakes
flake_size = size//6
flakes = [
(x + size//4, y + 2*size//3),
(x + size//2, y + 3*size//4),
(x + 3*size//4, y + 2*size//3)
]
for fx, fy in flakes:
# Draw a snowflake (six-pointed star)
for angle in range(0, 360, 60):
rad = math.radians(angle)
end_x = fx + int(flake_size * math.cos(rad))
end_y = fy + int(flake_size * math.sin(rad))
self.draw.line([fx, fy, end_x, end_y],
fill=snow_color, width=1)
def _draw_storm(self, x: int, y: int, size: int) -> None:
"""Draw a storm cloud with lightning bolt."""
self._draw_cloud(x, y, size)
# Draw lightning bolt
bolt_color = self.WEATHER_COLORS['storm']
bolt_points = [
(x + size//2, y + size//2), # Top
(x + 3*size//5, y + 2*size//3), # Middle right
(x + 2*size//5, y + 2*size//3), # Middle left
(x + size//2, y + 5*size//6) # Bottom
]
self.draw.polygon(bolt_points, fill=bolt_color)
def draw_weather_icon(self, condition: str, x: int, y: int, size: int = 16) -> None:
"""Draw a weather icon based on the condition."""
if condition.lower() in ['clear', 'sunny']:
self._draw_sun(x, y, size)
elif condition.lower() in ['clouds', 'cloudy', 'partly cloudy']:
self._draw_cloud(x, y, size)
elif condition.lower() in ['rain', 'drizzle', 'shower']:
self._draw_rain(x, y, size)
elif condition.lower() in ['snow', 'sleet', 'hail']:
self._draw_snow(x, y, size)
elif condition.lower() in ['thunderstorm', 'storm']:
self._draw_storm(x, y, size)
else:
self._draw_sun(x, y, size)
# Note: No update_display() here - let the caller handle the update
def draw_text_with_icons(self, text: str, icons: List[tuple] = None, x: int = None, y: int = None,
color: tuple = (255, 255, 255)):
"""Draw text with weather icons at specified positions."""
# Draw the text
self.draw_text(text, x, y, color)
# Draw any icons
if icons:
for icon_type, icon_x, icon_y in icons:
self.draw_weather_icon(icon_type, icon_x, icon_y)
# Update the display once after everything is drawn
self.update_display()
def cleanup(self):
"""Clean up resources."""
if hasattr(self, 'matrix') and self.matrix is not None:
try:
self.matrix.Clear()
except Exception as e:
logger.warning(f"Error clearing matrix during cleanup: {e}")
# Ensure image/draw are reset to a blank state
if hasattr(self, 'image') and hasattr(self, 'draw'):
try:
self.image = Image.new('RGB', (self.width, self.height))
self.draw = ImageDraw.Draw(self.image)
except Exception:
pass
# Reset the singleton state when cleaning up
DisplayManager._instance = None
DisplayManager._initialized = False
def format_date_with_ordinal(self, dt):
"""Formats a datetime object into 'Mon Aug 30th' style."""
day = dt.day
if 11 <= day <= 13:
suffix = 'th'
else:
suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th')
return dt.strftime(f"%b %-d{suffix}")
def set_scrolling_state(self, is_scrolling: bool):
"""Set the current scrolling state. Call this when a display starts/stops scrolling."""
current_time = time.time()
self._scrolling_state['is_scrolling'] = is_scrolling
if is_scrolling:
self._scrolling_state['last_scroll_activity'] = current_time
logger.debug(f"Scrolling state set to: {is_scrolling}")
def is_currently_scrolling(self) -> bool:
"""Check if the display is currently in a scrolling state."""
current_time = time.time()
# If explicitly not scrolling, return False
if not self._scrolling_state['is_scrolling']:
return False
# If we've been inactive for the threshold period, consider it not scrolling
if current_time - self._scrolling_state['last_scroll_activity'] > self._scrolling_state['scroll_inactivity_threshold']:
self._scrolling_state['is_scrolling'] = False
return False
return True
def defer_update(self, update_func, priority: int = 0):
"""Defer an update function to be called when not scrolling.
Args:
update_func: Function to call when not scrolling
priority: Priority level (lower numbers = higher priority)
"""
self._scrolling_state['deferred_updates'].append({
'func': update_func,
'priority': priority,
'timestamp': time.time()
})
# Sort by priority (lower numbers first)
self._scrolling_state['deferred_updates'].sort(key=lambda x: x['priority'])
logger.debug(f"Deferred update added. Total deferred: {len(self._scrolling_state['deferred_updates'])}")
def process_deferred_updates(self):
"""Process any deferred updates if not currently scrolling."""
if self.is_currently_scrolling():
return
if not self._scrolling_state['deferred_updates']:
return
# Process all deferred updates
updates_to_process = self._scrolling_state['deferred_updates'].copy()
self._scrolling_state['deferred_updates'].clear()
logger.debug(f"Processing {len(updates_to_process)} deferred updates")
for update_info in updates_to_process:
try:
update_info['func']()
logger.debug("Deferred update executed successfully")
except Exception as e:
logger.error(f"Error executing deferred update: {e}")
# Re-add failed updates for retry
self._scrolling_state['deferred_updates'].append(update_info)
def get_scrolling_stats(self) -> dict:
"""Get current scrolling statistics for debugging."""
return {
'is_scrolling': self._scrolling_state['is_scrolling'],
'last_activity': self._scrolling_state['last_scroll_activity'],
'deferred_count': len(self._scrolling_state['deferred_updates']),
'inactivity_threshold': self._scrolling_state['scroll_inactivity_threshold']
}
def _write_snapshot_if_due(self) -> None:
"""Write the current image to a PNG snapshot file at a limited frequency."""
try:
now = time.time()
if (now - self._last_snapshot_ts) < self._snapshot_min_interval_sec:
return
# Ensure directory exists
snapshot_dir = os.path.dirname(self._snapshot_path)
if snapshot_dir and not os.path.exists(snapshot_dir):
os.makedirs(snapshot_dir, exist_ok=True)
# Write atomically: temp then replace
tmp_path = f"{self._snapshot_path}.tmp"
self.image.save(tmp_path, format='PNG')
try:
os.replace(tmp_path, self._snapshot_path)
except Exception:
# Fallback to direct save if replace not supported
self.image.save(self._snapshot_path, format='PNG')
# Try to make the snapshot world-readable so the web UI can read it regardless of user
try:
os.chmod(self._snapshot_path, 0o644)
except Exception:
pass
self._last_snapshot_ts = now
except Exception as e:
# Snapshot failures should never break display; log at debug to avoid noise
logger.debug(f"Snapshot write skipped: {e}")