Optimize cache and weather managers for better performance. Add thread-safe caching, reduce processing overhead, and improve error handling.

This commit is contained in:
ChuckBuilds
2025-04-19 16:56:09 -05:00
parent 6c7fd0ddb2
commit a5186d656e
2 changed files with 186 additions and 226 deletions

View File

@@ -6,6 +6,7 @@ import pytz
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import logging import logging
import stat import stat
import threading
class DateTimeEncoder(json.JSONEncoder): class DateTimeEncoder(json.JSONEncoder):
def default(self, obj): def default(self, obj):
@@ -17,94 +18,100 @@ class CacheManager:
def __init__(self, cache_dir: str = "cache"): def __init__(self, cache_dir: str = "cache"):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.cache_dir = cache_dir self.cache_dir = cache_dir
self._memory_cache = {}
self._memory_cache_timestamps = {}
self._cache_lock = threading.Lock()
self._ensure_cache_dir() self._ensure_cache_dir()
def _ensure_cache_dir(self) -> None: def _ensure_cache_dir(self) -> None:
"""Ensure the cache directory exists with proper permissions.""" """Ensure cache directory exists."""
if not os.path.exists(self.cache_dir): if not os.path.exists(self.cache_dir):
try: os.makedirs(self.cache_dir)
# If running as root, use /tmp by default
if os.geteuid() == 0:
self.cache_dir = os.path.join("/tmp", "ledmatrix_cache")
self.logger.info(f"Running as root, using temporary cache directory: {self.cache_dir}")
# Try to create in user's home directory if current directory fails
elif not os.access(os.getcwd(), os.W_OK):
home_dir = os.path.expanduser("~")
self.cache_dir = os.path.join(home_dir, ".ledmatrix_cache")
self.logger.info(f"Using cache directory in home: {self.cache_dir}")
# Create directory with 755 permissions (rwxr-xr-x)
os.makedirs(self.cache_dir, mode=0o755, exist_ok=True)
# If running as sudo, change ownership to the real user
if os.geteuid() == 0: # Check if running as root
import pwd
# Get the real user (not root)
real_user = os.environ.get('SUDO_USER')
if real_user:
uid = pwd.getpwnam(real_user).pw_uid
gid = pwd.getpwnam(real_user).pw_gid
os.chown(self.cache_dir, uid, gid)
self.logger.info(f"Changed cache directory ownership to {real_user}")
except Exception as e:
self.logger.error(f"Error setting up cache directory: {e}")
# Fall back to /tmp if all else fails
self.cache_dir = os.path.join("/tmp", "ledmatrix_cache")
try:
os.makedirs(self.cache_dir, mode=0o755, exist_ok=True)
self.logger.info(f"Using temporary cache directory: {self.cache_dir}")
except Exception as e:
self.logger.error(f"Failed to create temporary cache directory: {e}")
raise
def _get_cache_path(self, data_type: str) -> str: def _get_cache_path(self, key: str) -> str:
"""Get the path for a specific cache file.""" """Get the path for a cache file."""
return os.path.join(self.cache_dir, f"{data_type}_cache.json") return os.path.join(self.cache_dir, f"{key}.json")
def load_cache(self, key: str) -> Optional[Dict[str, Any]]:
"""Load data from cache with memory caching."""
current_time = time.time()
# Check memory cache first
if key in self._memory_cache:
if current_time - self._memory_cache_timestamps.get(key, 0) < 60: # 1 minute TTL
return self._memory_cache[key]
else:
# Clear expired memory cache
del self._memory_cache[key]
del self._memory_cache_timestamps[key]
cache_path = self._get_cache_path(key)
if not os.path.exists(cache_path):
return None
def load_cache(self, data_type: str) -> Optional[Dict[str, Any]]:
"""Load cached data for a specific type."""
cache_path = self._get_cache_path(data_type)
try: try:
if os.path.exists(cache_path): with self._cache_lock:
with open(cache_path, 'r') as f: with open(cache_path, 'r') as f:
return json.load(f) data = json.load(f)
# Update memory cache
self._memory_cache[key] = data
self._memory_cache_timestamps[key] = current_time
return data
except Exception as e: except Exception as e:
self.logger.error(f"Error loading cache for {data_type}: {e}") self.logger.error(f"Error loading cache for {key}: {e}")
return None return None
def save_cache(self, key: str, data: Dict[str, Any]) -> None:
"""Save data to cache with memory caching."""
cache_path = self._get_cache_path(key)
current_time = time.time()
def save_cache(self, data_type: str, data: Dict[str, Any]) -> bool:
"""Save data to cache with proper permissions."""
cache_path = self._get_cache_path(data_type)
try: try:
# Create a temporary file first with self._cache_lock:
temp_path = cache_path + '.tmp' # Update memory cache first
with open(temp_path, 'w') as f: self._memory_cache[key] = data
json.dump(data, f, cls=DateTimeEncoder) self._memory_cache_timestamps[key] = current_time
# Set proper permissions (644 - rw-r--r--) # Then save to disk
os.chmod(temp_path, 0o644) with open(cache_path, 'w') as f:
json.dump(data, f)
# If running as sudo, change ownership to the real user
if os.geteuid() == 0: # Check if running as root
import pwd
real_user = os.environ.get('SUDO_USER')
if real_user:
uid = pwd.getpwnam(real_user).pw_uid
gid = pwd.getpwnam(real_user).pw_gid
os.chown(temp_path, uid, gid)
# Rename temp file to actual cache file (atomic operation)
os.replace(temp_path, cache_path)
return True
except Exception as e: except Exception as e:
self.logger.error(f"Error saving cache for {data_type}: {e}") self.logger.error(f"Error saving cache for {key}: {e}")
# Clean up temp file if it exists
if os.path.exists(temp_path): def get_cached_data(self, key: str) -> Optional[Dict[str, Any]]:
try: """Get cached data with memory cache priority."""
os.remove(temp_path) current_time = time.time()
except:
pass # Check memory cache first
return False if key in self._memory_cache:
if current_time - self._memory_cache_timestamps.get(key, 0) < 60: # 1 minute TTL
return self._memory_cache[key]
else:
# Clear expired memory cache
del self._memory_cache[key]
del self._memory_cache_timestamps[key]
# Fall back to disk cache
return self.load_cache(key)
def clear_cache(self, key: Optional[str] = None) -> None:
"""Clear cache for a specific key or all keys."""
with self._cache_lock:
if key:
# Clear specific key
if key in self._memory_cache:
del self._memory_cache[key]
del self._memory_cache_timestamps[key]
cache_path = self._get_cache_path(key)
if os.path.exists(cache_path):
os.remove(cache_path)
else:
# Clear all keys
self._memory_cache.clear()
self._memory_cache_timestamps.clear()
for file in os.listdir(self.cache_dir):
if file.endswith('.json'):
os.remove(os.path.join(self.cache_dir, file))
def has_data_changed(self, data_type: str, new_data: Dict[str, Any]) -> bool: def has_data_changed(self, data_type: str, new_data: Dict[str, Any]) -> bool:
"""Check if data has changed from cached version.""" """Check if data has changed from cached version."""
@@ -181,22 +188,4 @@ class CacheManager:
'data': data, 'data': data,
'timestamp': time.time() 'timestamp': time.time()
} }
return self.save_cache(data_type, cache_data) return self.save_cache(data_type, cache_data)
def get_cached_data(self, data_type: str) -> Optional[Dict[str, Any]]:
"""Get cached data if it exists and is still valid."""
cached = self.load_cache(data_type)
if not cached:
return None
# Check if cache is still valid based on data type
if data_type == 'weather' and time.time() - cached['timestamp'] > 600: # 10 minutes
return None
elif data_type == 'stocks' and time.time() - cached['timestamp'] > 300: # 5 minutes
return None
elif data_type == 'stock_news' and time.time() - cached['timestamp'] > 800: # ~13 minutes
return None
elif data_type == 'nhl' and time.time() - cached['timestamp'] > 60: # 1 minute
return None
return cached['data']

View File

@@ -1,10 +1,12 @@
import requests import requests
import time import time
from datetime import datetime from datetime import datetime
from typing import Dict, Any, List from typing import Dict, Any, List, Optional
from PIL import Image, ImageDraw from PIL import Image, ImageDraw
from .weather_icons import WeatherIcons from .weather_icons import WeatherIcons
from .cache_manager import CacheManager from .cache_manager import CacheManager
import logging
import threading
class WeatherManager: class WeatherManager:
# Weather condition to larger colored icons (we'll use these as placeholders until you provide custom ones) # Weather condition to larger colored icons (we'll use these as placeholders until you provide custom ones)
@@ -27,11 +29,17 @@ class WeatherManager:
} }
def __init__(self, config: Dict[str, Any], display_manager): def __init__(self, config: Dict[str, Any], display_manager):
self.logger = logging.getLogger(__name__)
self.config = config self.config = config
self.display_manager = display_manager self.display_manager = display_manager
self.weather_config = config.get('weather', {}) self.weather_config = config.get('weather', {})
self.location = config.get('location', {}) self.location = config.get('location', {})
self.last_update = 0 self._last_update = 0
self._update_interval = config.get('update_interval', 600) # 10 minutes default
self._last_state = None
self._processing_lock = threading.Lock()
self._cached_processed_data = None
self._cache_timestamp = 0
self.weather_data = None self.weather_data = None
self.forecast_data = None self.forecast_data = None
self.hourly_forecast = None self.hourly_forecast = None
@@ -60,140 +68,103 @@ class WeatherManager:
self.last_hourly_state = None self.last_hourly_state = None
self.last_daily_state = None self.last_daily_state = None
def _fetch_weather(self) -> None: def _process_forecast_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Fetch current weather and forecast data from OpenWeatherMap API.""" """Process forecast data with caching."""
api_key = self.weather_config.get('api_key') current_time = time.time()
if not api_key:
print("No API key configured for weather")
return
# Try to get cached data first
cached_data = self.cache_manager.get_cached_data('weather')
if cached_data:
self.weather_data = cached_data.get('current')
self.forecast_data = cached_data.get('forecast')
if self.weather_data and self.forecast_data:
self._process_forecast_data(self.forecast_data)
self.last_update = time.time()
print("Using cached weather data")
return
city = self.location['city']
state = self.location['state']
country = self.location['country']
units = self.weather_config.get('units', 'imperial')
# First get coordinates using geocoding API
geo_url = f"http://api.openweathermap.org/geo/1.0/direct?q={city},{state},{country}&limit=1&appid={api_key}"
try: # Check if we have valid cached processed data
# Get coordinates if (self._cached_processed_data and
response = requests.get(geo_url) current_time - self._cache_timestamp < 60): # Cache for 1 minute
response.raise_for_status() return self._cached_processed_data
geo_data = response.json()
if not geo_data:
print(f"Could not find coordinates for {city}, {state}")
return
lat = geo_data[0]['lat']
lon = geo_data[0]['lon']
# Get current weather and forecast using coordinates
weather_url = f"https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={api_key}&units={units}"
forecast_url = f"https://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={api_key}&units={units}"
# Fetch current weather
response = requests.get(weather_url)
response.raise_for_status()
self.weather_data = response.json()
# Fetch forecast with self._processing_lock:
response = requests.get(forecast_url) # Double check after acquiring lock
response.raise_for_status() if (self._cached_processed_data and
self.forecast_data = response.json() current_time - self._cache_timestamp < 60):
return self._cached_processed_data
# Process forecast data try:
self._process_forecast_data(self.forecast_data) # Process the data
processed_data = {
# Cache the new data 'current': self._process_current_conditions(data.get('current', {})),
cache_data = { 'hourly': self._process_hourly_forecast(data.get('hourly', [])),
'current': self.weather_data, 'daily': self._process_daily_forecast(data.get('daily', []))
'forecast': self.forecast_data
}
self.cache_manager.update_cache('weather', cache_data)
self.last_update = time.time()
print("Weather data updated successfully")
except Exception as e:
print(f"Error fetching weather data: {e}")
# If we have cached data, use it as fallback
if cached_data:
self.weather_data = cached_data.get('current')
self.forecast_data = cached_data.get('forecast')
if self.weather_data and self.forecast_data:
self._process_forecast_data(self.forecast_data)
print("Using cached weather data as fallback")
else:
self.weather_data = None
self.forecast_data = None
def _process_forecast_data(self, forecast_data: Dict[str, Any]) -> None:
"""Process forecast data into hourly and daily forecasts."""
if not forecast_data:
return
# Process hourly forecast (next 5 hours)
hourly_list = forecast_data.get('list', [])[:5] # Changed from 6 to 5 to match image
self.hourly_forecast = []
for hour_data in hourly_list:
dt = datetime.fromtimestamp(hour_data['dt'])
temp = round(hour_data['main']['temp'])
condition = hour_data['weather'][0]['main']
self.hourly_forecast.append({
'hour': dt.strftime('%I:00 %p').lstrip('0'), # Format as "2:00 PM"
'temp': temp,
'condition': condition
})
# Process daily forecast
daily_data = {}
full_forecast_list = forecast_data.get('list', []) # Use the full list
for item in full_forecast_list: # Iterate over the full list
date = datetime.fromtimestamp(item['dt']).strftime('%Y-%m-%d')
if date not in daily_data:
daily_data[date] = {
'temps': [],
'conditions': [],
'date': datetime.fromtimestamp(item['dt'])
} }
daily_data[date]['temps'].append(item['main']['temp'])
daily_data[date]['conditions'].append(item['weather'][0]['main'])
# Calculate daily summaries, excluding today # Cache the processed data
self.daily_forecast = [] self._cached_processed_data = processed_data
today_str = datetime.now().strftime('%Y-%m-%d') self._cache_timestamp = current_time
return processed_data
# Sort data by date to ensure chronological order except Exception as e:
sorted_daily_items = sorted(daily_data.items(), key=lambda item: item[1]['date']) self.logger.error(f"Error processing forecast data: {e}")
return {}
# Filter out today's data and take the next 3 days
future_days_data = [item for item in sorted_daily_items if item[0] != today_str][:3]
for date_str, data in future_days_data: def _fetch_weather(self) -> Optional[Dict[str, Any]]:
temps = data['temps'] """Fetch weather data with optimized caching."""
temp_high = round(max(temps)) current_time = time.time()
temp_low = round(min(temps))
condition = max(set(data['conditions']), key=data['conditions'].count) # Check if we need to update
if current_time - self._last_update < self._update_interval:
self.daily_forecast.append({ cached_data = self.cache_manager.get_cached_data('weather')
'date': data['date'].strftime('%a'), # Day name (Mon, Tue, etc.) if cached_data:
'date_str': data['date'].strftime('%m/%d'), # Date (4/8, 4/9, etc.) self.logger.info("Using cached weather data")
'temp_high': temp_high, return self._process_forecast_data(cached_data)
'temp_low': temp_low, return None
'condition': condition
}) try:
# Fetch new data
data = self._fetch_from_api()
if data:
self._last_update = current_time
self.cache_manager.save_cache('weather', data)
return self._process_forecast_data(data)
except Exception as e:
self.logger.error(f"Error fetching weather data: {e}")
# Try to use cached data as fallback
cached_data = self.cache_manager.get_cached_data('weather')
if cached_data:
self.logger.info("Using cached weather data as fallback")
return self._process_forecast_data(cached_data)
return None
def _process_current_conditions(self, current: Dict[str, Any]) -> Dict[str, Any]:
"""Process current conditions with minimal processing."""
if not current:
return {}
return {
'temp': current.get('temp', 0),
'feels_like': current.get('feels_like', 0),
'humidity': current.get('humidity', 0),
'wind_speed': current.get('wind_speed', 0),
'description': current.get('weather', [{}])[0].get('description', ''),
'icon': current.get('weather', [{}])[0].get('icon', '')
}
def _process_hourly_forecast(self, hourly: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process hourly forecast with minimal processing."""
if not hourly:
return []
return [{
'time': hour.get('dt', 0),
'temp': hour.get('temp', 0),
'description': hour.get('weather', [{}])[0].get('description', ''),
'icon': hour.get('weather', [{}])[0].get('icon', '')
} for hour in hourly[:24]] # Only process next 24 hours
def _process_daily_forecast(self, daily: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process daily forecast with minimal processing."""
if not daily:
return []
return [{
'time': day.get('dt', 0),
'temp_min': day.get('temp', {}).get('min', 0),
'temp_max': day.get('temp', {}).get('max', 0),
'description': day.get('weather', [{}])[0].get('description', ''),
'icon': day.get('weather', [{}])[0].get('icon', '')
} for day in daily[:7]] # Only process next 7 days
def get_weather(self) -> Dict[str, Any]: def get_weather(self) -> Dict[str, Any]:
"""Get current weather data, fetching new data if needed.""" """Get current weather data, fetching new data if needed."""
@@ -202,7 +173,7 @@ class WeatherManager:
# Check if we need to update based on time or if we have no data # Check if we need to update based on time or if we have no data
if (not self.weather_data or if (not self.weather_data or
current_time - self.last_update > update_interval): current_time - self._last_update > update_interval):
# Check if data has changed before fetching # Check if data has changed before fetching
current_state = self._get_weather_state() current_state = self._get_weather_state()