Files
LEDMatrix/src/weather_manager.py

439 lines
18 KiB
Python

import requests
import time
from datetime import datetime
from typing import Dict, Any, List
from PIL import Image, ImageDraw
from .weather_icons import WeatherIcons
class WeatherManager:
# Weather condition to larger colored icons (we'll use these as placeholders until you provide custom ones)
WEATHER_ICONS = {
'Clear': '🌞', # Larger sun with rays
'Clouds': '☁️', # Cloud
'Rain': '🌧️', # Rain cloud
'Snow': '❄️', # Snowflake
'Thunderstorm': '⛈️', # Storm cloud
'Drizzle': '🌦️', # Sun behind rain cloud
'Mist': '🌫️', # Fog
'Fog': '🌫️', # Fog
'Haze': '🌫️', # Fog
'Smoke': '🌫️', # Fog
'Dust': '🌫️', # Fog
'Sand': '🌫️', # Fog
'Ash': '🌫️', # Fog
'Squall': '💨', # Dash symbol
'Tornado': '🌪️' # Tornado
}
def __init__(self, config: Dict[str, Any], display_manager):
self.config = config
self.display_manager = display_manager
self.weather_config = config.get('weather', {})
self.location = config.get('location', {})
self.last_update = 0
self.weather_data = None
self.forecast_data = None
self.hourly_forecast = None
self.daily_forecast = None
self.last_draw_time = 0
# Get matrix dimensions from config
display_config = config.get('display', {}).get('hardware', {})
self.matrix_width = display_config.get('cols', 64)
self.matrix_height = display_config.get('rows', 32)
# Calculate icon sizes based on matrix dimensions
self.ICON_SIZE = {
'large': min(12, self.matrix_height // 3), # Increased from 10 to 12
'medium': min(10, self.matrix_height // 4), # Adjusted ratio
'small': min(8, self.matrix_height // 5) # Increased from 6 to 8
}
# Layout constants - scale with matrix dimensions
self.PADDING = max(2, self.matrix_width // 32) # Increased minimum padding
self.VERTICAL_SPACING = max(2, self.matrix_height // 16) # Added vertical spacing
self.COLORS = {
'text': (255, 255, 255),
'highlight': (255, 200, 0),
'separator': (64, 64, 64),
'temp_high': (255, 100, 100),
'temp_low': (100, 100, 255),
'dim': (180, 180, 180),
'extra_dim': (120, 120, 120)
}
# Add caching for last drawn states
self.last_weather_state = None
self.last_hourly_state = None
self.last_daily_state = None
def _fetch_weather(self) -> None:
"""Fetch current weather and forecast data from OpenWeatherMap API."""
api_key = self.weather_config.get('api_key')
if not api_key:
print("No API key configured for weather")
return
city = self.location['city']
state = self.location['state']
country = self.location['country']
units = self.weather_config.get('units', 'imperial')
# First get coordinates using geocoding API
geo_url = f"http://api.openweathermap.org/geo/1.0/direct?q={city},{state},{country}&limit=1&appid={api_key}"
try:
# Get coordinates
response = requests.get(geo_url)
response.raise_for_status()
geo_data = response.json()
if not geo_data:
print(f"Could not find coordinates for {city}, {state}")
return
lat = geo_data[0]['lat']
lon = geo_data[0]['lon']
# Get current weather and forecast using coordinates
weather_url = f"https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&appid={api_key}&units={units}"
forecast_url = f"https://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={api_key}&units={units}"
# Fetch current weather
response = requests.get(weather_url)
response.raise_for_status()
self.weather_data = response.json()
# Fetch forecast
response = requests.get(forecast_url)
response.raise_for_status()
self.forecast_data = response.json()
# Process forecast data
self._process_forecast_data(self.forecast_data)
self.last_update = time.time()
print("Weather data updated successfully")
except Exception as e:
print(f"Error fetching weather data: {e}")
self.weather_data = None
self.forecast_data = None
def _process_forecast_data(self, forecast_data: Dict[str, Any]) -> None:
"""Process forecast data into hourly and daily forecasts."""
if not forecast_data:
return
# Process hourly forecast (next 5 hours)
hourly_list = forecast_data.get('list', [])[:5] # Changed from 6 to 5 to match image
self.hourly_forecast = []
for hour_data in hourly_list:
dt = datetime.fromtimestamp(hour_data['dt'])
temp = round(hour_data['main']['temp'])
condition = hour_data['weather'][0]['main']
self.hourly_forecast.append({
'hour': dt.strftime('%I:00 %p').lstrip('0'), # Format as "2:00 PM"
'temp': temp,
'condition': condition
})
# Process daily forecast
daily_data = {}
for item in hourly_list:
date = datetime.fromtimestamp(item['dt']).strftime('%Y-%m-%d')
if date not in daily_data:
daily_data[date] = {
'temps': [],
'conditions': [],
'date': datetime.fromtimestamp(item['dt'])
}
daily_data[date]['temps'].append(item['main']['temp'])
daily_data[date]['conditions'].append(item['weather'][0]['main'])
# Calculate daily summaries
self.daily_forecast = []
for date, data in list(daily_data.items())[:4]: # Changed to 4 days for better spacing
temps = data['temps']
temp_high = round(max(temps))
temp_low = round(min(temps))
condition = max(set(data['conditions']), key=data['conditions'].count)
self.daily_forecast.append({
'date': data['date'].strftime('%a'), # Day name (Mon, Tue, etc.)
'date_str': data['date'].strftime('%m/%d'), # Date (4/8, 4/9, etc.)
'temp_high': temp_high,
'temp_low': temp_low,
'condition': condition
})
def get_weather(self) -> Dict[str, Any]:
"""Get current weather data, fetching new data if needed."""
current_time = time.time()
if (not self.weather_data or
current_time - self.last_update > self.weather_config.get('update_interval', 300)):
self._fetch_weather()
return self.weather_data
def _get_weather_state(self) -> Dict[str, Any]:
"""Get current weather state for comparison."""
if not self.weather_data:
return None
return {
'temp': round(self.weather_data['main']['temp']),
'condition': self.weather_data['weather'][0]['main'],
'humidity': self.weather_data['main']['humidity']
}
def _get_hourly_state(self) -> List[Dict[str, Any]]:
"""Get current hourly forecast state for comparison."""
if not self.hourly_forecast:
return None
return [
{'hour': f['hour'], 'temp': round(f['temp']), 'condition': f['condition']}
for f in self.hourly_forecast[:3]
]
def _get_daily_state(self) -> List[Dict[str, Any]]:
"""Get current daily forecast state for comparison."""
if not self.daily_forecast:
return None
return [
{
'date': f['date'],
'temp_high': round(f['temp_high']),
'temp_low': round(f['temp_low']),
'condition': f['condition']
}
for f in self.daily_forecast[:4] # Changed to 4 days
]
def display_weather(self, force_clear: bool = False) -> None:
"""Display current weather information using a modern layout."""
try:
weather_data = self.get_weather()
if not weather_data:
print("No weather data available")
return
# Check if state has changed
current_state = self._get_weather_state()
if not force_clear and current_state == self.last_weather_state:
return # No need to redraw if nothing changed
# Clear the display once at the start
self.display_manager.clear()
# Create a new image for drawing
image = Image.new('RGB', (self.matrix_width, self.matrix_height))
draw = ImageDraw.Draw(image)
# Calculate scaled positions
top_margin = self.VERTICAL_SPACING
left_margin = self.PADDING
# Draw weather condition icon and text at the top
condition = weather_data['weather'][0]['main']
icon_x = left_margin
icon_y = top_margin
WeatherIcons.draw_weather_icon(draw, condition, icon_x, icon_y, size=self.ICON_SIZE['large'])
# Draw condition text next to icon
condition_text = condition
text_x = icon_x + self.ICON_SIZE['large'] + self.PADDING
draw.text((text_x, icon_y),
condition_text,
font=self.display_manager.small_font,
fill=self.COLORS['text'])
# Draw "time ago" text below condition
time_since_update = int((time.time() - self.last_update) / 3600)
time_text = f"{time_since_update}h"
draw.text((text_x, icon_y + self.VERTICAL_SPACING * 2),
time_text,
font=self.display_manager.small_font,
fill=self.COLORS['dim'])
# Draw current temperature on the right
temp = round(weather_data['main']['temp'])
temp_text = f"{temp}°"
temp_width = draw.textlength(temp_text, font=self.display_manager.small_font)
temp_x = self.matrix_width - temp_width - self.PADDING
draw.text((temp_x, top_margin),
temp_text,
font=self.display_manager.small_font,
fill=self.COLORS['highlight'])
# Draw high/low temperatures below current temp
temp_max = round(weather_data['main']['temp_max'])
temp_min = round(weather_data['main']['temp_min'])
high_low_text = f"{temp_min}°/{temp_max}°"
high_low_width = draw.textlength(high_low_text, font=self.display_manager.small_font)
draw.text((self.matrix_width - high_low_width - self.PADDING, top_margin + self.VERTICAL_SPACING * 2),
high_low_text,
font=self.display_manager.small_font,
fill=self.COLORS['dim'])
# Draw additional weather metrics in bottom half
metrics_y = self.matrix_height - (self.VERTICAL_SPACING * 4)
metrics_x = left_margin
# Air pressure
pressure = weather_data['main']['pressure'] * 0.02953
pressure_text = f"P:{pressure:.1f}in"
draw.text((metrics_x, metrics_y),
pressure_text,
font=self.display_manager.small_font,
fill=self.COLORS['dim'])
# Humidity
humidity = weather_data['main']['humidity']
humidity_text = f"H:{humidity}%"
draw.text((metrics_x, metrics_y + self.VERTICAL_SPACING * 2),
humidity_text,
font=self.display_manager.small_font,
fill=self.COLORS['dim'])
# Wind speed and direction
wind_speed = weather_data['wind']['speed']
wind_deg = weather_data.get('wind', {}).get('deg', 0)
wind_dir = self._get_wind_direction(wind_deg)
wind_text = f"W:{wind_speed:.0f}{wind_dir}"
draw.text((metrics_x, metrics_y + self.VERTICAL_SPACING * 4),
wind_text,
font=self.display_manager.small_font,
fill=self.COLORS['dim'])
# Update the display
self.display_manager.image = image
self.display_manager.update_display()
self.last_weather_state = current_state
except Exception as e:
print(f"Error displaying weather: {e}")
def _get_wind_direction(self, degrees: float) -> str:
"""Convert wind degrees to cardinal direction."""
directions = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
index = round(degrees / 45) % 8
return directions[index]
def display_hourly_forecast(self, force_clear: bool = False):
"""Display the next few hours of weather forecast."""
try:
if not self.hourly_forecast:
print("No hourly forecast data available")
return
# Check if state has changed
current_state = self._get_hourly_state()
if not force_clear and current_state == self.last_hourly_state:
return
# Clear the display
self.display_manager.clear()
# Create a new image for drawing
image = Image.new('RGB', (self.matrix_width, self.matrix_height))
draw = ImageDraw.Draw(image)
# Calculate layout based on matrix dimensions
hours_to_show = min(4, len(self.hourly_forecast))
total_width = self.matrix_width
section_width = total_width // hours_to_show
padding = max(self.PADDING, section_width // 6)
for i in range(hours_to_show):
forecast = self.hourly_forecast[i]
x = i * section_width + padding
center_x = x + (section_width - 2 * padding) // 2
# Draw hour at top
hour_text = forecast['hour']
hour_text = hour_text.replace(":00 ", "").replace("PM", "p").replace("AM", "a")
hour_width = draw.textlength(hour_text, font=self.display_manager.small_font)
draw.text((center_x - hour_width // 2, self.VERTICAL_SPACING),
hour_text,
font=self.display_manager.small_font,
fill=self.COLORS['extra_dim'])
# Draw weather icon
icon_size = self.ICON_SIZE['medium']
icon_y = self.matrix_height // 3
icon_x = center_x - icon_size // 2
WeatherIcons.draw_weather_icon(draw, forecast['condition'], icon_x, icon_y, icon_size)
# Draw temperature
temp_text = f"{forecast['temp']}°"
temp_width = draw.textlength(temp_text, font=self.display_manager.small_font)
draw.text((center_x - temp_width // 2, icon_y + icon_size + self.VERTICAL_SPACING),
temp_text,
font=self.display_manager.small_font,
fill=self.COLORS['text'])
# Update the display
self.display_manager.image = image
self.display_manager.update_display()
self.last_hourly_state = current_state
except Exception as e:
print(f"Error displaying hourly forecast: {e}")
def display_daily_forecast(self, force_clear: bool = False):
"""Display the daily weather forecast."""
try:
if not self.daily_forecast:
print("No daily forecast data available")
return
# Check if state has changed
current_state = self._get_daily_state()
if not force_clear and current_state == self.last_daily_state:
return
# Clear the display
self.display_manager.clear()
# Create a new image for drawing
image = Image.new('RGB', (self.matrix_width, self.matrix_height))
draw = ImageDraw.Draw(image)
# Calculate layout based on matrix dimensions
days_to_show = min(4, len(self.daily_forecast))
total_width = self.matrix_width
section_width = total_width // days_to_show
padding = max(self.PADDING, section_width // 6)
for i in range(days_to_show):
forecast = self.daily_forecast[i]
x = i * section_width + padding
center_x = x + (section_width - 2 * padding) // 2
# Draw day name
day_text = forecast['date']
day_width = draw.textlength(day_text, font=self.display_manager.small_font)
draw.text((center_x - day_width // 2, self.VERTICAL_SPACING),
day_text,
font=self.display_manager.small_font,
fill=self.COLORS['extra_dim'])
# Draw weather icon
icon_size = self.ICON_SIZE['medium']
icon_y = self.matrix_height // 3
icon_x = center_x - icon_size // 2
WeatherIcons.draw_weather_icon(draw, forecast['condition'], icon_x, icon_y, icon_size)
# Draw high/low temperatures
temp_text = f"{forecast['temp_low']}°/{forecast['temp_high']}°"
temp_width = draw.textlength(temp_text, font=self.display_manager.small_font)
draw.text((center_x - temp_width // 2, icon_y + icon_size + self.VERTICAL_SPACING),
temp_text,
font=self.display_manager.small_font,
fill=self.COLORS['text'])
# Update the display
self.display_manager.image = image
self.display_manager.update_display()
self.last_daily_state = current_state
except Exception as e:
print(f"Error displaying daily forecast: {e}")