From cb8680e834b6aca5d40b588ae78b8b75168d14f1 Mon Sep 17 00:00:00 2001 From: Chuck <33324927+ChuckBuilds@users.noreply.github.com> Date: Tue, 8 Apr 2025 21:31:04 -0500 Subject: [PATCH] Update stock_manager.py parse yahoo scripts --- src/stock_manager.py | 124 +++++++++++++++++++++++++++++++++---------- 1 file changed, 97 insertions(+), 27 deletions(-) diff --git a/src/stock_manager.py b/src/stock_manager.py index c151783f..5f841b6d 100644 --- a/src/stock_manager.py +++ b/src/stock_manager.py @@ -6,6 +6,8 @@ import random from typing import Dict, Any, List, Tuple from datetime import datetime import os +import urllib.parse +import re # Configure logging logging.basicConfig(level=logging.INFO) @@ -19,7 +21,6 @@ class StockManager: self.last_update = 0 self.stock_data = {} self.current_stock_index = 0 - self.base_url = "https://query1.finance.yahoo.com" self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } @@ -36,40 +37,104 @@ class StockManager: return (255, 0, 0) # Red for negative return (255, 255, 0) # Yellow for no change - def _fetch_stock_data(self, symbol: str) -> Dict[str, Any]: - """Fetch stock data from Yahoo Finance API.""" + def _extract_json_from_html(self, html: str) -> Dict: + """Extract the JSON data from Yahoo Finance HTML.""" try: - # Use Yahoo Finance quote endpoint - url = f"{self.base_url}/v7/finance/quote" - params = { - "symbols": symbol - } + # Look for the finance data in the HTML + patterns = [ + r'root\.App\.main = (.*?);\s*', + r'"QuotePageStore":\s*({.*?}),\s*"', + r'{"regularMarketPrice":.*?"regularMarketChangePercent".*?}' + ] - response = requests.get(url, params=params, headers=self.headers) - response.raise_for_status() # Raise an error for bad status codes + for pattern in patterns: + match = re.search(pattern, html, re.DOTALL) + if match: + json_str = match.group(1) + try: + data = json.loads(json_str) + if isinstance(data, dict): + if 'context' in data: + # First pattern matched + context = data.get('context', {}) + dispatcher = context.get('dispatcher', {}) + stores = dispatcher.get('stores', {}) + quote_data = stores.get('QuoteSummaryStore', {}) + if quote_data: + return quote_data + else: + # Direct quote data + return data + except json.JSONDecodeError: + continue + + # If we get here, try one last attempt to find the price data directly + price_match = re.search(r'"regularMarketPrice":{"raw":([\d.]+)', html) + change_match = re.search(r'"regularMarketChangePercent":{"raw":([-\d.]+)', html) + prev_close_match = re.search(r'"regularMarketPreviousClose":{"raw":([\d.]+)', html) + name_match = re.search(r'"shortName":"([^"]+)"', html) - data = response.json() - logger.debug(f"Raw response for {symbol}: {data}") + if price_match: + return { + "price": { + "regularMarketPrice": {"raw": float(price_match.group(1))}, + "regularMarketChangePercent": {"raw": float(change_match.group(1)) if change_match else 0}, + "regularMarketPreviousClose": {"raw": float(prev_close_match.group(1)) if prev_close_match else 0}, + "shortName": name_match.group(1) if name_match else None + } + } - if not data or "quoteResponse" not in data or "result" not in data["quoteResponse"] or not data["quoteResponse"]["result"]: - logger.error(f"Invalid response format for {symbol}") + return {} + except Exception as e: + logger.error(f"Error extracting JSON data: {e}") + return {} + + def _fetch_stock_data(self, symbol: str) -> Dict[str, Any]: + """Fetch stock data from Yahoo Finance public API.""" + try: + # Use Yahoo Finance public API + encoded_symbol = urllib.parse.quote(symbol) + url = f"https://finance.yahoo.com/quote/{encoded_symbol}" + + response = requests.get(url, headers=self.headers, timeout=5) + if response.status_code != 200: + logger.error(f"Failed to fetch data for {symbol}: HTTP {response.status_code}") return None + + # Extract the embedded JSON data + quote_data = self._extract_json_from_html(response.text) + if not quote_data: + logger.error(f"Could not extract quote data for {symbol}") + return None + + # Get the price data + price = quote_data.get('price', {}) + if not price: + logger.error(f"No price data found for {symbol}") + return None + + regular_market = price.get('regularMarketPrice', {}) + previous_close = price.get('regularMarketPreviousClose', {}) + change_percent = price.get('regularMarketChangePercent', {}) - quote = data["quoteResponse"]["result"][0] + # Extract raw values with fallbacks + current_price = regular_market.get('raw', 0) if isinstance(regular_market, dict) else regular_market + prev_close = previous_close.get('raw', current_price) if isinstance(previous_close, dict) else previous_close + change_pct = change_percent.get('raw', 0) if isinstance(change_percent, dict) else change_percent - # Extract required fields with fallbacks - price = quote.get("regularMarketPrice", 0) - prev_close = quote.get("regularMarketPreviousClose", price) - change_pct = quote.get("regularMarketChangePercent", 0) + # If we don't have a change percentage, calculate it + if change_pct == 0 and prev_close > 0: + change_pct = ((current_price - prev_close) / prev_close) * 100 - # If we didn't get a change percentage, calculate it - if change_pct == 0 and prev_close != 0: - change_pct = ((price - prev_close) / prev_close) * 100 + # Get company name + name = price.get('shortName', symbol) + + logger.debug(f"Processed data for {symbol}: price={current_price}, change={change_pct}%") return { "symbol": symbol, - "name": quote.get("longName", symbol), - "price": price, + "name": name, + "price": current_price, "change": change_pct, "open": prev_close } @@ -77,8 +142,8 @@ class StockManager: except requests.exceptions.RequestException as e: logger.error(f"Network error fetching data for {symbol}: {e}") return None - except json.JSONDecodeError as e: - logger.error(f"JSON decode error for {symbol}: {e}") + except (ValueError, IndexError) as e: + logger.error(f"Error parsing data for {symbol}: {e}") return None except Exception as e: logger.error(f"Unexpected error fetching data for {symbol}: {e}") @@ -87,7 +152,10 @@ class StockManager: def update_stock_data(self): """Update stock data if enough time has passed.""" current_time = time.time() - if current_time - self.last_update < self.stocks_config.get('update_interval', 60): + update_interval = self.stocks_config.get('update_interval', 60) + + # Add a small random delay to prevent exact timing matches + if current_time - self.last_update < update_interval + random.uniform(0, 2): return # Get symbols from config @@ -103,6 +171,8 @@ class StockManager: success = False # Track if we got any successful updates for stock in symbols: symbol = stock['symbol'] + # Add a small delay between requests to avoid rate limiting + time.sleep(random.uniform(0.5, 1.5)) data = self._fetch_stock_data(symbol) if data: self.stock_data[symbol] = data