Update stock_manager.py

parse yahoo scripts
This commit is contained in:
Chuck
2025-04-08 21:31:04 -05:00
parent e87251caf4
commit cb8680e834

View File

@@ -6,6 +6,8 @@ import random
from typing import Dict, Any, List, Tuple from typing import Dict, Any, List, Tuple
from datetime import datetime from datetime import datetime
import os import os
import urllib.parse
import re
# Configure logging # Configure logging
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@@ -19,7 +21,6 @@ class StockManager:
self.last_update = 0 self.last_update = 0
self.stock_data = {} self.stock_data = {}
self.current_stock_index = 0 self.current_stock_index = 0
self.base_url = "https://query1.finance.yahoo.com"
self.headers = { self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
} }
@@ -36,40 +37,104 @@ class StockManager:
return (255, 0, 0) # Red for negative return (255, 0, 0) # Red for negative
return (255, 255, 0) # Yellow for no change return (255, 255, 0) # Yellow for no change
def _fetch_stock_data(self, symbol: str) -> Dict[str, Any]: def _extract_json_from_html(self, html: str) -> Dict:
"""Fetch stock data from Yahoo Finance API.""" """Extract the JSON data from Yahoo Finance HTML."""
try: try:
# Use Yahoo Finance quote endpoint # Look for the finance data in the HTML
url = f"{self.base_url}/v7/finance/quote" patterns = [
params = { r'root\.App\.main = (.*?);\s*</script>',
"symbols": symbol r'"QuotePageStore":\s*({.*?}),\s*"',
} r'{"regularMarketPrice":.*?"regularMarketChangePercent".*?}'
]
response = requests.get(url, params=params, headers=self.headers) for pattern in patterns:
response.raise_for_status() # Raise an error for bad status codes match = re.search(pattern, html, re.DOTALL)
if match:
json_str = match.group(1)
try:
data = json.loads(json_str)
if isinstance(data, dict):
if 'context' in data:
# First pattern matched
context = data.get('context', {})
dispatcher = context.get('dispatcher', {})
stores = dispatcher.get('stores', {})
quote_data = stores.get('QuoteSummaryStore', {})
if quote_data:
return quote_data
else:
# Direct quote data
return data
except json.JSONDecodeError:
continue
# If we get here, try one last attempt to find the price data directly
price_match = re.search(r'"regularMarketPrice":{"raw":([\d.]+)', html)
change_match = re.search(r'"regularMarketChangePercent":{"raw":([-\d.]+)', html)
prev_close_match = re.search(r'"regularMarketPreviousClose":{"raw":([\d.]+)', html)
name_match = re.search(r'"shortName":"([^"]+)"', html)
data = response.json() if price_match:
logger.debug(f"Raw response for {symbol}: {data}") return {
"price": {
"regularMarketPrice": {"raw": float(price_match.group(1))},
"regularMarketChangePercent": {"raw": float(change_match.group(1)) if change_match else 0},
"regularMarketPreviousClose": {"raw": float(prev_close_match.group(1)) if prev_close_match else 0},
"shortName": name_match.group(1) if name_match else None
}
}
if not data or "quoteResponse" not in data or "result" not in data["quoteResponse"] or not data["quoteResponse"]["result"]: return {}
logger.error(f"Invalid response format for {symbol}") except Exception as e:
logger.error(f"Error extracting JSON data: {e}")
return {}
def _fetch_stock_data(self, symbol: str) -> Dict[str, Any]:
"""Fetch stock data from Yahoo Finance public API."""
try:
# Use Yahoo Finance public API
encoded_symbol = urllib.parse.quote(symbol)
url = f"https://finance.yahoo.com/quote/{encoded_symbol}"
response = requests.get(url, headers=self.headers, timeout=5)
if response.status_code != 200:
logger.error(f"Failed to fetch data for {symbol}: HTTP {response.status_code}")
return None return None
# Extract the embedded JSON data
quote_data = self._extract_json_from_html(response.text)
if not quote_data:
logger.error(f"Could not extract quote data for {symbol}")
return None
# Get the price data
price = quote_data.get('price', {})
if not price:
logger.error(f"No price data found for {symbol}")
return None
regular_market = price.get('regularMarketPrice', {})
previous_close = price.get('regularMarketPreviousClose', {})
change_percent = price.get('regularMarketChangePercent', {})
quote = data["quoteResponse"]["result"][0] # Extract raw values with fallbacks
current_price = regular_market.get('raw', 0) if isinstance(regular_market, dict) else regular_market
prev_close = previous_close.get('raw', current_price) if isinstance(previous_close, dict) else previous_close
change_pct = change_percent.get('raw', 0) if isinstance(change_percent, dict) else change_percent
# Extract required fields with fallbacks # If we don't have a change percentage, calculate it
price = quote.get("regularMarketPrice", 0) if change_pct == 0 and prev_close > 0:
prev_close = quote.get("regularMarketPreviousClose", price) change_pct = ((current_price - prev_close) / prev_close) * 100
change_pct = quote.get("regularMarketChangePercent", 0)
# If we didn't get a change percentage, calculate it # Get company name
if change_pct == 0 and prev_close != 0: name = price.get('shortName', symbol)
change_pct = ((price - prev_close) / prev_close) * 100
logger.debug(f"Processed data for {symbol}: price={current_price}, change={change_pct}%")
return { return {
"symbol": symbol, "symbol": symbol,
"name": quote.get("longName", symbol), "name": name,
"price": price, "price": current_price,
"change": change_pct, "change": change_pct,
"open": prev_close "open": prev_close
} }
@@ -77,8 +142,8 @@ class StockManager:
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"Network error fetching data for {symbol}: {e}") logger.error(f"Network error fetching data for {symbol}: {e}")
return None return None
except json.JSONDecodeError as e: except (ValueError, IndexError) as e:
logger.error(f"JSON decode error for {symbol}: {e}") logger.error(f"Error parsing data for {symbol}: {e}")
return None return None
except Exception as e: except Exception as e:
logger.error(f"Unexpected error fetching data for {symbol}: {e}") logger.error(f"Unexpected error fetching data for {symbol}: {e}")
@@ -87,7 +152,10 @@ class StockManager:
def update_stock_data(self): def update_stock_data(self):
"""Update stock data if enough time has passed.""" """Update stock data if enough time has passed."""
current_time = time.time() current_time = time.time()
if current_time - self.last_update < self.stocks_config.get('update_interval', 60): update_interval = self.stocks_config.get('update_interval', 60)
# Add a small random delay to prevent exact timing matches
if current_time - self.last_update < update_interval + random.uniform(0, 2):
return return
# Get symbols from config # Get symbols from config
@@ -103,6 +171,8 @@ class StockManager:
success = False # Track if we got any successful updates success = False # Track if we got any successful updates
for stock in symbols: for stock in symbols:
symbol = stock['symbol'] symbol = stock['symbol']
# Add a small delay between requests to avoid rate limiting
time.sleep(random.uniform(0.5, 1.5))
data = self._fetch_stock_data(symbol) data = self._fetch_stock_data(symbol)
if data: if data:
self.stock_data[symbol] = data self.stock_data[symbol] = data