refactor music display out of display controller

This commit is contained in:
ChuckBuilds
2025-05-23 13:35:43 -05:00
parent 1be8b9fd5a
commit a886ac3b2b
5 changed files with 528 additions and 339 deletions

View File

@@ -417,18 +417,42 @@ The Music Display module shows information about the currently playing track fro
* `"spotify"`: Only uses Spotify. Ignores YTM.
* `"ytm"`: Only uses the YTM Companion Server. Ignores Spotify.
**First Spotify Run (Headless Setup):**
**First Spotify Run / Spotify Token Refresh (Headless Setup):**
Since the display runs on a headless Raspberry Pi, the Spotify authorization process requires a few manual steps:
Spotify authentication is now handled by a separate script. This process is needed for the initial setup or if the Spotify token expires or is revoked.
1. **Start the Application:** Run the display controller script (`sudo python3 display_controller.py`).
2. **Copy Auth URL:** When Spotify needs authorization for the first time (or after a token expires), the application will **print a URL** to the console. Copy this full URL.
3. **Authorize in Browser (on another device):** Paste the copied URL into a web browser on your computer or phone. Log in to Spotify if prompted and click "Agree" to authorize the application.
4. **Get Redirected URL:** Your browser will be redirected to a URL starting with your `SPOTIFY_REDIRECT_URI` (e.g., `http://localhost:8888/callback`) followed by `?code=...`. The page will likely show an error like "Site can't be reached" - **this is expected and perfectly fine.**
5. **Copy Full Redirected URL:** **Immediately copy the complete URL** from your browser's address bar. Make sure you copy the *entire* thing, including the `?code=...` part.
6. **Paste URL Back to Pi:** Go back to the Raspberry Pi console where the display script is running. It should now be prompting you to "Enter the URL you were redirected to:". **Paste the full URL you just copied** from your browser into the console and press Enter.
**Step 1: Run the Authentication Script (as the `ledpi` user)**
The application will then use the provided code to get the necessary tokens and cache them (usually in a `.cache` file). Subsequent runs should not require this process unless the token expires.
1. Log in or `su` to the user account that will run the main display application (e.g., `ledpi`). **Do not use `sudo` for this step.**
```bash
su ledpi
# or if connecting via SSH directly as ledpi, just proceed
```
2. Navigate to the project directory:
```bash
cd /path/to/your/LEDMatrix # Replace with the actual path to your project
```
3. Run the `authenticate_spotify.py` script:
```bash
python3 src/authenticate_spotify.py
```
4. **Copy Auth URL:** The script will print an authorization URL to the console. Copy this full URL.
5. **Authorize in Browser (on another device):** Paste the copied URL into a web browser on your computer or phone. Log in to Spotify if prompted and click "Agree" to authorize the application.
6. **Get Redirected URL:** Your browser will be redirected to a URL starting with your `SPOTIFY_REDIRECT_URI` (e.g., `http://localhost:8888/callback`) followed by `?code=...`. The page will likely show an error like "Site can't be reached" - **this is expected and perfectly fine.**
7. **Copy Full Redirected URL:** **Immediately copy the complete URL** from your browser's address bar. Make sure you copy the *entire* thing, including the `?code=...` part.
8. **Paste URL Back to Pi:** Go back to the Raspberry Pi console where the `authenticate_spotify.py` script is running. It will prompt you to "Paste the full redirected URL here and press Enter:". Paste the full URL you just copied and press Enter.
The script will then fetch the necessary tokens and save them to `config/spotify_auth.json`. If successful, you'll see a confirmation message. If you were previously logged in as a different user (e.g. `root` or `pi`), you can now `exit` back to that session.
**Step 2: Run the Main Application**
Once the `spotify_auth.json` file has been created by the authentication script, you can run the main display controller as usual (e.g., with `sudo` if required for hardware access):
```bash
sudo python3 display_controller.py
```
The application will automatically load and use the cached token from `config/spotify_auth.json`. It will also attempt to refresh the token automatically when needed. If the token cannot be refreshed (e.g., if it's been too long or permissions were revoked via Spotify's website), you will need to repeat **Step 1**.
### Music Display (YouTube Music)
@@ -438,8 +462,8 @@ The system can display currently playing music information from YouTube Music De
1. **Enable Companion Server in YTMD:**
* In the YouTube Music Desktop application, go to `Settings` -> `Integrations`.
* Enable the "Companion Server" (it might also be labeled as "JSON RPC" or similar).
* Note the IP address and Port it's listening on (default is usually `http://localhost:9863`).
* Enable the "Companion Server".
* Note the IP address and Port it's listening on (default is usually `http://localhost:9863`), you'll need to know the local ip address if playing music on a device other than your rpi (probably are).
2. **Configure `config/config.json`:**
* Update the `music` section in your `config/config.json`:
@@ -448,26 +472,16 @@ The system can display currently playing music information from YouTube Music De
"enabled": true,
"preferred_source": "ytm",
"YTM_COMPANION_URL": "http://YOUR_YTMD_IP_ADDRESS:PORT", // e.g., "http://localhost:9863" or "http://192.168.1.100:9863"
"POLLING_INTERVAL_SECONDS": 2
"POLLING_INTERVAL_SECONDS": 1
}
```
3. **Initial Authentication & Token Storage:**
* The first time you run `display_controller.py` after enabling YTM, it will attempt to register itself with the YTMD Companion Server.
* The first time you run ` python3 src/authenticate_ytm.py` after enabling YTM, it will attempt to register itself with the YTMD Companion Server.
* You will see log messages in the terminal prompting you to **approve the "LEDMatrixController" application within the YouTube Music Desktop app.** You typically have 30 seconds to do this.
* Once approved, an authentication token is saved to your `config/config.json`.
4. **File Permissions for Token Saving (Important if running as a specific user e.g., `ledpi`):**
* If the script (e.g., `display_controller.py` or the systemd service) runs as a user like `ledpi`, that user needs permission to write the authentication token to `config/config.json`.
* Execute the following commands, replacing `ledpi` if you use a different user:
```bash
sudo chown ledpi:ledpi /home/ledpi/LEDMatrix/config /home/ledpi/LEDMatrix/config/config.json
sudo chmod 664 /home/ledpi/LEDMatrix/config/config.json
sudo chmod 775 /home/ledpi/LEDMatrix/config
```
* Once approved, an authentication token is saved to your `config/ytm_auth.json`.
* This ensures the `ledpi` user owns the config directory and file, and has the necessary write permissions.
**Troubleshooting:**
* "No authorized companions" in YTMD: Ensure you've approved the `LEDMatrixController` in YTMD settings after the first run.
* Connection errors: Double-check the `YTM_COMPANION_URL` in `config.json` matches what YTMD's companion server is set to.
* Permission denied saving token: Ensure you've run the `chown` and `chmod` commands above.

129
src/authenticate_spotify.py Normal file
View File

@@ -0,0 +1,129 @@
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import logging
import json
import os
import sys
# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Define paths relative to this file's location (assuming it's in src)
CONFIG_DIR = os.path.join(os.path.dirname(__file__), '..', 'config')
SECRETS_PATH = os.path.join(CONFIG_DIR, 'config_secrets.json')
SPOTIFY_AUTH_CACHE_PATH = os.path.join(CONFIG_DIR, 'spotify_auth.json') # Explicit cache path
# Resolve to absolute paths
CONFIG_DIR = os.path.abspath(CONFIG_DIR)
SECRETS_PATH = os.path.abspath(SECRETS_PATH)
SPOTIFY_AUTH_CACHE_PATH = os.path.abspath(SPOTIFY_AUTH_CACHE_PATH)
SCOPE = "user-read-currently-playing user-read-playback-state"
def load_spotify_credentials():
"""Loads Spotify credentials from config_secrets.json."""
if not os.path.exists(SECRETS_PATH):
logging.error(f"Secrets file not found at {SECRETS_PATH}")
return None, None, None
try:
with open(SECRETS_PATH, 'r') as f:
secrets = json.load(f)
music_secrets = secrets.get("music", {})
client_id = music_secrets.get("SPOTIFY_CLIENT_ID")
client_secret = music_secrets.get("SPOTIFY_CLIENT_SECRET")
redirect_uri = music_secrets.get("SPOTIFY_REDIRECT_URI")
if not all([client_id, client_secret, redirect_uri]):
logging.error("One or more Spotify credentials missing in config_secrets.json under the 'music' key.")
return None, None, None
return client_id, client_secret, redirect_uri
except json.JSONDecodeError:
logging.error(f"Error decoding JSON from {SECRETS_PATH}")
return None, None, None
except Exception as e:
logging.error(f"Error loading Spotify credentials: {e}")
return None, None, None
if __name__ == "__main__":
logging.info("Starting Spotify Authentication Process...")
client_id, client_secret, redirect_uri = load_spotify_credentials()
if not all([client_id, client_secret, redirect_uri]):
logging.error("Could not load Spotify credentials. Please check config/config_secrets.json. Exiting.")
sys.exit(1)
# Ensure the config directory exists for the cache file
if not os.path.exists(CONFIG_DIR):
try:
logging.info(f"Config directory {CONFIG_DIR} not found. Attempting to create it.")
os.makedirs(CONFIG_DIR)
logging.info(f"Successfully created config directory: {CONFIG_DIR}")
except OSError as e:
logging.error(f"Fatal: Could not create config directory {CONFIG_DIR}: {e}. Please create it manually. Exiting.")
sys.exit(1)
sp_oauth = SpotifyOAuth(
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri,
scope=SCOPE,
cache_path=SPOTIFY_AUTH_CACHE_PATH, # Use explicit cache path
open_browser=False
)
# Step 1: Get the authorization URL
auth_url = sp_oauth.get_authorize_url()
print("-" * 50)
print("SPOTIFY AUTHORIZATION NEEDED:")
print("1. Please visit this URL in a browser (on any device):")
print(f" {auth_url}")
print("2. Authorize the application.")
print("3. You will be redirected to a URL (likely showing an error). Copy that FULL redirected URL.")
print("-" * 50)
# Step 2: Get the redirected URL from the user
redirected_url = input("4. Paste the full redirected URL here and press Enter: ").strip()
if not redirected_url:
logging.error("No redirected URL provided. Exiting.")
sys.exit(1)
# Step 3: Parse the code from the redirected URL
try:
# Spotipy's parse_auth_response_url is not directly part of the public API of SpotifyOAuth
# for this specific flow where we manually handle the redirect.
# We need to extract the 'code' query parameter.
# A more robust way would be to use urllib.parse, but for simplicity:
if "?code=" in redirected_url:
auth_code = redirected_url.split("?code=")[1].split("&")[0]
elif "&code=" in redirected_url: # Should not happen if code is first param
auth_code = redirected_url.split("&code=")[1].split("&")[0]
else:
logging.error("Could not find 'code=' in the redirected URL. Please ensure you copied the full URL.")
logging.error(f"Received URL: {redirected_url}")
sys.exit(1)
except Exception as e:
logging.error(f"Error parsing authorization code from redirected URL: {e}")
logging.error(f"Received URL: {redirected_url}")
sys.exit(1)
# Step 4: Get the access token using the code and cache it
try:
# check_cache=False forces it to use the provided code rather than a potentially stale cached one for this specific step.
# The token will still be written to the cache_path.
token_info = sp_oauth.get_access_token(auth_code, check_cache=False)
if token_info:
logging.info(f"Spotify authentication successful. Token info cached at {SPOTIFY_AUTH_CACHE_PATH}")
else:
logging.error("Failed to obtain Spotify token info with the provided code.")
logging.error("Please ensure the code was correct and not expired.")
sys.exit(1)
except Exception as e:
logging.error(f"Error obtaining Spotify access token: {e}")
logging.error("This can happen if the authorization code is incorrect, expired, or already used.")
sys.exit(1)
logging.info("Spotify Authentication Process Finished.")

View File

@@ -2,9 +2,6 @@ import time
import logging
import sys
from typing import Dict, Any, List
import requests
from io import BytesIO
from PIL import Image
# Configure logging
logging.basicConfig(
@@ -63,26 +60,20 @@ class DisplayController:
# Initialize Music Manager
music_init_time = time.time()
self.music_manager = None
self.current_music_info = None
self.album_art_image = None
self.last_album_art_url = None
self.scroll_position_title = 0
self.scroll_position_artist = 0
self.title_scroll_tick = 0 # For slowing down title scroll
self.artist_scroll_tick = 0 # For slowing down artist scroll
# Ensure config is loaded before accessing it for music_manager
if hasattr(self, 'config'):
music_config_main = self.config.get('music', {})
if music_config_main.get('enabled', False):
try:
# Pass the update_callback here
self.music_manager = MusicManager(update_callback=self._handle_music_update)
if self.music_manager.enabled: # Check MusicManager's internal enabled status
# Pass display_manager and config. The callback is now optional for MusicManager.
# DisplayController might not need a specific music update callback anymore if MusicManager handles all display.
self.music_manager = MusicManager(display_manager=self.display_manager, config=self.config, update_callback=self._handle_music_update)
if self.music_manager.enabled:
logger.info("MusicManager initialized successfully.")
self.music_manager.start_polling()
else:
logger.info("MusicManager initialized but is internally disabled or failed to load its own config.")
self.music_manager = None # Ensure it's None if not truly usable
self.music_manager = None
except Exception as e:
logger.error(f"Failed to initialize MusicManager: {e}", exc_info=True)
self.music_manager = None
@@ -320,59 +311,21 @@ class DisplayController:
logger.info(f"NCAA FB Favorite teams: {self.ncaa_fb_favorite_teams}") # Log NCAA FB teams
# Removed redundant NHL/MLB init time logs
def _fetch_and_resize_image(self, url: str, target_size: tuple[int, int]) -> Image.Image | None:
"""Fetches an image from a URL, resizes it, and returns a PIL Image object."""
if not url:
return None
try:
response = requests.get(url, timeout=5) # 5-second timeout for image download
response.raise_for_status() # Raise an exception for bad status codes
img_data = BytesIO(response.content)
img = Image.open(img_data)
# Ensure image is RGB for compatibility with the matrix
img = img.convert("RGB")
# Resize while maintaining aspect ratio (letterbox/pillarbox if necessary)
# This creates a new image of target_size with the original image pasted into it.
# Original image is scaled to fit within target_size.
img.thumbnail(target_size, Image.Resampling.LANCZOS)
# Create a new image with a black background (or any color)
# and paste the thumbnail onto it to ensure it's exactly target_size
# This is good if the matrix hardware expects exact dimensions.
final_img = Image.new("RGB", target_size, (0,0,0)) # Black background
paste_x = (target_size[0] - img.width) // 2
paste_y = (target_size[1] - img.height) // 2
final_img.paste(img, (paste_x, paste_y))
return final_img
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching image from {url}: {e}")
return None
except IOError as e:
logger.error(f"Error processing image from {url}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error fetching/processing image {url}: {e}")
return None
def _handle_music_update(self, track_info: Dict[str, Any]):
"""Callback for when music track info changes."""
self.current_music_info = track_info # Store the latest info
# Reset album art if URL changes or no URL
new_album_art_url = track_info.get('album_art_url') if track_info else None
if new_album_art_url != self.last_album_art_url:
self.album_art_image = None # Clear old image, will be refetched
self.last_album_art_url = new_album_art_url
"""Callback for when music track info changes. (Simplified)"""
# MusicManager now handles its own display state (album art, etc.)
# This callback might still be useful if DisplayController needs to react to music changes
# for reasons other than directly re-drawing the music screen (e.g., logging, global state).
# For now, we'll keep it simple. If the music screen is active, it will redraw on its own.
if track_info:
logger.info(f"DisplayController received music update (via callback): Title - {track_info.get('title')}, Playing - {track_info.get('is_playing')}")
else:
logger.info("DisplayController received music update (via callback): Track is None or not playing.")
if self.current_display_mode == 'music' and track_info:
logger.info("Music screen is active and track changed, queueing redraw.")
# The run loop will call display_music_screen, which will use self.current_music_info
# No need to force clear here unless specifically desired for music updates
# self.force_clear = True
elif self.current_display_mode == 'music' and not track_info:
logger.info("Music screen is active and track stopped/is None, queueing redraw for 'Nothing Playing'.")
# If the current display mode is music, the MusicManager's display method will be called
# in the main loop and will use its own updated internal state. No explicit action needed here
# to force a redraw of the music screen itself, unless DisplayController wants to switch TO music mode.
# Example: if self.current_display_mode == 'music': self.force_clear = True (but MusicManager.display handles this)
def get_current_duration(self) -> int:
"""Get the duration for the current display mode."""
@@ -615,149 +568,6 @@ class DisplayController:
self.ncaa_fb_current_team_index = (self.ncaa_fb_current_team_index + 1) % len(self.ncaa_fb_favorite_teams)
self.ncaa_fb_showing_recent = True # Reset to recent for the new team
def display_music_screen(self, force_clear: bool = False):
if force_clear or self.force_clear:
self.display_manager.clear()
self.force_clear = False
display_info = self.current_music_info
if not display_info or not display_info.get('is_playing', False) or display_info.get('title') == 'Nothing Playing':
if not hasattr(self, '_last_nothing_playing_log_time') or time.time() - self._last_nothing_playing_log_time > 30:
logger.info("Music Screen: Nothing playing or info unavailable.")
self._last_nothing_playing_log_time = time.time()
self.display_manager.clear()
text_width = self.display_manager.get_text_width("Nothing Playing", self.display_manager.regular_font)
x_pos = (self.display_manager.matrix.width - text_width) // 2
y_pos = (self.display_manager.matrix.height // 2) - 4
self.display_manager.draw_text("Nothing Playing", x=x_pos, y=y_pos, font=self.display_manager.regular_font)
self.display_manager.update_display()
self.scroll_position_title = 0
self.scroll_position_artist = 0
self.title_scroll_tick = 0 # Reset ticks
self.artist_scroll_tick = 0
self.album_art_image = None
return
self.display_manager.draw.rectangle([0, 0, self.display_manager.matrix.width, self.display_manager.matrix.height], fill=(0, 0, 0))
# Album Art Configuration
matrix_height = self.display_manager.matrix.height
album_art_size = matrix_height - 2 # Slightly smaller than matrix height, with 1px padding top/bottom
album_art_target_size = (album_art_size, album_art_size)
album_art_x = 1
album_art_y = 1
text_area_x_start = album_art_x + album_art_size + 2 # Start text 2px to the right of art
text_area_width = self.display_manager.matrix.width - text_area_x_start - 1 # 1px padding on right
# Fetch and display album art
if self.last_album_art_url and not self.album_art_image:
logger.info(f"Fetching album art from: {self.last_album_art_url}")
self.album_art_image = self._fetch_and_resize_image(self.last_album_art_url, album_art_target_size)
if self.album_art_image:
logger.info(f"Album art fetched and processed successfully.")
else:
logger.warning(f"Failed to fetch or process album art.")
if self.album_art_image:
self.display_manager.image.paste(self.album_art_image, (album_art_x, album_art_y))
else:
# No album art, text area uses full width
text_area_x_start = 1
text_area_width = self.display_manager.matrix.width - 2
# Optionally draw a placeholder for album art
self.display_manager.draw.rectangle([album_art_x, album_art_y,
album_art_x + album_art_size -1, album_art_y + album_art_size -1],
outline=(50,50,50), fill=(10,10,10))
self.display_manager.draw_text("?", x=album_art_x + album_art_size//2 - 3, y=album_art_y + album_art_size//2 - 4, color=(100,100,100))
title = display_info.get('title', ' ')
artist = display_info.get('artist', ' ')
album = display_info.get('album', ' ') # Added album
font_title = self.display_manager.small_font
font_artist_album = self.display_manager.extra_small_font
line_height_title = 8 # Approximate height for PressStart2P 8px
line_height_artist_album = 7 # Approximate height for 4x6 font 6px
padding_between_lines = 1
TEXT_SCROLL_DIVISOR = 5 # Adjust this value to change scroll speed (higher is slower)
# --- Title ---
y_pos_title = 2 # Small top margin for title
title_width = self.display_manager.get_text_width(title, font_title)
# Always draw the current state of the text based on scroll_position_title
current_title_display_text = title
if title_width > text_area_width:
current_title_display_text = title[self.scroll_position_title:] + " " + title[:self.scroll_position_title]
self.display_manager.draw_text(current_title_display_text,
x=text_area_x_start, y=y_pos_title, color=(255, 255, 255), font=font_title)
# Only update scroll position based on the divisor
if title_width > text_area_width:
self.title_scroll_tick += 1
if self.title_scroll_tick % TEXT_SCROLL_DIVISOR == 0:
self.scroll_position_title = (self.scroll_position_title + 1) % len(title)
self.title_scroll_tick = 0 # Reset tick to avoid large numbers
else:
self.scroll_position_title = 0
self.title_scroll_tick = 0
# --- Artist ---
y_pos_artist = y_pos_title + line_height_title + padding_between_lines
artist_width = self.display_manager.get_text_width(artist, font_artist_album)
current_artist_display_text = artist
if artist_width > text_area_width:
current_artist_display_text = artist[self.scroll_position_artist:] + " " + artist[:self.scroll_position_artist]
self.display_manager.draw_text(current_artist_display_text,
x=text_area_x_start, y=y_pos_artist, color=(180, 180, 180), font=font_artist_album)
if artist_width > text_area_width:
self.artist_scroll_tick += 1
if self.artist_scroll_tick % TEXT_SCROLL_DIVISOR == 0:
self.scroll_position_artist = (self.scroll_position_artist + 1) % len(artist)
self.artist_scroll_tick = 0
else:
self.scroll_position_artist = 0
self.artist_scroll_tick = 0
# --- Album (optional, if space permits, or scroll, or alternate with artist) ---
# For now, let's place it below artist if it fits, otherwise omit or consider more complex layouts later.
y_pos_album = y_pos_artist + line_height_artist_album + padding_between_lines
# Check if album can fit before progress bar.
# Progress bar height: ~3px + padding. Let's say 5px total.
# Available space for album: matrix_height - y_pos_album - 5
if (matrix_height - y_pos_album - 5) >= line_height_artist_album :
album_width = self.display_manager.get_text_width(album, font_artist_album)
if album_width <= text_area_width: # Only display if it fits without scrolling for now
self.display_manager.draw_text(album, x=text_area_x_start, y=y_pos_album, color=(150, 150, 150), font=font_artist_album)
# --- Progress Bar ---
progress_bar_height = 3
progress_bar_y = matrix_height - progress_bar_height - 1 # 1px from bottom
duration_ms = display_info.get('duration_ms', 0)
progress_ms = display_info.get('progress_ms', 0)
if duration_ms > 0:
bar_total_width = text_area_width
filled_ratio = progress_ms / duration_ms
filled_width = int(filled_ratio * bar_total_width)
# Draw background/empty part of progress bar
self.display_manager.draw.rectangle([
text_area_x_start, progress_bar_y,
text_area_x_start + bar_total_width -1, progress_bar_y + progress_bar_height -1
], outline=(60, 60, 60), fill=(30,30,30)) # Dim outline and fill for empty part
# Draw filled part of progress bar
if filled_width > 0:
self.display_manager.draw.rectangle([
text_area_x_start, progress_bar_y,
text_area_x_start + filled_width -1, progress_bar_y + progress_bar_height -1
], fill=(200, 200, 200)) # Brighter fill for progress
self.display_manager.update_display()
def run(self):
"""Run the display controller, switching between displays."""
if not self.available_modes:
@@ -965,7 +775,8 @@ class DisplayController:
# --- Perform Display Update ---
try:
if self.current_display_mode == 'music' and self.music_manager:
self.display_music_screen(force_clear=self.force_clear)
# Call MusicManager's display method
self.music_manager.display(force_clear=self.force_clear)
# Reset force_clear if it was true for this mode
if self.force_clear:
self.force_clear = False

View File

@@ -4,6 +4,9 @@ from enum import Enum, auto
import logging
import json
import os
from io import BytesIO
import requests
from PIL import Image
# Use relative imports for clients within the same package (src)
from .spotify_client import SpotifyClient
@@ -12,6 +15,7 @@ from .ytm_client import YTMClient
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Define paths relative to this file's location
CONFIG_DIR = os.path.join(os.path.dirname(__file__), '..', 'config')
@@ -24,7 +28,9 @@ class MusicSource(Enum):
YTM = auto()
class MusicManager:
def __init__(self, update_callback=None):
def __init__(self, display_manager, config, update_callback=None):
self.display_manager = display_manager
self.config = config
self.spotify = None
self.ytm = None
self.current_track_info = None
@@ -34,6 +40,15 @@ class MusicManager:
self.enabled = False # Default
self.preferred_source = "auto" # Default
self.stop_event = threading.Event()
# Display related attributes moved from DisplayController
self.album_art_image = None
self.last_album_art_url = None
self.scroll_position_title = 0
self.scroll_position_artist = 0
self.title_scroll_tick = 0
self.artist_scroll_tick = 0
self._load_config() # Load config first
self._initialize_clients() # Initialize based on loaded config
self.poll_thread = None
@@ -117,6 +132,37 @@ class MusicManager:
logging.info("YTM client initialization skipped due to preferred_source setting.")
self.ytm = None
def _fetch_and_resize_image(self, url: str, target_size: tuple[int, int]) -> Image.Image | None:
"""Fetches an image from a URL, resizes it, and returns a PIL Image object."""
if not url:
return None
try:
response = requests.get(url, timeout=5) # 5-second timeout for image download
response.raise_for_status() # Raise an exception for bad status codes
img_data = BytesIO(response.content)
img = Image.open(img_data)
# Ensure image is RGB for compatibility with the matrix
img = img.convert("RGB")
img.thumbnail(target_size, Image.Resampling.LANCZOS)
final_img = Image.new("RGB", target_size, (0,0,0)) # Black background
paste_x = (target_size[0] - img.width) // 2
paste_y = (target_size[1] - img.height) // 2
final_img.paste(img, (paste_x, paste_y))
return final_img
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching image from {url}: {e}")
return None
except IOError as e:
logger.error(f"Error processing image from {url}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error fetching/processing image {url}: {e}")
return None
def _poll_music_data(self):
"""Continuously polls music sources for updates, respecting preferences."""
if not self.enabled:
@@ -175,24 +221,33 @@ class MusicManager:
# --- Consolidate and Check for Changes ---
simplified_info = self.get_simplified_track_info(polled_track_info, polled_source)
current_simplified_info = self.get_simplified_track_info(self.current_track_info, self.current_source)
has_changed = False
if simplified_info != current_simplified_info:
if simplified_info != self.current_track_info:
has_changed = True
self.current_track_info = polled_track_info
# Update internal state
old_album_art_url = self.current_track_info.get('album_art_url') if self.current_track_info else None
new_album_art_url = simplified_info.get('album_art_url') if simplified_info else None
self.current_track_info = simplified_info
self.current_source = polled_source
display_title = simplified_info.get('title', 'None') if simplified_info else 'None'
logging.debug(f"Track change detected. Source: {self.current_source.name}. Track: {display_title}")
if new_album_art_url != old_album_art_url:
self.album_art_image = None
self.last_album_art_url = new_album_art_url
display_title = self.current_track_info.get('title', 'None') if self.current_track_info else 'None'
logger.debug(f"Track change detected. Source: {self.current_source.name}. Track: {display_title}")
else:
logging.debug("No change in simplified track info.")
logger.debug("No change in simplified track info.")
if has_changed and self.update_callback:
try:
self.update_callback(simplified_info)
self.update_callback(self.current_track_info)
except Exception as e:
logging.error(f"Error executing update callback: {e}")
logger.error(f"Error executing update callback: {e}")
time.sleep(self.polling_interval)
# Modified to accept data and source, making it more testable/reusable
@@ -263,11 +318,9 @@ class MusicManager:
}
def get_current_display_info(self):
"""Returns the latest simplified info for display purposes."""
# Return default "Nothing Playing" state if manager is disabled
if not self.enabled:
return self.get_simplified_track_info(None, MusicSource.NONE)
return self.get_simplified_track_info(self.current_track_info, self.current_source)
"""Returns the currently stored track information for display."""
# This method might be used by DisplayController if it still needs a snapshot
return self.current_track_info
def start_polling(self):
# Only start polling if enabled
@@ -287,36 +340,232 @@ class MusicManager:
logging.info("Music polling started.")
def stop_polling(self):
"""Stops the music polling thread."""
logger.info("Music manager: Stopping polling thread...")
self.stop_event.set()
if self.poll_thread and self.poll_thread.is_alive():
self.poll_thread.join() # Wait for thread to finish
logging.info("Music polling stopped.")
# Example Usage (for testing)
if __name__ == '__main__':
def print_update(track_info):
print("-" * 20)
if track_info and track_info['source'] != 'None':
print(f"Source: {track_info.get('source')}")
print(f"Title: {track_info.get('title')}")
print(f"Artist: {track_info.get('artist')}")
print(f"Album: {track_info.get('album')}")
print(f"Playing: {track_info.get('is_playing')}")
print(f"Duration: {track_info.get('duration_ms')} ms")
print(f"Progress: {track_info.get('progress_ms')} ms")
print(f"Art URL: {track_info.get('album_art_url')}")
self.poll_thread.join(timeout=self.polling_interval + 1) # Wait for thread to finish
if self.poll_thread and self.poll_thread.is_alive():
logger.warning("Music manager: Polling thread did not terminate cleanly.")
else:
print("Nothing playing or update is None.")
print("-" * 20)
logger.info("Music manager: Polling thread stopped.")
self.poll_thread = None # Clear the thread object
manager = MusicManager(update_callback=print_update)
manager.start_polling()
# Method moved from DisplayController and renamed
def display(self, force_clear: bool = False):
if force_clear: # Removed self.force_clear as it's passed directly
self.display_manager.clear()
# self.force_clear = False # Not needed here
try:
# Keep the main thread alive to allow polling thread to run
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping polling...")
manager.stop_polling()
print("Exiting.")
# Use self.current_track_info which is updated by _poll_music_data
display_info = self.current_track_info
if not display_info or not display_info.get('is_playing', False) or display_info.get('title') == 'Nothing Playing':
# Debounce "Nothing playing" log for this manager
if not hasattr(self, '_last_nothing_playing_log_time') or \
time.time() - getattr(self, '_last_nothing_playing_log_time', 0) > 30:
logger.info("Music Screen (MusicManager): Nothing playing or info unavailable.")
self._last_nothing_playing_log_time = time.time()
self.display_manager.clear() # Clear before drawing "Nothing Playing"
text_width = self.display_manager.get_text_width("Nothing Playing", self.display_manager.regular_font)
x_pos = (self.display_manager.matrix.width - text_width) // 2
y_pos = (self.display_manager.matrix.height // 2) - 4
self.display_manager.draw_text("Nothing Playing", x=x_pos, y=y_pos, font=self.display_manager.regular_font)
self.display_manager.update_display()
self.scroll_position_title = 0
self.scroll_position_artist = 0
self.title_scroll_tick = 0
self.artist_scroll_tick = 0
self.album_art_image = None # Clear album art if nothing is playing
self.last_album_art_url = None # Also clear the URL
return
# Ensure screen is cleared if not force_clear but needed (e.g. transition from "Nothing Playing")
# This might be handled by DisplayController's force_clear logic, but can be an internal check too.
# For now, assuming DisplayController manages the initial clear for a new mode.
self.display_manager.draw.rectangle([0, 0, self.display_manager.matrix.width, self.display_manager.matrix.height], fill=(0, 0, 0))
# Album Art Configuration
matrix_height = self.display_manager.matrix.height
album_art_size = matrix_height - 2
album_art_target_size = (album_art_size, album_art_size)
album_art_x = 1
album_art_y = 1
text_area_x_start = album_art_x + album_art_size + 2
text_area_width = self.display_manager.matrix.width - text_area_x_start - 1
# Fetch and display album art using self.last_album_art_url and self.album_art_image
if self.last_album_art_url and not self.album_art_image:
logger.info(f"MusicManager: Fetching album art from: {self.last_album_art_url}")
self.album_art_image = self._fetch_and_resize_image(self.last_album_art_url, album_art_target_size)
if self.album_art_image:
logger.info(f"MusicManager: Album art fetched and processed successfully.")
else:
logger.warning(f"MusicManager: Failed to fetch or process album art.")
if self.album_art_image:
self.display_manager.image.paste(self.album_art_image, (album_art_x, album_art_y))
else:
text_area_x_start = 1
text_area_width = self.display_manager.matrix.width - 2
self.display_manager.draw.rectangle([album_art_x, album_art_y,
album_art_x + album_art_size -1, album_art_y + album_art_size -1],
outline=(50,50,50), fill=(10,10,10))
self.display_manager.draw_text("?", x=album_art_x + album_art_size//2 - 3, y=album_art_y + album_art_size//2 - 4, color=(100,100,100))
title = display_info.get('title', ' ')
artist = display_info.get('artist', ' ')
album = display_info.get('album', ' ')
font_title = self.display_manager.small_font
font_artist_album = self.display_manager.extra_small_font
line_height_title = 8
line_height_artist_album = 7
padding_between_lines = 1
TEXT_SCROLL_DIVISOR = 5
# --- Title ---
y_pos_title = 2
title_width = self.display_manager.get_text_width(title, font_title)
current_title_display_text = title
if title_width > text_area_width:
# Ensure scroll_position_title is valid for the current title length
if self.scroll_position_title >= len(title):
self.scroll_position_title = 0
current_title_display_text = title[self.scroll_position_title:] + " " + title[:self.scroll_position_title]
self.display_manager.draw_text(current_title_display_text,
x=text_area_x_start, y=y_pos_title, color=(255, 255, 255), font=font_title)
if title_width > text_area_width:
self.title_scroll_tick += 1
if self.title_scroll_tick % TEXT_SCROLL_DIVISOR == 0:
self.scroll_position_title = (self.scroll_position_title + 1) % len(title)
self.title_scroll_tick = 0
else:
self.scroll_position_title = 0
self.title_scroll_tick = 0
# --- Artist ---
y_pos_artist = y_pos_title + line_height_title + padding_between_lines
artist_width = self.display_manager.get_text_width(artist, font_artist_album)
current_artist_display_text = artist
if artist_width > text_area_width:
# Ensure scroll_position_artist is valid for the current artist length
if self.scroll_position_artist >= len(artist):
self.scroll_position_artist = 0
current_artist_display_text = artist[self.scroll_position_artist:] + " " + artist[:self.scroll_position_artist]
self.display_manager.draw_text(current_artist_display_text,
x=text_area_x_start, y=y_pos_artist, color=(180, 180, 180), font=font_artist_album)
if artist_width > text_area_width:
self.artist_scroll_tick += 1
if self.artist_scroll_tick % TEXT_SCROLL_DIVISOR == 0:
self.scroll_position_artist = (self.scroll_position_artist + 1) % len(artist)
self.artist_scroll_tick = 0
else:
self.scroll_position_artist = 0
self.artist_scroll_tick = 0
# --- Album ---
y_pos_album = y_pos_artist + line_height_artist_album + padding_between_lines
if (matrix_height - y_pos_album - 5) >= line_height_artist_album :
album_width = self.display_manager.get_text_width(album, font_artist_album)
if album_width <= text_area_width:
self.display_manager.draw_text(album, x=text_area_x_start, y=y_pos_album, color=(150, 150, 150), font=font_artist_album)
# --- Progress Bar ---
progress_bar_height = 3
progress_bar_y = matrix_height - progress_bar_height - 1
duration_ms = display_info.get('duration_ms', 0)
progress_ms = display_info.get('progress_ms', 0)
if duration_ms > 0:
bar_total_width = text_area_width
filled_ratio = progress_ms / duration_ms
filled_width = int(filled_ratio * bar_total_width)
self.display_manager.draw.rectangle([
text_area_x_start, progress_bar_y,
text_area_x_start + bar_total_width -1, progress_bar_y + progress_bar_height -1
], outline=(60, 60, 60), fill=(30,30,30))
if filled_width > 0:
self.display_manager.draw.rectangle([
text_area_x_start, progress_bar_y,
text_area_x_start + filled_width -1, progress_bar_y + progress_bar_height -1
], fill=(200, 200, 200))
self.display_manager.update_display()
# Example usage (for testing this module standalone, if needed)
# def print_update(track_info):
# logging.info(f"Callback: Track update received by dummy callback: {track_info}")
if __name__ == '__main__':
# This is a placeholder for testing.
# To test properly, you'd need a mock DisplayManager and ConfigManager.
logging.basicConfig(level=logging.DEBUG)
logger.info("Running MusicManager standalone test (limited)...")
# Mock DisplayManager and Config objects
class MockDisplayManager:
def __init__(self):
self.matrix = type('Matrix', (), {'width': 64, 'height': 32})() # Mock matrix
self.image = Image.new("RGB", (self.matrix.width, self.matrix.height))
self.draw = ImageDraw.Draw(self.image) # Requires ImageDraw
self.regular_font = None # Needs font loading
self.small_font = None
self.extra_small_font = None
# Add other methods/attributes DisplayManager uses if they are called by MusicManager's display
# For simplicity, we won't fully mock font loading here.
# self.regular_font = ImageFont.truetype("path/to/font.ttf", 8)
def clear(self): logger.debug("MockDisplayManager: clear() called")
def get_text_width(self, text, font): return len(text) * 5 # Rough mock
def draw_text(self, text, x, y, color=(255,255,255), font=None): logger.debug(f"MockDisplayManager: draw_text '{text}' at ({x},{y})")
def update_display(self): logger.debug("MockDisplayManager: update_display() called")
class MockConfig:
def get(self, key, default=None):
if key == "music":
return {"enabled": True, "POLLING_INTERVAL_SECONDS": 2, "preferred_source": "auto"}
return default
# Need to import ImageDraw for the mock to work if draw_text is complex
try: from PIL import ImageDraw, ImageFont
except ImportError: ImageDraw = None; ImageFont = None; logger.warning("Pillow ImageDraw/ImageFont not fully available for mock")
mock_display = MockDisplayManager()
mock_config_main = {"music": {"enabled": True, "POLLING_INTERVAL_SECONDS": 2, "preferred_source": "auto"}}
# The MusicManager expects the overall config, not just the music part directly for its _load_config
# So we simulate a config object that has a .get('music', {}) method.
# However, MusicManager's _load_config reads from CONFIG_PATH.
# For a true standalone test, we might need to mock file IO or provide a test config file.
# Simplified test:
# manager = MusicManager(display_manager=mock_display, config=mock_config_main) # This won't work due to file reading
# To truly test, you'd point CONFIG_PATH to a test config.json or mock open()
# For now, this __main__ block is mostly a placeholder.
logger.info("MusicManager standalone test setup is complex due to file dependencies for config.")
logger.info("To test: run the main application and observe logs from MusicManager.")
# if manager.enabled:
# manager.start_polling()
# try:
# while True:
# time.sleep(1)
# # In a real test, you might manually call manager.display() after setting some track info
# except KeyboardInterrupt:
# logger.info("Stopping standalone test...")
# finally:
# if manager.enabled:
# manager.stop_polling()
# logger.info("Test finished.")

View File

@@ -9,6 +9,12 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(
# Define paths relative to this file's location
CONFIG_DIR = os.path.join(os.path.dirname(__file__), '..', 'config')
SECRETS_PATH = os.path.join(CONFIG_DIR, 'config_secrets.json')
SPOTIFY_AUTH_CACHE_PATH = os.path.join(CONFIG_DIR, 'spotify_auth.json') # Explicit cache path for token
# Resolve to absolute paths
CONFIG_DIR = os.path.abspath(CONFIG_DIR)
SECRETS_PATH = os.path.abspath(SECRETS_PATH)
SPOTIFY_AUTH_CACHE_PATH = os.path.abspath(SPOTIFY_AUTH_CACHE_PATH)
class SpotifyClient:
def __init__(self):
@@ -19,14 +25,14 @@ class SpotifyClient:
self.sp = None
self.load_credentials()
if self.client_id and self.client_secret and self.redirect_uri:
# Attempt to authenticate once using the cache path
self._authenticate()
else:
logging.warning("Spotify credentials not loaded. Cannot authenticate.")
logging.warning("Spotify credentials not loaded. Spotify client will not be functional.")
def load_credentials(self):
if not os.path.exists(SECRETS_PATH):
logging.error(f"Secrets file not found at {SECRETS_PATH}")
logging.error(f"Secrets file not found at {SECRETS_PATH}. Spotify features will be unavailable.")
return
try:
@@ -37,96 +43,76 @@ class SpotifyClient:
self.client_secret = music_secrets.get("SPOTIFY_CLIENT_SECRET")
self.redirect_uri = music_secrets.get("SPOTIFY_REDIRECT_URI")
if not all([self.client_id, self.client_secret, self.redirect_uri]):
logging.warning("One or more Spotify credentials missing in config_secrets.json under the 'music' key.")
logging.warning("One or more Spotify credentials missing in config_secrets.json. Spotify will be unavailable.")
except json.JSONDecodeError:
logging.error(f"Error decoding JSON from {SECRETS_PATH}")
logging.error(f"Error decoding JSON from {SECRETS_PATH}. Spotify will be unavailable.")
except Exception as e:
logging.error(f"Error loading Spotify credentials: {e}")
logging.error(f"Error loading Spotify credentials: {e}. Spotify will be unavailable.")
def _authenticate(self):
"""Handles the OAuth authentication flow."""
"""Initializes Spotipy with SpotifyOAuth, relying on a cached token."""
if not self.client_id or not self.client_secret or not self.redirect_uri:
logging.warning("Cannot authenticate Spotify: credentials missing.")
return
try:
# Spotipy handles token caching in .cache file by default
self.sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
# Use the explicit cache path. Spotipy will try to load/refresh token from here.
auth_manager = SpotifyOAuth(
client_id=self.client_id,
client_secret=self.client_secret,
redirect_uri=self.redirect_uri,
scope=self.scope,
open_browser=False # Important for headless environments
))
# Try making a call to ensure authentication is working or trigger refresh
self.sp.current_user()
logging.info("Spotify authenticated successfully.")
cache_path=SPOTIFY_AUTH_CACHE_PATH, # Use the defined cache path
open_browser=False
)
self.sp = spotipy.Spotify(auth_manager=auth_manager)
# Try making a lightweight call to verify if the token from cache is valid or can be refreshed.
self.sp.current_user() # This will raise an exception if token is invalid/expired and cannot be refreshed.
logging.info("Spotify client initialized and authenticated using cached token.")
except Exception as e:
logging.error(f"Spotify authentication failed: {e}")
logging.warning(f"Spotify client initialization/authentication failed: {e}. Run authenticate_spotify.py if needed.")
self.sp = None # Ensure sp is None if auth fails
def is_authenticated(self):
"""Checks if the client is authenticated."""
# Check if sp object exists and try a lightweight API call
if not self.sp:
return False
try:
# A simple call to verify token validity
self.sp.current_user()
return True
except Exception as e:
# Log specific auth errors if needed
logging.warning(f"Spotify token validation failed: {e}")
return False
"""Checks if the client is currently considered authenticated and usable."""
return self.sp is not None # Relies on _authenticate setting sp to None on failure
def get_auth_url(self):
"""Gets the authorization URL for the user."""
# Create a temporary auth manager just to get the URL
try:
auth_manager = SpotifyOAuth(
client_id=self.client_id,
client_secret=self.client_secret,
redirect_uri=self.redirect_uri,
scope=self.scope,
open_browser=False
)
return auth_manager.get_authorize_url()
except Exception as e:
logging.error(f"Could not get Spotify auth URL: {e}")
return None
# Removed get_auth_url method - this is now handled by authenticate_spotify.py
def get_current_track(self):
"""Fetches the currently playing track from Spotify."""
if not self.is_authenticated():
logging.warning("Spotify not authenticated. Cannot fetch track.")
# Maybe try re-authenticating?
self._authenticate()
if not self.is_authenticated():
return None
if not self.is_authenticated(): # Check our internal state
# Do not attempt to re-authenticate here. User must run authenticate_spotify.py
# logging.debug("Spotify not authenticated. Cannot fetch track. Run authenticate_spotify.py if needed.")
return None
try:
track_info = self.sp.current_playback()
if track_info and track_info['item']:
# Simplify structure slightly if needed, or return raw
return track_info
else:
return None # Nothing playing or unavailable
except Exception as e:
logging.error(f"Error fetching current track from Spotify: {e}")
# Check for specific errors like token expiration if spotipy doesn't handle it
if "expired" in str(e).lower():
logging.info("Spotify token might be expired, attempting refresh...")
self._authenticate() # Try to refresh/re-authenticate
return None
except spotipy.exceptions.SpotifyException as e:
logging.error(f"Spotify API error when fetching current track: {e}")
# If it's an auth error (e.g. token revoked server-side), set sp to None so is_authenticated reflects it.
if e.http_status == 401 or e.http_status == 403:
logging.warning("Spotify authentication error (token may be revoked or expired). Please re-run authenticate_spotify.py.")
self.sp = None # Mark as not authenticated
return None
except Exception as e: # Catch other potential errors (network, etc.)
logging.error(f"Unexpected error fetching current track from Spotify: {e}")
return None
# Example Usage (for testing)
# Example Usage (for testing, adapt to new auth flow)
# if __name__ == '__main__':
# # First, ensure you have run authenticate_spotify.py successfully as the user.
# client = SpotifyClient()
# if client.is_authenticated():
# print("Spotify client is authenticated.")
# track = client.get_current_track()
# if track:
# print(json.dumps(track, indent=2))
# else:
# print("No track currently playing or error fetching.")
# else:
# auth_url = client.get_auth_url()
# if auth_url:
# print(f"Please authorize here: {auth_url}")
# else:
# print("Could not authenticate or get auth URL. Check credentials and config.")
# print("Spotify client not authenticated. Please run src/authenticate_spotify.py as the correct user.")