From a5186d656ec2607d9c142450bb98a0cc5d40aa1e Mon Sep 17 00:00:00 2001 From: ChuckBuilds <33324927+ChuckBuilds@users.noreply.github.com> Date: Sat, 19 Apr 2025 16:56:09 -0500 Subject: [PATCH] Optimize cache and weather managers for better performance. Add thread-safe caching, reduce processing overhead, and improve error handling. --- src/cache_manager.py | 179 +++++++++++++++---------------- src/weather_manager.py | 233 ++++++++++++++++++----------------------- 2 files changed, 186 insertions(+), 226 deletions(-) diff --git a/src/cache_manager.py b/src/cache_manager.py index 06347250..358c9903 100644 --- a/src/cache_manager.py +++ b/src/cache_manager.py @@ -6,6 +6,7 @@ import pytz from typing import Any, Dict, Optional import logging import stat +import threading class DateTimeEncoder(json.JSONEncoder): def default(self, obj): @@ -17,94 +18,100 @@ class CacheManager: def __init__(self, cache_dir: str = "cache"): self.logger = logging.getLogger(__name__) self.cache_dir = cache_dir + self._memory_cache = {} + self._memory_cache_timestamps = {} + self._cache_lock = threading.Lock() self._ensure_cache_dir() def _ensure_cache_dir(self) -> None: - """Ensure the cache directory exists with proper permissions.""" + """Ensure cache directory exists.""" if not os.path.exists(self.cache_dir): - try: - # If running as root, use /tmp by default - if os.geteuid() == 0: - self.cache_dir = os.path.join("/tmp", "ledmatrix_cache") - self.logger.info(f"Running as root, using temporary cache directory: {self.cache_dir}") - # Try to create in user's home directory if current directory fails - elif not os.access(os.getcwd(), os.W_OK): - home_dir = os.path.expanduser("~") - self.cache_dir = os.path.join(home_dir, ".ledmatrix_cache") - self.logger.info(f"Using cache directory in home: {self.cache_dir}") - - # Create directory with 755 permissions (rwxr-xr-x) - os.makedirs(self.cache_dir, mode=0o755, exist_ok=True) - - # If running as sudo, change ownership to the real user - if os.geteuid() == 0: # Check if running as root - import pwd - # Get the real user (not root) - real_user = os.environ.get('SUDO_USER') - if real_user: - uid = pwd.getpwnam(real_user).pw_uid - gid = pwd.getpwnam(real_user).pw_gid - os.chown(self.cache_dir, uid, gid) - self.logger.info(f"Changed cache directory ownership to {real_user}") - except Exception as e: - self.logger.error(f"Error setting up cache directory: {e}") - # Fall back to /tmp if all else fails - self.cache_dir = os.path.join("/tmp", "ledmatrix_cache") - try: - os.makedirs(self.cache_dir, mode=0o755, exist_ok=True) - self.logger.info(f"Using temporary cache directory: {self.cache_dir}") - except Exception as e: - self.logger.error(f"Failed to create temporary cache directory: {e}") - raise + os.makedirs(self.cache_dir) - def _get_cache_path(self, data_type: str) -> str: - """Get the path for a specific cache file.""" - return os.path.join(self.cache_dir, f"{data_type}_cache.json") + def _get_cache_path(self, key: str) -> str: + """Get the path for a cache file.""" + return os.path.join(self.cache_dir, f"{key}.json") + + def load_cache(self, key: str) -> Optional[Dict[str, Any]]: + """Load data from cache with memory caching.""" + current_time = time.time() + + # Check memory cache first + if key in self._memory_cache: + if current_time - self._memory_cache_timestamps.get(key, 0) < 60: # 1 minute TTL + return self._memory_cache[key] + else: + # Clear expired memory cache + del self._memory_cache[key] + del self._memory_cache_timestamps[key] + + cache_path = self._get_cache_path(key) + if not os.path.exists(cache_path): + return None - def load_cache(self, data_type: str) -> Optional[Dict[str, Any]]: - """Load cached data for a specific type.""" - cache_path = self._get_cache_path(data_type) try: - if os.path.exists(cache_path): + with self._cache_lock: with open(cache_path, 'r') as f: - return json.load(f) + data = json.load(f) + # Update memory cache + self._memory_cache[key] = data + self._memory_cache_timestamps[key] = current_time + return data except Exception as e: - self.logger.error(f"Error loading cache for {data_type}: {e}") - return None + self.logger.error(f"Error loading cache for {key}: {e}") + return None + + def save_cache(self, key: str, data: Dict[str, Any]) -> None: + """Save data to cache with memory caching.""" + cache_path = self._get_cache_path(key) + current_time = time.time() - def save_cache(self, data_type: str, data: Dict[str, Any]) -> bool: - """Save data to cache with proper permissions.""" - cache_path = self._get_cache_path(data_type) try: - # Create a temporary file first - temp_path = cache_path + '.tmp' - with open(temp_path, 'w') as f: - json.dump(data, f, cls=DateTimeEncoder) - - # Set proper permissions (644 - rw-r--r--) - os.chmod(temp_path, 0o644) - - # If running as sudo, change ownership to the real user - if os.geteuid() == 0: # Check if running as root - import pwd - real_user = os.environ.get('SUDO_USER') - if real_user: - uid = pwd.getpwnam(real_user).pw_uid - gid = pwd.getpwnam(real_user).pw_gid - os.chown(temp_path, uid, gid) - - # Rename temp file to actual cache file (atomic operation) - os.replace(temp_path, cache_path) - return True + with self._cache_lock: + # Update memory cache first + self._memory_cache[key] = data + self._memory_cache_timestamps[key] = current_time + + # Then save to disk + with open(cache_path, 'w') as f: + json.dump(data, f) except Exception as e: - self.logger.error(f"Error saving cache for {data_type}: {e}") - # Clean up temp file if it exists - if os.path.exists(temp_path): - try: - os.remove(temp_path) - except: - pass - return False + self.logger.error(f"Error saving cache for {key}: {e}") + + def get_cached_data(self, key: str) -> Optional[Dict[str, Any]]: + """Get cached data with memory cache priority.""" + current_time = time.time() + + # Check memory cache first + if key in self._memory_cache: + if current_time - self._memory_cache_timestamps.get(key, 0) < 60: # 1 minute TTL + return self._memory_cache[key] + else: + # Clear expired memory cache + del self._memory_cache[key] + del self._memory_cache_timestamps[key] + + # Fall back to disk cache + return self.load_cache(key) + + def clear_cache(self, key: Optional[str] = None) -> None: + """Clear cache for a specific key or all keys.""" + with self._cache_lock: + if key: + # Clear specific key + if key in self._memory_cache: + del self._memory_cache[key] + del self._memory_cache_timestamps[key] + cache_path = self._get_cache_path(key) + if os.path.exists(cache_path): + os.remove(cache_path) + else: + # Clear all keys + self._memory_cache.clear() + self._memory_cache_timestamps.clear() + for file in os.listdir(self.cache_dir): + if file.endswith('.json'): + os.remove(os.path.join(self.cache_dir, file)) def has_data_changed(self, data_type: str, new_data: Dict[str, Any]) -> bool: """Check if data has changed from cached version.""" @@ -181,22 +188,4 @@ class CacheManager: 'data': data, 'timestamp': time.time() } - return self.save_cache(data_type, cache_data) - - def get_cached_data(self, data_type: str) -> Optional[Dict[str, Any]]: - """Get cached data if it exists and is still valid.""" - cached = self.load_cache(data_type) - if not cached: - return None - - # Check if cache is still valid based on data type - if data_type == 'weather' and time.time() - cached['timestamp'] > 600: # 10 minutes - return None - elif data_type == 'stocks' and time.time() - cached['timestamp'] > 300: # 5 minutes - return None - elif data_type == 'stock_news' and time.time() - cached['timestamp'] > 800: # ~13 minutes - return None - elif data_type == 'nhl' and time.time() - cached['timestamp'] > 60: # 1 minute - return None - - return cached['data'] \ No newline at end of file + return self.save_cache(data_type, cache_data) \ No newline at end of file diff --git a/src/weather_manager.py b/src/weather_manager.py index b3baee37..7bc6e2a7 100644 --- a/src/weather_manager.py +++ b/src/weather_manager.py @@ -1,10 +1,12 @@ import requests import time from datetime import datetime -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional from PIL import Image, ImageDraw from .weather_icons import WeatherIcons from .cache_manager import CacheManager +import logging +import threading class WeatherManager: # Weather condition to larger colored icons (we'll use these as placeholders until you provide custom ones) @@ -27,11 +29,17 @@ class WeatherManager: } def __init__(self, config: Dict[str, Any], display_manager): + self.logger = logging.getLogger(__name__) self.config = config self.display_manager = display_manager self.weather_config = config.get('weather', {}) self.location = config.get('location', {}) - self.last_update = 0 + self._last_update = 0 + self._update_interval = config.get('update_interval', 600) # 10 minutes default + self._last_state = None + self._processing_lock = threading.Lock() + self._cached_processed_data = None + self._cache_timestamp = 0 self.weather_data = None self.forecast_data = None self.hourly_forecast = None @@ -60,140 +68,103 @@ class WeatherManager: self.last_hourly_state = None self.last_daily_state = None - def _fetch_weather(self) -> None: - """Fetch current weather and forecast data from OpenWeatherMap API.""" - api_key = self.weather_config.get('api_key') - if not api_key: - print("No API key configured for weather") - return - - # Try to get cached data first - cached_data = self.cache_manager.get_cached_data('weather') - if cached_data: - self.weather_data = cached_data.get('current') - self.forecast_data = cached_data.get('forecast') - if self.weather_data and self.forecast_data: - self._process_forecast_data(self.forecast_data) - self.last_update = time.time() - print("Using cached weather data") - return - - city = self.location['city'] - state = self.location['state'] - country = self.location['country'] - units = self.weather_config.get('units', 'imperial') - - # First get coordinates using geocoding API - geo_url = f"http://api.openweathermap.org/geo/1.0/direct?q={city},{state},{country}&limit=1&appid={api_key}" + def _process_forecast_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Process forecast data with caching.""" + current_time = time.time() - try: - # Get coordinates - response = requests.get(geo_url) - response.raise_for_status() - geo_data = response.json() - - if not geo_data: - print(f"Could not find coordinates for {city}, {state}") - return - - lat = geo_data[0]['lat'] - lon = geo_data[0]['lon'] - - # Get current weather and forecast using coordinates - weather_url = f"https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={api_key}&units={units}" - forecast_url = f"https://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={api_key}&units={units}" - - # Fetch current weather - response = requests.get(weather_url) - response.raise_for_status() - self.weather_data = response.json() + # Check if we have valid cached processed data + if (self._cached_processed_data and + current_time - self._cache_timestamp < 60): # Cache for 1 minute + return self._cached_processed_data - # Fetch forecast - response = requests.get(forecast_url) - response.raise_for_status() - self.forecast_data = response.json() + with self._processing_lock: + # Double check after acquiring lock + if (self._cached_processed_data and + current_time - self._cache_timestamp < 60): + return self._cached_processed_data - # Process forecast data - self._process_forecast_data(self.forecast_data) - - # Cache the new data - cache_data = { - 'current': self.weather_data, - 'forecast': self.forecast_data - } - self.cache_manager.update_cache('weather', cache_data) - - self.last_update = time.time() - print("Weather data updated successfully") - except Exception as e: - print(f"Error fetching weather data: {e}") - # If we have cached data, use it as fallback - if cached_data: - self.weather_data = cached_data.get('current') - self.forecast_data = cached_data.get('forecast') - if self.weather_data and self.forecast_data: - self._process_forecast_data(self.forecast_data) - print("Using cached weather data as fallback") - else: - self.weather_data = None - self.forecast_data = None - - def _process_forecast_data(self, forecast_data: Dict[str, Any]) -> None: - """Process forecast data into hourly and daily forecasts.""" - if not forecast_data: - return - - # Process hourly forecast (next 5 hours) - hourly_list = forecast_data.get('list', [])[:5] # Changed from 6 to 5 to match image - self.hourly_forecast = [] - - for hour_data in hourly_list: - dt = datetime.fromtimestamp(hour_data['dt']) - temp = round(hour_data['main']['temp']) - condition = hour_data['weather'][0]['main'] - self.hourly_forecast.append({ - 'hour': dt.strftime('%I:00 %p').lstrip('0'), # Format as "2:00 PM" - 'temp': temp, - 'condition': condition - }) - - # Process daily forecast - daily_data = {} - full_forecast_list = forecast_data.get('list', []) # Use the full list - for item in full_forecast_list: # Iterate over the full list - date = datetime.fromtimestamp(item['dt']).strftime('%Y-%m-%d') - if date not in daily_data: - daily_data[date] = { - 'temps': [], - 'conditions': [], - 'date': datetime.fromtimestamp(item['dt']) + try: + # Process the data + processed_data = { + 'current': self._process_current_conditions(data.get('current', {})), + 'hourly': self._process_hourly_forecast(data.get('hourly', [])), + 'daily': self._process_daily_forecast(data.get('daily', [])) } - daily_data[date]['temps'].append(item['main']['temp']) - daily_data[date]['conditions'].append(item['weather'][0]['main']) - # Calculate daily summaries, excluding today - self.daily_forecast = [] - today_str = datetime.now().strftime('%Y-%m-%d') - - # Sort data by date to ensure chronological order - sorted_daily_items = sorted(daily_data.items(), key=lambda item: item[1]['date']) - - # Filter out today's data and take the next 3 days - future_days_data = [item for item in sorted_daily_items if item[0] != today_str][:3] + # Cache the processed data + self._cached_processed_data = processed_data + self._cache_timestamp = current_time + return processed_data + except Exception as e: + self.logger.error(f"Error processing forecast data: {e}") + return {} - for date_str, data in future_days_data: - temps = data['temps'] - temp_high = round(max(temps)) - temp_low = round(min(temps)) - condition = max(set(data['conditions']), key=data['conditions'].count) - - self.daily_forecast.append({ - 'date': data['date'].strftime('%a'), # Day name (Mon, Tue, etc.) - 'date_str': data['date'].strftime('%m/%d'), # Date (4/8, 4/9, etc.) - 'temp_high': temp_high, - 'temp_low': temp_low, - 'condition': condition - }) + def _fetch_weather(self) -> Optional[Dict[str, Any]]: + """Fetch weather data with optimized caching.""" + current_time = time.time() + + # Check if we need to update + if current_time - self._last_update < self._update_interval: + cached_data = self.cache_manager.get_cached_data('weather') + if cached_data: + self.logger.info("Using cached weather data") + return self._process_forecast_data(cached_data) + return None + + try: + # Fetch new data + data = self._fetch_from_api() + if data: + self._last_update = current_time + self.cache_manager.save_cache('weather', data) + return self._process_forecast_data(data) + except Exception as e: + self.logger.error(f"Error fetching weather data: {e}") + # Try to use cached data as fallback + cached_data = self.cache_manager.get_cached_data('weather') + if cached_data: + self.logger.info("Using cached weather data as fallback") + return self._process_forecast_data(cached_data) + return None + + def _process_current_conditions(self, current: Dict[str, Any]) -> Dict[str, Any]: + """Process current conditions with minimal processing.""" + if not current: + return {} + + return { + 'temp': current.get('temp', 0), + 'feels_like': current.get('feels_like', 0), + 'humidity': current.get('humidity', 0), + 'wind_speed': current.get('wind_speed', 0), + 'description': current.get('weather', [{}])[0].get('description', ''), + 'icon': current.get('weather', [{}])[0].get('icon', '') + } + + def _process_hourly_forecast(self, hourly: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Process hourly forecast with minimal processing.""" + if not hourly: + return [] + + return [{ + 'time': hour.get('dt', 0), + 'temp': hour.get('temp', 0), + 'description': hour.get('weather', [{}])[0].get('description', ''), + 'icon': hour.get('weather', [{}])[0].get('icon', '') + } for hour in hourly[:24]] # Only process next 24 hours + + def _process_daily_forecast(self, daily: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Process daily forecast with minimal processing.""" + if not daily: + return [] + + return [{ + 'time': day.get('dt', 0), + 'temp_min': day.get('temp', {}).get('min', 0), + 'temp_max': day.get('temp', {}).get('max', 0), + 'description': day.get('weather', [{}])[0].get('description', ''), + 'icon': day.get('weather', [{}])[0].get('icon', '') + } for day in daily[:7]] # Only process next 7 days def get_weather(self) -> Dict[str, Any]: """Get current weather data, fetching new data if needed.""" @@ -202,7 +173,7 @@ class WeatherManager: # Check if we need to update based on time or if we have no data if (not self.weather_data or - current_time - self.last_update > update_interval): + current_time - self._last_update > update_interval): # Check if data has changed before fetching current_state = self._get_weather_state()