|
|
"""
|
|
|
OHLC Data Background Worker - REAL DATA FROM MULTIPLE FREE APIs

CRITICAL RULES:
- MUST fetch REAL candlestick data from multiple sources with automatic fallback
- MUST store actual OHLC values, not fake data
- MUST use actual timestamps from API responses
- NEVER generate or interpolate candles
- If the primary API fails, automatically try alternative sources

SUPPORTED DATA SOURCES (in priority order):
1. CoinGecko (FREE, no API key, 365-day history)
2. Kraken (FREE, no API key, up to 720 candles)
3. Coinbase Pro (FREE, no API key, up to 300 candles)
4. Binance (FREE, but may be geo-restricted in some regions)
5. CoinPaprika (FREE, no API key, 366-day history) - listed for reference only;
   no CoinPaprika fetcher is defined in this module
|
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
|
import time
|
|
|
import logging
|
|
|
import os
|
|
|
from datetime import datetime
|
|
|
from typing import List, Dict, Any, Optional
|
|
|
import httpx
|
|
|
|
|
|
from database.cache_queries import get_cache_queries
|
|
|
from database.db_manager import db_manager
|
|
|
from utils.logger import setup_logger
|
|
|
|
|
|
# Module-wide logger and cache accessor shared by every function in this file.
logger = setup_logger("ohlc_worker")
cache = get_cache_queries(db_manager)

# HuggingFace upload is opt-in: it is only attempted when a token is present
# in the environment, and it is switched off again if the uploader cannot be
# imported or constructed.
hf_uploader = None
HF_UPLOAD_ENABLED = bool(os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"))

if not HF_UPLOAD_ENABLED:
    logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)")
else:
    try:
        from hf_dataset_uploader import get_dataset_uploader

        hf_uploader = get_dataset_uploader()
        logger.info("✅ HuggingFace Dataset upload ENABLED for OHLC data")
    except Exception as exc:
        logger.warning(f"HuggingFace Dataset upload disabled: {exc}")
        HF_UPLOAD_ENABLED = False
        hf_uploader = None
|
|
|
|
|
|
|
|
|
# Base-asset tickers processed on every worker pass.
SYMBOLS = [
    "BTC", "ETH", "BNB", "XRP", "ADA",
    "SOL", "DOT", "DOGE", "MATIC", "AVAX",
    "LINK", "LTC", "UNI", "ALGO", "XLM",
    "ATOM", "TRX", "XMR", "ETC", "XTZ",
]

# Candle widths requested for every symbol.
INTERVALS = ["1h", "4h", "1d"]

# Per-provider translation from a base ticker to that provider's identifier.
# A ticker missing from a provider's table is simply skipped by that provider.
SYMBOL_MAP = {
    "coingecko": {
        "BTC": "bitcoin",
        "ETH": "ethereum",
        "BNB": "binancecoin",
        "XRP": "ripple",
        "ADA": "cardano",
        "SOL": "solana",
        "DOT": "polkadot",
        "DOGE": "dogecoin",
        "MATIC": "matic-network",
        "AVAX": "avalanche-2",
        "LINK": "chainlink",
        "LTC": "litecoin",
        "UNI": "uniswap",
        "ALGO": "algorand",
        "XLM": "stellar",
        "ATOM": "cosmos",
        "TRX": "tron",
        "XMR": "monero",
        "ETC": "ethereum-classic",
        "XTZ": "tezos",
    },
    "kraken": {
        "BTC": "XXBTZUSD",
        "ETH": "XETHZUSD",
        "XRP": "XXRPZUSD",
        "ADA": "ADAUSD",
        "SOL": "SOLUSD",
        "DOT": "DOTUSD",
        "DOGE": "XDGUSD",
        "LINK": "LINKUSD",
        "LTC": "XLTCZUSD",
        "UNI": "UNIUSD",
        "ALGO": "ALGOUSD",
        "XLM": "XXLMZUSD",
        "ATOM": "ATOMUSD",
        "TRX": "TRXUSD",
        "ETC": "XETCZUSD",
        "XTZ": "XTZUSD",
    },
    "coinbase": {
        "BTC": "BTC-USD",
        "ETH": "ETH-USD",
        "XRP": "XRP-USD",
        "ADA": "ADA-USD",
        "SOL": "SOL-USD",
        "DOT": "DOT-USD",
        "DOGE": "DOGE-USD",
        "LINK": "LINK-USD",
        "LTC": "LTC-USD",
        "UNI": "UNI-USD",
        "ALGO": "ALGO-USD",
        "XLM": "XLM-USD",
        "ATOM": "ATOM-USD",
        "MATIC": "MATIC-USD",
        "AVAX": "AVAX-USD",
    },
    "binance": {
        "BTC": "BTCUSDT",
        "ETH": "ETHUSDT",
        "BNB": "BNBUSDT",
        "XRP": "XRPUSDT",
        "ADA": "ADAUSDT",
        "SOL": "SOLUSDT",
        "DOT": "DOTUSDT",
        "DOGE": "DOGEUSDT",
        "MATIC": "MATICUSDT",
        "AVAX": "AVAXUSDT",
        "LINK": "LINKUSDT",
        "LTC": "LTCUSDT",
        "UNI": "UNIUSDT",
        "ALGO": "ALGOUSDT",
        "XLM": "XLMUSDT",
        "ATOM": "ATOMUSDT",
        "TRX": "TRXUSDT",
        "XMR": "XMRUSDT",
        "ETC": "ETCUSDT",
        "XTZ": "XTZUSDT",
    },
}
|
|
|
|
|
|
|
|
|
async def fetch_from_coingecko(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
    """
    Fetch OHLC data from CoinGecko (FREE, no API key required).

    CoinGecko's public OHLC endpoint takes no interval argument: it derives the
    candle width from the requested ``days`` window (30 minutes for 1-2 days,
    4 hours for 3-30 days, 4 days for 31 days and more). Only '4h' can
    therefore be answered with candles that really are as wide as their label.
    Every other interval returns an empty list so the fallback chain moves on
    to the next provider instead of storing 4-hour or 4-day candles labelled
    '1h' or '1d'.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Candle interval; only '4h' is served by this provider
        limit: Maximum number of candles to return (at most ~180 are available
            from the 30-day window)

    Returns:
        List of OHLC candle dicts. ``volume`` is always 0.0 because this
        endpoint does not report volume. An empty list is returned when the
        symbol or interval is not supported, when ``limit`` is not positive,
        or on any request/parse error.
    """
    # Cheap argument checks first: no lookup and no network call for requests
    # that cannot be answered with correctly labelled candles.
    if interval != "4h" or limit <= 0:
        return []

    try:
        coin_id = SYMBOL_MAP["coingecko"].get(symbol)
        if not coin_id:
            logger.debug(f"CoinGecko: No mapping for {symbol}")
            return []

        # Six 4-hour candles per day. Pick the smallest documented ``days``
        # value that covers ``limit`` candles while staying inside the 3-30 day
        # range that yields 4-hour granularity (never 0, never > 30).
        days_needed = -(-limit // 6)  # ceiling division
        days = next((window for window in (7, 14, 30) if window >= days_needed), 30)

        url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/ohlc"
        params = {"vs_currency": "usd", "days": days}

        logger.debug(f"Fetching from CoinGecko: {coin_id} ({symbol})")

        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        if not data or not isinstance(data, list):
            return []

        ohlc_data = []
        # Rows are [timestamp_ms, open, high, low, close]; keep only the most
        # recent ``limit`` rows (assumes the API lists candles oldest first).
        for candle in data[-limit:]:
            try:
                ohlc_data.append({
                    "symbol": symbol,
                    "interval": interval,
                    # Naive datetime in the host's local timezone.
                    "timestamp": datetime.fromtimestamp(candle[0] / 1000),
                    "open": float(candle[1]),
                    "high": float(candle[2]),
                    "low": float(candle[3]),
                    "close": float(candle[4]),
                    "volume": 0.0,
                    "provider": "coingecko"
                })
            except Exception as e:
                # Best effort: one malformed row must not discard the rest.
                logger.debug(f"Error parsing CoinGecko candle: {e}")
                continue

        logger.info(f"✅ CoinGecko: Fetched {len(ohlc_data)} candles for {symbol}")
        return ohlc_data

    except httpx.HTTPStatusError as e:
        logger.debug(f"CoinGecko HTTP error for {symbol}: {e.response.status_code}")
        return []
    except Exception as e:
        logger.debug(f"CoinGecko error for {symbol}: {e}")
        return []
|
|
|
|
|
|
|
|
|
async def fetch_from_kraken(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
    """
    Fetch OHLC data from Kraken (FREE, no API key required).

    Kraken returns its candles oldest first, so the most recent ``limit``
    entries are taken from the END of the response.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Candle interval ('1h', '4h' or '1d')
        limit: Maximum number of (most recent) candles to return

    Returns:
        List of OHLC candle dicts in chronological order. An empty list is
        returned when the symbol or interval is not supported, when ``limit``
        is not positive, when Kraken reports an error, or on any
        request/parse error.
    """
    # Kraken expresses the interval in minutes.
    kraken_interval = {"1h": "60", "4h": "240", "1d": "1440"}.get(interval)
    if not kraken_interval or limit <= 0:
        return []

    try:
        pair = SYMBOL_MAP["kraken"].get(symbol)
        if not pair:
            logger.debug(f"Kraken: No mapping for {symbol}")
            return []

        url = "https://api.kraken.com/0/public/OHLC"
        params = {"pair": pair, "interval": kraken_interval}

        logger.debug(f"Fetching from Kraken: {pair} ({symbol})")

        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        if data.get("error"):
            logger.debug(f"Kraken error for {symbol}: {data['error']}")
            return []

        result = data.get("result") or {}
        candles = result.get(pair)
        if candles is None:
            # The result may be keyed by a pair name other than the one that
            # was requested; "last" is a cursor value, not candle data.
            candles = next(
                (rows for key, rows in result.items() if key != "last"),
                [],
            )

        if not candles:
            return []

        ohlc_data = []
        # Rows are [time, open, high, low, close, vwap, volume, count].
        # NOTE(review): Kraken documents the final row as the current,
        # not-yet-committed candle; it is stored as-is, as before.
        for candle in candles[-limit:]:
            try:
                ohlc_data.append({
                    "symbol": symbol,
                    "interval": interval,
                    # Naive datetime in the host's local timezone.
                    "timestamp": datetime.fromtimestamp(int(candle[0])),
                    "open": float(candle[1]),
                    "high": float(candle[2]),
                    "low": float(candle[3]),
                    "close": float(candle[4]),
                    "volume": float(candle[6]),
                    "provider": "kraken"
                })
            except Exception as e:
                # Best effort: one malformed row must not discard the rest.
                logger.debug(f"Error parsing Kraken candle: {e}")
                continue

        logger.info(f"✅ Kraken: Fetched {len(ohlc_data)} candles for {symbol}")
        return ohlc_data

    except Exception as e:
        logger.debug(f"Kraken error for {symbol}: {e}")
        return []
|
|
|
|
|
|
|
|
|
async def fetch_from_coinbase(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
    """
    Fetch OHLC data from Coinbase Pro (FREE, no API key required).

    Only '1h' and '1d' are served. The nearest granularity Coinbase offers to
    four hours is 21600 seconds, which is SIX hours; returning those candles
    under a '4h' label would store mislabelled data, so '4h' yields an empty
    list and the caller falls through to the next provider.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Candle interval ('1h' or '1d')
        limit: Maximum number of (most recent) candles to return (max 300)

    Returns:
        List of OHLC candle dicts in chronological order. An empty list is
        returned when the symbol or interval is not supported, when ``limit``
        is not positive, or on any request/parse error.
    """
    # Granularity is expressed in seconds.
    granularity = {"1h": "3600", "1d": "86400"}.get(interval)
    if not granularity or limit <= 0:
        return []

    try:
        pair = SYMBOL_MAP["coinbase"].get(symbol)
        if not pair:
            logger.debug(f"Coinbase: No mapping for {symbol}")
            return []

        url = f"https://api.exchange.coinbase.com/products/{pair}/candles"
        params = {"granularity": granularity}

        logger.debug(f"Fetching from Coinbase: {pair} ({symbol})")

        async with httpx.AsyncClient(timeout=15.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()

        if not data or not isinstance(data, list):
            return []

        ohlc_data = []
        # Coinbase lists candles newest first, so the head of the list holds
        # the most recent ``limit`` rows; reverse them so the output is
        # chronological like the other providers.
        # Rows are [time, low, high, open, close, volume].
        for candle in reversed(data[:limit]):
            try:
                ohlc_data.append({
                    "symbol": symbol,
                    "interval": interval,
                    # Naive datetime in the host's local timezone.
                    "timestamp": datetime.fromtimestamp(int(candle[0])),
                    "open": float(candle[3]),
                    "high": float(candle[2]),
                    "low": float(candle[1]),
                    "close": float(candle[4]),
                    "volume": float(candle[5]),
                    "provider": "coinbase"
                })
            except Exception as e:
                # Best effort: one malformed row must not discard the rest.
                logger.debug(f"Error parsing Coinbase candle: {e}")
                continue

        logger.info(f"✅ Coinbase: Fetched {len(ohlc_data)} candles for {symbol}")
        return ohlc_data

    except Exception as e:
        logger.debug(f"Coinbase error for {symbol}: {e}")
        return []
|
|
|
|
|
|
|
|
|
async def fetch_from_binance(symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
    """
    Retrieve klines for one symbol from Binance (FREE, may be geo-restricted).

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Candle interval, forwarded to Binance unchanged
        limit: Number of candles to request

    Returns:
        List of OHLC candle dicts; an empty list when the symbol has no
        Binance mapping, the response is empty or not a list, or any
        HTTP/other error occurs.
    """
    # Positions of the price/volume fields inside one kline row.
    numeric_fields = (("open", 1), ("high", 2), ("low", 3), ("close", 4), ("volume", 5))

    try:
        market = SYMBOL_MAP["binance"].get(symbol)
        if not market:
            logger.debug(f"Binance: No mapping for {symbol}")
            return []

        endpoint = "https://api.binance.com/api/v3/klines"
        query = {"symbol": market, "interval": interval, "limit": limit}

        logger.debug(f"Fetching from Binance: {market} ({symbol})")

        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.get(endpoint, params=query)
            reply.raise_for_status()
            klines = reply.json()

        if not (klines and isinstance(klines, list)):
            return []

        parsed: List[Dict[str, Any]] = []
        for kline in klines:
            try:
                record: Dict[str, Any] = {
                    "symbol": symbol,
                    "interval": interval,
                    # Open time arrives in milliseconds.
                    "timestamp": datetime.fromtimestamp(int(kline[0]) / 1000),
                }
                for field, position in numeric_fields:
                    record[field] = float(kline[position])
                record["provider"] = "binance"
                parsed.append(record)
            except Exception as parse_err:
                # A malformed row is skipped; the remaining rows are kept.
                logger.debug(f"Error parsing Binance candle: {parse_err}")

        logger.info(f"✅ Binance: Fetched {len(parsed)} candles for {symbol}")
        return parsed

    except httpx.HTTPStatusError as http_err:
        status = http_err.response.status_code
        if status == 451:
            # HTTP 451 is logged as a geo-restriction rather than a generic error.
            logger.debug(f"Binance geo-restricted for {symbol}")
        else:
            logger.debug(f"Binance HTTP error for {symbol}: {status}")
        return []
    except Exception as err:
        logger.debug(f"Binance error for {symbol}: {err}")
        return []
|
|
|
|
|
|
|
|
|
async def fetch_ohlc_with_fallback(symbol: str, interval: str, limit: int = 100) -> List[Dict[str, Any]]:
    """
    Try each provider in turn and return the first non-empty candle list.

    Providers are consulted in this fixed order:
        1. CoinGecko
        2. Kraken
        3. Coinbase
        4. Binance

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Interval ('1h', '4h', '1d')
        limit: Number of candles to fetch

    Returns:
        Candles from the first provider that yields any; an empty list (after
        a warning is logged) when every provider comes back empty or raises.
    """
    providers = (
        ("CoinGecko", fetch_from_coingecko),
        ("Kraken", fetch_from_kraken),
        ("Coinbase", fetch_from_coinbase),
        ("Binance", fetch_from_binance),
    )

    for provider_name, fetcher in providers:
        try:
            candles = await fetcher(symbol, interval, limit)
            if candles:
                logger.debug(
                    f"✅ Successfully fetched {len(candles)} candles from {provider_name} for {symbol}"
                )
                return candles
        except Exception as err:
            # A failing provider must never block the ones after it.
            logger.debug(f"❌ {provider_name} failed for {symbol}: {err}")

    logger.warning(f"⚠️ All sources failed for {symbol} {interval}")
    return []
|
|
|
|
|
|
|
|
|
async def save_ohlc_data_to_cache(ohlc_data: List[Dict[str, Any]]) -> int:
    """
    Save REAL OHLC data to the database cache AND upload it to HuggingFace Datasets.

    Data Flow:
        1. Each candle is written through ``cache.save_ohlc_candle``
        2. If the HuggingFace uploader is enabled, the whole batch is uploaded
           with ``append=True``
        3. Clients can fetch from HuggingFace Datasets

    Upload records carry their timestamp as an ISO-8601 string in UTC with a
    trailing 'Z'. Naive datetimes are interpreted as host-local time (which is
    what ``datetime.fromtimestamp`` produces) and converted, so the 'Z' is
    truthful on hosts whose timezone is not UTC.

    Args:
        ohlc_data: List of REAL OHLC data dictionaries with the keys
            symbol, interval, timestamp, open, high, low, close, volume, provider

    Returns:
        int: Number of candles the cache reported as saved. Upload failures
        are logged and do not affect this count.
    """
    saved_count = 0

    for candle in ohlc_data:
        try:
            success = cache.save_ohlc_candle(
                symbol=candle["symbol"],
                interval=candle["interval"],
                timestamp=candle["timestamp"],
                open_price=candle["open"],
                high=candle["high"],
                low=candle["low"],
                close=candle["close"],
                volume=candle["volume"],
                provider=candle["provider"]
            )
            if success:
                saved_count += 1
        except Exception as e:
            # Best effort: a single bad candle must not abort the batch.
            logger.error(f"Error saving OHLC data for {candle.get('symbol')}: {e}")
            continue

    # ``ohlc_data`` is checked first so an empty batch returns immediately.
    if ohlc_data and HF_UPLOAD_ENABLED and hf_uploader:
        try:
            upload_data = []
            for candle in ohlc_data:
                upload_record = candle.copy()
                stamp = upload_record.get("timestamp")
                if isinstance(stamp, datetime):
                    upload_record["timestamp"] = _utc_iso_z(stamp)
                upload_data.append(upload_record)

            logger.info(f"📤 Uploading {len(upload_data)} OHLC records to HuggingFace Datasets...")
            upload_success = await hf_uploader.upload_ohlc_data(
                upload_data,
                append=True
            )

            if upload_success:
                logger.info("✅ Successfully uploaded OHLC data to HuggingFace Datasets")
            else:
                logger.warning("⚠️ Failed to upload OHLC data to HuggingFace Datasets")

        except Exception as e:
            logger.error(f"Error uploading OHLC to HuggingFace Datasets: {e}")

    return saved_count


def _utc_iso_z(stamp: datetime) -> str:
    """Render ``stamp`` as an ISO-8601 UTC string ending in 'Z'."""
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    # astimezone() treats a naive datetime as host-local time and converts an
    # aware one from its own zone; tzinfo is then dropped so isoformat() does
    # not emit "+00:00" in front of the explicit "Z".
    return stamp.astimezone(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
|
|
|
|
|
|
|
|
|
async def fetch_and_cache_ohlc_for_symbol(symbol: str, interval: str) -> int:
    """
    Pull candles for one symbol/interval through the fallback chain and store them.

    Args:
        symbol: Base symbol (e.g., 'BTC')
        interval: Candle interval ('1h', '4h', '1d')

    Returns:
        int: Number of candles saved; 0 when nothing was fetched or when any
        error occurred (the error is logged, never raised).
    """
    try:
        # The same batch size is requested for every interval.
        candle_limit = 100

        candles = await fetch_ohlc_with_fallback(symbol, interval, candle_limit)
        if candles:
            stored = await save_ohlc_data_to_cache(candles)
            if stored > 0:
                logger.debug(f"Saved {stored}/{len(candles)} candles for {symbol} {interval}")
            return stored

        logger.debug(f"No OHLC data received for {symbol} {interval}")
        return 0

    except Exception as err:
        logger.error(f"Error fetching OHLC for {symbol} {interval}: {err}")
        return 0
|
|
|
|
|
|
|
|
|
async def ohlc_data_worker_loop():
    """
    Endless background loop that refreshes REAL OHLC data for every symbol/interval.

    Each pass walks SYMBOLS x INTERVALS, fetching and caching one combination
    at a time with a 0.5 s pause after each successful call, logs a summary,
    then sleeps 300 s before the next pass.

    CRITICAL RULES:
        1. Run continuously in background
        2. Fetch REAL data from multiple sources with automatic fallback
        3. Store REAL data in database
        4. NEVER generate fake candles as fallback
        5. If all sources fail, log error and retry on next iteration
    """
    logger.info("Starting OHLC data background worker with multi-source fallback")
    logger.info("📊 Data sources: CoinGecko, Kraken, Coinbase, Binance")

    cycle = 0
    while True:
        try:
            cycle += 1
            began_at = time.time()

            logger.info(f"[Iteration {cycle}] Fetching REAL OHLC data from multiple sources...")

            candles_stored = 0
            pairs_with_data = 0
            pairs_total = len(SYMBOLS) * len(INTERVALS)

            for symbol in SYMBOLS:
                for interval in INTERVALS:
                    try:
                        stored = await fetch_and_cache_ohlc_for_symbol(symbol, interval)
                        candles_stored += stored
                        if stored > 0:
                            pairs_with_data += 1

                        # Short pause between requests.
                        await asyncio.sleep(0.5)
                    except Exception as err:
                        logger.error(f"Error processing {symbol} {interval}: {err}")

            duration = time.time() - began_at
            logger.info(
                f"[Iteration {cycle}] Successfully saved {candles_stored} REAL OHLC candles "
                f"({pairs_with_data}/{pairs_total} symbol-intervals) in {duration:.2f}s"
            )

            # Wait before starting the next pass.
            await asyncio.sleep(300)

        except Exception as err:
            logger.error(f"[Iteration {cycle}] Worker error: {err}", exc_info=True)
            # Same wait after a failed pass, so errors do not cause a tight retry loop.
            await asyncio.sleep(300)
|
|
|
|
|
|
|
|
|
# Strong references to the running worker task(s). The event loop only keeps
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_OHLC_WORKER_TASKS: set = set()


async def start_ohlc_data_worker():
    """
    Start the OHLC data background worker with multi-source support.

    Runs one initial fetch for the first five symbols across all intervals,
    then schedules ``ohlc_data_worker_loop`` as a background task on the
    running event loop. A reference to that task is kept at module level for
    as long as the task is alive.

    This should be called during application startup. Any failure is logged
    and swallowed so startup is not interrupted.
    """
    try:
        logger.info("Initializing OHLC data worker with multi-source fallback...")
        logger.info("📊 Supported sources: CoinGecko, Kraken, Coinbase, Binance")

        logger.info("Running initial OHLC data fetch...")
        total_saved = 0

        # Warm the cache with a subset of symbols before the loop takes over.
        for symbol in SYMBOLS[:5]:
            for interval in INTERVALS:
                saved = await fetch_and_cache_ohlc_for_symbol(symbol, interval)
                total_saved += saved
                await asyncio.sleep(0.5)

        logger.info(f"Initial fetch: Saved {total_saved} REAL OHLC candles")

        worker_task = asyncio.create_task(ohlc_data_worker_loop())
        # Hold the task until it completes, then drop the reference again.
        _OHLC_WORKER_TASKS.add(worker_task)
        worker_task.add_done_callback(_OHLC_WORKER_TASKS.discard)
        logger.info("OHLC data worker started successfully")

    except Exception as e:
        logger.error(f"Failed to start OHLC data worker: {e}", exc_info=True)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    import sys

    sys.path.append("/workspace")

    async def test():
        """Manual smoke test: fetch a few 1h candles for BTC and ETH and cache them."""
        logger.info("Testing OHLC data worker with multi-source fallback...")

        interval = "1h"

        for symbol in ["BTC", "ETH"]:
            logger.info(f"\n{'='*60}")
            logger.info(f"Testing {symbol}")
            logger.info(f"{'='*60}")

            data = await fetch_ohlc_with_fallback(symbol, interval, limit=10)
            logger.info(f"Fetched {len(data)} candles for {symbol} {interval}")

            if not data:
                logger.warning(f"No data retrieved for {symbol}")
                continue

            # Show which provider answered and a sample of up to three candles.
            logger.info(f"Provider: {data[0].get('provider')}")
            for candle in data[:3]:
                logger.info(
                    f"  {candle['timestamp']}: O={candle['open']:.2f} "
                    f"H={candle['high']:.2f} L={candle['low']:.2f} C={candle['close']:.2f}"
                )

            saved = await save_ohlc_data_to_cache(data)
            logger.info(f"Saved {saved} candles to database")

    asyncio.run(test())
|
|
|
|