mirror of
https://github.com/ChuckBuilds/LEDMatrix.git
synced 2026-04-10 21:03:01 +00:00
add music display with YTM and Spotify
This commit is contained in:
77
README.md
77
README.md
@@ -352,3 +352,80 @@ The LEDMatrix system includes a robust caching mechanism to optimize API calls a
|
|||||||
|
|
||||||
## Fonts
|
## Fonts
|
||||||
You can add any font to the assets/fonts/ folder but they need to be .ttf and updated in display_manager.py
|
You can add any font to the assets/fonts/ folder but they need to be .ttf and updated in display_manager.py
|
||||||
|
|
||||||
|
### Music Display Configuration
|
||||||
|
|
||||||
|
The Music Display module shows information about the currently playing track from either Spotify or YouTube Music (via the [YouTube Music Desktop App](https://ytmdesktop.app/) companion server).
|
||||||
|
|
||||||
|
**Setup Requirements:**
|
||||||
|
|
||||||
|
1. **Spotify:**
|
||||||
|
* Requires a Spotify Premium account (for API access).
|
||||||
|
* You need to register an application on the [Spotify Developer Dashboard](https://developer.spotify.com/dashboard/) to get API credentials.
|
||||||
|
* Go to the dashboard, log in, and click "Create App".
|
||||||
|
* Give it a name (e.g., "LEDMatrix Display") and description.
|
||||||
|
* For the "Redirect URI", enter `http://localhost:8888/callback` (or another unused port if 8888 is taken). You **must** add this exact URI in your app settings on the Spotify dashboard.
|
||||||
|
* Note down the `Client ID` and `Client Secret`.
|
||||||
|
|
||||||
|
2. **YouTube Music (YTM):**
|
||||||
|
* Requires the [YouTube Music Desktop App](https://ytmdesktop.app/) (YTMD) to be installed and running on a computer on the *same network* as the Raspberry Pi.
|
||||||
|
* In YTMD settings, enable the "Companion Server" under Integration options. Note the URL it provides (usually `http://localhost:9863` if running on the same machine, or `http://<YTMD-Computer-IP>:9863` if running on a different computer).
|
||||||
|
|
||||||
|
**Configuration:**
|
||||||
|
|
||||||
|
1. In `config/config_secrets.json`, add your Spotify API credentials under the `music` key:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"music": {
|
||||||
|
"SPOTIFY_CLIENT_ID": "YOUR_SPOTIFY_CLIENT_ID_HERE",
|
||||||
|
"SPOTIFY_CLIENT_SECRET": "YOUR_SPOTIFY_CLIENT_SECRET_HERE",
|
||||||
|
"SPOTIFY_REDIRECT_URI": "http://localhost:8888/callback"
|
||||||
|
}
|
||||||
|
// ... other secrets ...
|
||||||
|
}
|
||||||
|
```
|
||||||
|
*(Ensure the `SPOTIFY_REDIRECT_URI` here matches exactly what you entered in the Spotify Developer Dashboard).*
|
||||||
|
|
||||||
|
2. In `config/config.json`, add/modify the `music` section:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"music": {
|
||||||
|
"enabled": true, // Set to false to disable this display
|
||||||
|
"preferred_source": "auto", // Options: "auto", "spotify", "ytm"
|
||||||
|
"YTM_COMPANION_URL": "http://<YTMD-Computer-IP>:9863", // Replace with actual URL if YTMD is not on the Pi
|
||||||
|
"POLLING_INTERVAL_SECONDS": 2 // How often to check for track updates
|
||||||
|
}
|
||||||
|
// ... other configurations ...
|
||||||
|
}
|
||||||
|
```
|
||||||
|
Also, ensure the display duration is set in the `display_durations` section:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"display": {
|
||||||
|
"display_durations": {
|
||||||
|
"music": 20, // Duration in seconds
|
||||||
|
// ... other durations ...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// ... other configurations ...
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**`preferred_source` Options:**
|
||||||
|
|
||||||
|
* `"auto"`: (Default) Checks Spotify first. If Spotify is playing, shows its track. If not, checks YTM.
|
||||||
|
* `"spotify"`: Only uses Spotify. Ignores YTM.
|
||||||
|
* `"ytm"`: Only uses the YTM Companion Server. Ignores Spotify.
|
||||||
|
|
||||||
|
**First Spotify Run (Headless Setup):**
|
||||||
|
|
||||||
|
Since the display runs on a headless Raspberry Pi, the Spotify authorization process requires a few manual steps:
|
||||||
|
|
||||||
|
1. **Start the Application:** Run the display controller script (`sudo python3 display_controller.py`).
|
||||||
|
2. **Copy Auth URL:** When Spotify needs authorization for the first time (or after a token expires), the application will **print a URL** to the console. Copy this full URL.
|
||||||
|
3. **Authorize in Browser (on another device):** Paste the copied URL into a web browser on your computer or phone. Log in to Spotify if prompted and click "Agree" to authorize the application.
|
||||||
|
4. **Get Redirected URL:** Your browser will be redirected to a URL starting with your `SPOTIFY_REDIRECT_URI` (e.g., `http://localhost:8888/callback`) followed by `?code=...`. The page will likely show an error like "Site can't be reached" - **this is expected and perfectly fine.**
|
||||||
|
5. **Copy Full Redirected URL:** **Immediately copy the complete URL** from your browser's address bar. Make sure you copy the *entire* thing, including the `?code=...` part.
|
||||||
|
6. **Paste URL Back to Pi:** Go back to the Raspberry Pi console where the display script is running. It should now be prompting you to "Enter the URL you were redirected to:". **Paste the full URL you just copied** from your browser into the console and press Enter.
|
||||||
|
|
||||||
|
The application will then use the provided code to get the necessary tokens and cache them (usually in a `.cache` file). Subsequent runs should not require this process unless the token expires.
|
||||||
|
|||||||
@@ -58,7 +58,8 @@
|
|||||||
"soccer_upcoming": 20,
|
"soccer_upcoming": 20,
|
||||||
"ncaam_live": 20,
|
"ncaam_live": 20,
|
||||||
"ncaam_recent": 15,
|
"ncaam_recent": 15,
|
||||||
"ncaam_upcoming": 15
|
"ncaam_upcoming": 15,
|
||||||
|
"music": 20
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"clock": {
|
"clock": {
|
||||||
@@ -243,5 +244,11 @@
|
|||||||
"soccer_recent": true,
|
"soccer_recent": true,
|
||||||
"soccer_upcoming": true
|
"soccer_upcoming": true
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
"music": {
|
||||||
|
"enabled": true,
|
||||||
|
"preferred_source": "ytm",
|
||||||
|
"YTM_COMPANION_URL": "http://192.168.86.12:9863",
|
||||||
|
"POLLING_INTERVAL_SECONDS": 2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -7,4 +7,5 @@ rgbmatrix
|
|||||||
google-auth-oauthlib==1.0.0
|
google-auth-oauthlib==1.0.0
|
||||||
google-auth-httplib2==0.1.0
|
google-auth-httplib2==0.1.0
|
||||||
google-api-python-client==2.86.0
|
google-api-python-client==2.86.0
|
||||||
freetype-py==2.5.1
|
freetype-py==2.5.1
|
||||||
|
spotipy
|
||||||
309
src/music_manager.py
Normal file
309
src/music_manager.py
Normal file
@@ -0,0 +1,309 @@
|
|||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from enum import Enum, auto
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Use relative imports for clients within the same package (src)
|
||||||
|
from .spotify_client import SpotifyClient
|
||||||
|
from .ytm_client import YTMClient
|
||||||
|
# Removed: import config
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
# Define paths relative to this file's location
|
||||||
|
CONFIG_DIR = os.path.join(os.path.dirname(__file__), '..', 'config')
|
||||||
|
CONFIG_PATH = os.path.join(CONFIG_DIR, 'config.json')
|
||||||
|
# SECRETS_PATH is handled within SpotifyClient
|
||||||
|
|
||||||
|
class MusicSource(Enum):
|
||||||
|
NONE = auto()
|
||||||
|
SPOTIFY = auto()
|
||||||
|
YTM = auto()
|
||||||
|
|
||||||
|
class MusicManager:
|
||||||
|
def __init__(self, update_callback=None):
|
||||||
|
self.spotify = None
|
||||||
|
self.ytm = None
|
||||||
|
self.current_track_info = None
|
||||||
|
self.current_source = MusicSource.NONE
|
||||||
|
self.update_callback = update_callback
|
||||||
|
self.polling_interval = 2 # Default
|
||||||
|
self.enabled = False # Default
|
||||||
|
self.preferred_source = "auto" # Default
|
||||||
|
self.stop_event = threading.Event()
|
||||||
|
self._load_config() # Load config first
|
||||||
|
self._initialize_clients() # Initialize based on loaded config
|
||||||
|
self.poll_thread = None
|
||||||
|
|
||||||
|
def _load_config(self):
|
||||||
|
default_interval = 2
|
||||||
|
default_preferred_source = "auto"
|
||||||
|
self.enabled = False # Assume disabled until config proves otherwise
|
||||||
|
|
||||||
|
if not os.path.exists(CONFIG_PATH):
|
||||||
|
logging.warning(f"Config file not found at {CONFIG_PATH}. Music manager disabled.")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(CONFIG_PATH, 'r') as f:
|
||||||
|
config_data = json.load(f)
|
||||||
|
music_config = config_data.get("music", {})
|
||||||
|
|
||||||
|
self.enabled = music_config.get("enabled", False)
|
||||||
|
self.polling_interval = music_config.get("POLLING_INTERVAL_SECONDS", default_interval)
|
||||||
|
self.preferred_source = music_config.get("preferred_source", default_preferred_source).lower()
|
||||||
|
|
||||||
|
if not self.enabled:
|
||||||
|
logging.info("Music manager is disabled in config.json.")
|
||||||
|
return # Don't proceed further if disabled
|
||||||
|
|
||||||
|
logging.info(f"Music manager enabled. Polling interval: {self.polling_interval}s. Preferred source: {self.preferred_source}")
|
||||||
|
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logging.error(f"Error decoding JSON from {CONFIG_PATH}. Music manager disabled.")
|
||||||
|
self.enabled = False
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error loading music config: {e}. Music manager disabled.")
|
||||||
|
self.enabled = False
|
||||||
|
|
||||||
|
def _initialize_clients(self):
|
||||||
|
# Only initialize if the manager is enabled
|
||||||
|
if not self.enabled:
|
||||||
|
self.spotify = None
|
||||||
|
self.ytm = None
|
||||||
|
return
|
||||||
|
|
||||||
|
logging.info("Initializing music clients...")
|
||||||
|
|
||||||
|
# Initialize Spotify Client if needed
|
||||||
|
if self.preferred_source in ["auto", "spotify"]:
|
||||||
|
try:
|
||||||
|
self.spotify = SpotifyClient()
|
||||||
|
if not self.spotify.is_authenticated():
|
||||||
|
logging.warning("Spotify client initialized but not authenticated.")
|
||||||
|
# We still might need manual intervention by the user based on console output
|
||||||
|
auth_url = self.spotify.get_auth_url()
|
||||||
|
if auth_url:
|
||||||
|
print(f"---> Spotify requires authorization. Please visit: {auth_url}")
|
||||||
|
print("---> After authorizing, restart the application.")
|
||||||
|
else:
|
||||||
|
print("---> Could not get Spotify auth URL. Check config/config_secrets.json")
|
||||||
|
else:
|
||||||
|
logging.info("Spotify client authenticated.")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Failed to initialize Spotify client: {e}")
|
||||||
|
self.spotify = None
|
||||||
|
else:
|
||||||
|
logging.info("Spotify client initialization skipped due to preferred_source setting.")
|
||||||
|
self.spotify = None
|
||||||
|
|
||||||
|
# Initialize YTM Client if needed
|
||||||
|
if self.preferred_source in ["auto", "ytm"]:
|
||||||
|
try:
|
||||||
|
self.ytm = YTMClient()
|
||||||
|
if not self.ytm.is_available():
|
||||||
|
logging.warning(f"YTM Companion server not reachable at {self.ytm.base_url}. YTM features disabled.")
|
||||||
|
self.ytm = None
|
||||||
|
else:
|
||||||
|
logging.info(f"YTM Companion server connected at {self.ytm.base_url}.")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Failed to initialize YTM client: {e}")
|
||||||
|
self.ytm = None
|
||||||
|
else:
|
||||||
|
logging.info("YTM client initialization skipped due to preferred_source setting.")
|
||||||
|
self.ytm = None
|
||||||
|
|
||||||
|
def _poll_music_data(self):
|
||||||
|
"""Continuously polls music sources for updates, respecting preferences."""
|
||||||
|
if not self.enabled:
|
||||||
|
logging.warning("Polling attempted while music manager is disabled. Stopping polling thread.")
|
||||||
|
return # Should not happen if start_polling checks enabled, but safety check
|
||||||
|
|
||||||
|
while not self.stop_event.is_set():
|
||||||
|
polled_track_info = None
|
||||||
|
polled_source = MusicSource.NONE
|
||||||
|
is_playing = False
|
||||||
|
|
||||||
|
# Determine which sources to poll based on preference
|
||||||
|
poll_spotify = self.preferred_source in ["auto", "spotify"] and self.spotify and self.spotify.is_authenticated()
|
||||||
|
poll_ytm = self.preferred_source in ["auto", "ytm"] and self.ytm # Check if ytm object exists
|
||||||
|
|
||||||
|
# --- Try Spotify First (if allowed and available) ---
|
||||||
|
if poll_spotify:
|
||||||
|
try:
|
||||||
|
spotify_track = self.spotify.get_current_track()
|
||||||
|
if spotify_track and spotify_track.get('is_playing'):
|
||||||
|
polled_track_info = spotify_track
|
||||||
|
polled_source = MusicSource.SPOTIFY
|
||||||
|
is_playing = True
|
||||||
|
logging.debug(f"Polling Spotify: Active track - {spotify_track.get('item', {}).get('name')}")
|
||||||
|
else:
|
||||||
|
logging.debug("Polling Spotify: No active track or player paused.")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error polling Spotify: {e}")
|
||||||
|
if "token" in str(e).lower():
|
||||||
|
logging.warning("Spotify auth token issue detected during polling.")
|
||||||
|
|
||||||
|
# --- Try YTM if Spotify isn't playing OR if YTM is preferred ---
|
||||||
|
# If YTM is preferred, poll it even if Spotify might be playing (config override)
|
||||||
|
# If Auto, only poll YTM if Spotify wasn't found playing
|
||||||
|
should_poll_ytm_now = poll_ytm and (self.preferred_source == "ytm" or (self.preferred_source == "auto" and not is_playing))
|
||||||
|
|
||||||
|
if should_poll_ytm_now:
|
||||||
|
# Re-check availability just before polling
|
||||||
|
if self.ytm.is_available():
|
||||||
|
try:
|
||||||
|
ytm_track = self.ytm.get_current_track()
|
||||||
|
if ytm_track and not ytm_track.get('player', {}).get('isPaused'):
|
||||||
|
# If YTM is preferred, it overrides Spotify even if Spotify was playing
|
||||||
|
if self.preferred_source == "ytm" or not is_playing:
|
||||||
|
polled_track_info = ytm_track
|
||||||
|
polled_source = MusicSource.YTM
|
||||||
|
is_playing = True
|
||||||
|
logging.debug(f"Polling YTM: Active track - {ytm_track.get('track', {}).get('title')}")
|
||||||
|
else:
|
||||||
|
logging.debug("Polling YTM: No active track or player paused.")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error polling YTM: {e}")
|
||||||
|
else:
|
||||||
|
logging.debug("Skipping YTM poll: Server not available.")
|
||||||
|
# Consider setting self.ytm = None if it becomes unavailable repeatedly?
|
||||||
|
|
||||||
|
# --- Consolidate and Check for Changes ---
|
||||||
|
simplified_info = self.get_simplified_track_info(polled_track_info, polled_source)
|
||||||
|
current_simplified_info = self.get_simplified_track_info(self.current_track_info, self.current_source)
|
||||||
|
|
||||||
|
has_changed = False
|
||||||
|
if simplified_info != current_simplified_info:
|
||||||
|
has_changed = True
|
||||||
|
self.current_track_info = polled_track_info
|
||||||
|
self.current_source = polled_source
|
||||||
|
display_title = simplified_info.get('title', 'None') if simplified_info else 'None'
|
||||||
|
logging.info(f"Track change detected. Source: {self.current_source.name}. Track: {display_title}")
|
||||||
|
else:
|
||||||
|
logging.debug("No change in simplified track info.")
|
||||||
|
|
||||||
|
if has_changed and self.update_callback:
|
||||||
|
try:
|
||||||
|
self.update_callback(simplified_info)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error executing update callback: {e}")
|
||||||
|
|
||||||
|
time.sleep(self.polling_interval)
|
||||||
|
|
||||||
|
# Modified to accept data and source, making it more testable/reusable
|
||||||
|
def get_simplified_track_info(self, track_data, source):
|
||||||
|
"""Provides a consistent format for track info regardless of source."""
|
||||||
|
if source == MusicSource.SPOTIFY and track_data:
|
||||||
|
item = track_data.get('item', {})
|
||||||
|
if not item: return None
|
||||||
|
return {
|
||||||
|
'source': 'Spotify',
|
||||||
|
'title': item.get('name'),
|
||||||
|
'artist': ', '.join([a['name'] for a in item.get('artists', [])]),
|
||||||
|
'album': item.get('album', {}).get('name'),
|
||||||
|
'album_art_url': item.get('album', {}).get('images', [{}])[0].get('url') if item.get('album', {}).get('images') else None,
|
||||||
|
'duration_ms': item.get('duration_ms'),
|
||||||
|
'progress_ms': track_data.get('progress_ms'),
|
||||||
|
'is_playing': track_data.get('is_playing', False),
|
||||||
|
}
|
||||||
|
elif source == MusicSource.YTM and track_data:
|
||||||
|
track = track_data.get('track', {})
|
||||||
|
player = track_data.get('player', {})
|
||||||
|
duration_sec = track.get('durationSeconds') # YTM specific field
|
||||||
|
duration_ms = int(duration_sec * 1000) if duration_sec is not None else player.get('duration') # Fallback if needed
|
||||||
|
progress_ms = player.get('trackState') # Simplified access if structure is consistent
|
||||||
|
|
||||||
|
# Refined YTM data extraction
|
||||||
|
duration_sec_alt = player.get('trackState', {}).get('duration') # Alternative location
|
||||||
|
if duration_ms is None and duration_sec_alt is not None:
|
||||||
|
duration_ms = int(duration_sec_alt * 1000)
|
||||||
|
|
||||||
|
progress_sec = player.get('trackState', {}).get('currentTime')
|
||||||
|
progress_ms = int(progress_sec * 1000) if progress_sec is not None else None
|
||||||
|
|
||||||
|
return {
|
||||||
|
'source': 'YouTube Music',
|
||||||
|
'title': track.get('title'),
|
||||||
|
'artist': track.get('author'),
|
||||||
|
'album': track.get('album'),
|
||||||
|
'album_art_url': track.get('cover'),
|
||||||
|
'duration_ms': duration_ms,
|
||||||
|
'progress_ms': progress_ms,
|
||||||
|
'is_playing': not player.get('isPaused', True),
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# Return a default structure for 'nothing playing'
|
||||||
|
return {
|
||||||
|
'source': 'None',
|
||||||
|
'title': 'Nothing Playing',
|
||||||
|
'artist': '',
|
||||||
|
'album': '',
|
||||||
|
'album_art_url': None,
|
||||||
|
'duration_ms': 0,
|
||||||
|
'progress_ms': 0,
|
||||||
|
'is_playing': False,
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_current_display_info(self):
|
||||||
|
"""Returns the latest simplified info for display purposes."""
|
||||||
|
# Return default "Nothing Playing" state if manager is disabled
|
||||||
|
if not self.enabled:
|
||||||
|
return self.get_simplified_track_info(None, MusicSource.NONE)
|
||||||
|
return self.get_simplified_track_info(self.current_track_info, self.current_source)
|
||||||
|
|
||||||
|
def start_polling(self):
|
||||||
|
# Only start polling if enabled
|
||||||
|
if not self.enabled:
|
||||||
|
logging.info("Music manager disabled, polling not started.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self.poll_thread or not self.poll_thread.is_alive():
|
||||||
|
# Ensure at least one client is potentially available
|
||||||
|
if not self.spotify and not self.ytm:
|
||||||
|
logging.warning("Cannot start polling: No music clients initialized or available.")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.stop_event.clear()
|
||||||
|
self.poll_thread = threading.Thread(target=self._poll_music_data, daemon=True)
|
||||||
|
self.poll_thread.start()
|
||||||
|
logging.info("Music polling started.")
|
||||||
|
|
||||||
|
def stop_polling(self):
|
||||||
|
self.stop_event.set()
|
||||||
|
if self.poll_thread and self.poll_thread.is_alive():
|
||||||
|
self.poll_thread.join() # Wait for thread to finish
|
||||||
|
logging.info("Music polling stopped.")
|
||||||
|
|
||||||
|
# Example Usage (for testing)
|
||||||
|
if __name__ == '__main__':
|
||||||
|
def print_update(track_info):
|
||||||
|
print("-" * 20)
|
||||||
|
if track_info and track_info['source'] != 'None':
|
||||||
|
print(f"Source: {track_info.get('source')}")
|
||||||
|
print(f"Title: {track_info.get('title')}")
|
||||||
|
print(f"Artist: {track_info.get('artist')}")
|
||||||
|
print(f"Album: {track_info.get('album')}")
|
||||||
|
print(f"Playing: {track_info.get('is_playing')}")
|
||||||
|
print(f"Duration: {track_info.get('duration_ms')} ms")
|
||||||
|
print(f"Progress: {track_info.get('progress_ms')} ms")
|
||||||
|
print(f"Art URL: {track_info.get('album_art_url')}")
|
||||||
|
else:
|
||||||
|
print("Nothing playing or update is None.")
|
||||||
|
print("-" * 20)
|
||||||
|
|
||||||
|
manager = MusicManager(update_callback=print_update)
|
||||||
|
manager.start_polling()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Keep the main thread alive to allow polling thread to run
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Stopping polling...")
|
||||||
|
manager.stop_polling()
|
||||||
|
print("Exiting.")
|
||||||
132
src/spotify_client.py
Normal file
132
src/spotify_client.py
Normal file
@@ -0,0 +1,132 @@
|
|||||||
|
import spotipy
|
||||||
|
from spotipy.oauth2 import SpotifyOAuth
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
# Define paths relative to this file's location
|
||||||
|
CONFIG_DIR = os.path.join(os.path.dirname(__file__), '..', 'config')
|
||||||
|
SECRETS_PATH = os.path.join(CONFIG_DIR, 'config_secrets.json')
|
||||||
|
|
||||||
|
class SpotifyClient:
|
||||||
|
def __init__(self):
|
||||||
|
self.client_id = None
|
||||||
|
self.client_secret = None
|
||||||
|
self.redirect_uri = None
|
||||||
|
self.scope = "user-read-currently-playing user-read-playback-state"
|
||||||
|
self.sp = None
|
||||||
|
self.load_credentials()
|
||||||
|
if self.client_id and self.client_secret and self.redirect_uri:
|
||||||
|
self._authenticate()
|
||||||
|
else:
|
||||||
|
logging.warning("Spotify credentials not loaded. Cannot authenticate.")
|
||||||
|
|
||||||
|
|
||||||
|
def load_credentials(self):
|
||||||
|
if not os.path.exists(SECRETS_PATH):
|
||||||
|
logging.error(f"Secrets file not found at {SECRETS_PATH}")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(SECRETS_PATH, 'r') as f:
|
||||||
|
secrets = json.load(f)
|
||||||
|
music_secrets = secrets.get("music", {})
|
||||||
|
self.client_id = music_secrets.get("SPOTIFY_CLIENT_ID")
|
||||||
|
self.client_secret = music_secrets.get("SPOTIFY_CLIENT_SECRET")
|
||||||
|
self.redirect_uri = music_secrets.get("SPOTIFY_REDIRECT_URI")
|
||||||
|
if not all([self.client_id, self.client_secret, self.redirect_uri]):
|
||||||
|
logging.warning("One or more Spotify credentials missing in config_secrets.json under the 'music' key.")
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logging.error(f"Error decoding JSON from {SECRETS_PATH}")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error loading Spotify credentials: {e}")
|
||||||
|
|
||||||
|
def _authenticate(self):
|
||||||
|
"""Handles the OAuth authentication flow."""
|
||||||
|
try:
|
||||||
|
# Spotipy handles token caching in .cache file by default
|
||||||
|
self.sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
|
||||||
|
client_id=self.client_id,
|
||||||
|
client_secret=self.client_secret,
|
||||||
|
redirect_uri=self.redirect_uri,
|
||||||
|
scope=self.scope,
|
||||||
|
open_browser=False # Important for headless environments
|
||||||
|
))
|
||||||
|
# Try making a call to ensure authentication is working or trigger refresh
|
||||||
|
self.sp.current_user()
|
||||||
|
logging.info("Spotify authenticated successfully.")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Spotify authentication failed: {e}")
|
||||||
|
self.sp = None # Ensure sp is None if auth fails
|
||||||
|
|
||||||
|
def is_authenticated(self):
|
||||||
|
"""Checks if the client is authenticated."""
|
||||||
|
# Check if sp object exists and try a lightweight API call
|
||||||
|
if not self.sp:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
# A simple call to verify token validity
|
||||||
|
self.sp.current_user()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
# Log specific auth errors if needed
|
||||||
|
logging.warning(f"Spotify token validation failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_auth_url(self):
|
||||||
|
"""Gets the authorization URL for the user."""
|
||||||
|
# Create a temporary auth manager just to get the URL
|
||||||
|
try:
|
||||||
|
auth_manager = SpotifyOAuth(
|
||||||
|
client_id=self.client_id,
|
||||||
|
client_secret=self.client_secret,
|
||||||
|
redirect_uri=self.redirect_uri,
|
||||||
|
scope=self.scope,
|
||||||
|
open_browser=False
|
||||||
|
)
|
||||||
|
return auth_manager.get_authorize_url()
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Could not get Spotify auth URL: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_current_track(self):
|
||||||
|
"""Fetches the currently playing track from Spotify."""
|
||||||
|
if not self.is_authenticated():
|
||||||
|
logging.warning("Spotify not authenticated. Cannot fetch track.")
|
||||||
|
# Maybe try re-authenticating?
|
||||||
|
self._authenticate()
|
||||||
|
if not self.is_authenticated():
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
track_info = self.sp.current_playback()
|
||||||
|
if track_info and track_info['item']:
|
||||||
|
# Simplify structure slightly if needed, or return raw
|
||||||
|
return track_info
|
||||||
|
else:
|
||||||
|
return None # Nothing playing or unavailable
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error fetching current track from Spotify: {e}")
|
||||||
|
# Check for specific errors like token expiration if spotipy doesn't handle it
|
||||||
|
if "expired" in str(e).lower():
|
||||||
|
logging.info("Spotify token might be expired, attempting refresh...")
|
||||||
|
self._authenticate() # Try to refresh/re-authenticate
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Example Usage (for testing)
|
||||||
|
# if __name__ == '__main__':
|
||||||
|
# client = SpotifyClient()
|
||||||
|
# if client.is_authenticated():
|
||||||
|
# track = client.get_current_track()
|
||||||
|
# if track:
|
||||||
|
# print(json.dumps(track, indent=2))
|
||||||
|
# else:
|
||||||
|
# print("No track currently playing or error fetching.")
|
||||||
|
# else:
|
||||||
|
# auth_url = client.get_auth_url()
|
||||||
|
# if auth_url:
|
||||||
|
# print(f"Please authorize here: {auth_url}")
|
||||||
|
# else:
|
||||||
|
# print("Could not authenticate or get auth URL. Check credentials and config.")
|
||||||
89
src/ytm_client.py
Normal file
89
src/ytm_client.py
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
import requests
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
|
||||||
|
# Define paths relative to this file's location
|
||||||
|
CONFIG_DIR = os.path.join(os.path.dirname(__file__), '..', 'config')
|
||||||
|
CONFIG_PATH = os.path.join(CONFIG_DIR, 'config.json')
|
||||||
|
|
||||||
|
class YTMClient:
|
||||||
|
def __init__(self):
|
||||||
|
self.base_url = None
|
||||||
|
self.load_config()
|
||||||
|
|
||||||
|
def load_config(self):
|
||||||
|
default_url = "http://localhost:9863"
|
||||||
|
if not os.path.exists(CONFIG_PATH):
|
||||||
|
logging.error(f"Config file not found at {CONFIG_PATH}")
|
||||||
|
self.base_url = default_url
|
||||||
|
logging.warning(f"Using default YTM URL: {self.base_url}")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(CONFIG_PATH, 'r') as f:
|
||||||
|
config_data = json.load(f)
|
||||||
|
music_config = config_data.get("music", {})
|
||||||
|
self.base_url = music_config.get("YTM_COMPANION_URL", default_url)
|
||||||
|
if not self.base_url:
|
||||||
|
logging.warning("YTM_COMPANION_URL missing or empty in config.json music section, using default.")
|
||||||
|
self.base_url = default_url
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logging.error(f"Error decoding JSON from {CONFIG_PATH}")
|
||||||
|
self.base_url = default_url
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Error loading YTM config: {e}")
|
||||||
|
self.base_url = default_url
|
||||||
|
logging.info(f"YTM Companion URL set to: {self.base_url}")
|
||||||
|
|
||||||
|
def _make_request(self, endpoint):
|
||||||
|
"""Helper method to make requests to the companion server."""
|
||||||
|
if not self.base_url:
|
||||||
|
logging.error("YTM base URL not configured.")
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
|
||||||
|
response = requests.get(url, timeout=1) # Short timeout
|
||||||
|
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
|
||||||
|
return response.json()
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
# This is expected if the server isn't running
|
||||||
|
logging.debug(f"Could not connect to YTM Companion server at {self.base_url}")
|
||||||
|
return None
|
||||||
|
except requests.exceptions.Timeout:
|
||||||
|
logging.warning(f"Timeout connecting to YTM Companion server at {self.base_url}")
|
||||||
|
return None
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logging.error(f"Error requesting {endpoint} from YTM: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def is_available(self):
|
||||||
|
"""Checks if the YTM companion server is reachable."""
|
||||||
|
# Use a lightweight endpoint if available, otherwise try main query
|
||||||
|
# For now, just try the main query endpoint
|
||||||
|
return self._make_request('/query') is not None
|
||||||
|
|
||||||
|
def get_current_track(self):
|
||||||
|
"""Fetches the currently playing track from the YTM companion server."""
|
||||||
|
data = self._make_request('/query')
|
||||||
|
# Add more specific error handling or data validation if needed
|
||||||
|
if data and 'track' in data and 'player' in data:
|
||||||
|
return data
|
||||||
|
else:
|
||||||
|
logging.debug("Received no or incomplete data from YTM /query")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Example Usage (for testing)
|
||||||
|
# if __name__ == '__main__':
|
||||||
|
# client = YTMClient()
|
||||||
|
# if client.is_available():
|
||||||
|
# print("YTM Server is available.")
|
||||||
|
# track = client.get_current_track()
|
||||||
|
# if track:
|
||||||
|
# print(json.dumps(track, indent=2))
|
||||||
|
# else:
|
||||||
|
# print("No track currently playing or error fetching.")
|
||||||
|
# else:
|
||||||
|
# print(f"YTM Server not available at {client.base_url}. Is YTMD running with companion server enabled?")
|
||||||
Reference in New Issue
Block a user