#!/usr/bin/env python3
"""
Multi-Source Data Fetchers
Specialized fetchers for each data type with 10+ fallback sources
Includes special handlers for CoinGecko and Binance
"""
import httpx
import asyncio
import logging
import feedparser
from typing import Dict, Any, List, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
class MarketPriceFetcher:
"""
Fetch market prices with 23+ fallback sources
Special handling for CoinGecko and Binance
"""
@staticmethod
async def fetch_coingecko_special(source: Dict[str, Any], symbols: Optional[List[str]] = None, **kwargs) -> Dict[str, Any]:
"""
Special CoinGecko handler with advanced features
- Automatic symbol mapping
- Batch requests
        - Top-coins fallback via /coins/markets when no symbols are requested
"""
try:
base_url = source["url"]
timeout = source.get("timeout", 10)
async with httpx.AsyncClient(timeout=timeout) as client:
if symbols and len(symbols) > 0:
# Map symbols to CoinGecko IDs
symbol_map = {
"BTC": "bitcoin", "ETH": "ethereum", "BNB": "binancecoin",
"XRP": "ripple", "ADA": "cardano", "DOGE": "dogecoin",
"SOL": "solana", "TRX": "tron", "DOT": "polkadot",
"MATIC": "matic-network", "LTC": "litecoin", "SHIB": "shiba-inu",
"AVAX": "avalanche-2", "UNI": "uniswap", "LINK": "chainlink",
"ATOM": "cosmos", "XLM": "stellar", "ETC": "ethereum-classic",
"XMR": "monero", "BCH": "bitcoin-cash"
}
coin_ids = []
for symbol in symbols:
clean_symbol = symbol.upper().replace("USDT", "").replace("USD", "")
coin_id = symbol_map.get(clean_symbol, clean_symbol.lower())
coin_ids.append(coin_id)
# Batch request for specific symbols
response = await client.get(
f"{base_url}/simple/price",
params={
"ids": ",".join(coin_ids),
"vs_currencies": "usd",
"include_24hr_change": "true",
"include_24hr_vol": "true",
"include_market_cap": "true",
"include_last_updated_at": "true"
}
)
else:
# Get top coins by market cap
limit = kwargs.get("limit", 100)
response = await client.get(
f"{base_url}/coins/markets",
params={
"vs_currency": "usd",
"order": "market_cap_desc",
"per_page": min(limit, 250),
"page": 1,
"sparkline": "false",
"price_change_percentage": "24h,7d"
}
)
response.raise_for_status()
data = response.json()
# Transform to standard format
prices = []
if isinstance(data, dict) and symbols:
# Simple price format
for coin_id, coin_data in data.items():
symbol = next((k for k, v in symbol_map.items() if v == coin_id), coin_id.upper())
prices.append({
"symbol": symbol,
"price": coin_data.get("usd", 0),
"change24h": coin_data.get("usd_24h_change", 0),
"volume24h": coin_data.get("usd_24h_vol", 0),
"marketCap": coin_data.get("usd_market_cap", 0),
"lastUpdated": coin_data.get("last_updated_at", int(datetime.utcnow().timestamp()))
})
elif isinstance(data, list):
# Markets format
for coin in data:
prices.append({
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"price": coin.get("current_price", 0),
"change24h": coin.get("price_change_24h", 0),
"changePercent24h": coin.get("price_change_percentage_24h", 0),
"changePercent7d": coin.get("price_change_percentage_7d_in_currency", 0),
"volume24h": coin.get("total_volume", 0),
"marketCap": coin.get("market_cap", 0),
"marketCapRank": coin.get("market_cap_rank", 0),
"circulatingSupply": coin.get("circulating_supply", 0),
"totalSupply": coin.get("total_supply", 0),
"ath": coin.get("ath", 0),
"athDate": coin.get("ath_date", ""),
"lastUpdated": coin.get("last_updated", "")
})
logger.info(f"✅ CoinGecko Special: {len(prices)} prices fetched")
return {
"prices": prices,
"count": len(prices),
"source": "coingecko_special",
"enhanced": True
}
except Exception as e:
logger.error(f"❌ CoinGecko Special failed: {e}")
raise
@staticmethod
async def fetch_binance_special(source: Dict[str, Any], symbols: Optional[List[str]] = None, **kwargs) -> Dict[str, Any]:
"""
Special Binance handler with advanced features
- 24h ticker statistics
- Book ticker (best bid/ask)
        - Weighted average price (from the 24h ticker)
- Multi-symbol batch requests
"""
try:
base_url = source["url"]
timeout = source.get("timeout", 10)
async with httpx.AsyncClient(timeout=timeout) as client:
if symbols and len(symbols) > 0:
# Fetch data for specific symbols
prices = []
# Create tasks for parallel fetching
tasks = []
for symbol in symbols:
                        # Strip any quote suffix before appending USDT (handles "BTC", "BTCUSD", "BTCUSDT")
                        clean_symbol = symbol.upper().replace("USDT", "").replace("USD", "")
                        binance_symbol = f"{clean_symbol}USDT"
tasks.append(MarketPriceFetcher._fetch_binance_single(client, base_url, binance_symbol))
# Execute in parallel
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, dict):
prices.append(result)
else:
# Get all tickers
response = await client.get(f"{base_url}/ticker/24hr")
response.raise_for_status()
tickers = response.json()
# Filter USDT pairs and transform
prices = []
limit = kwargs.get("limit", 100)
for ticker in tickers:
symbol = ticker.get("symbol", "")
if symbol.endswith("USDT"):
clean_symbol = symbol.replace("USDT", "")
prices.append({
"symbol": clean_symbol,
"price": float(ticker.get("lastPrice", 0)),
"change24h": float(ticker.get("priceChange", 0)),
"changePercent24h": float(ticker.get("priceChangePercent", 0)),
"volume24h": float(ticker.get("volume", 0)),
"quoteVolume24h": float(ticker.get("quoteVolume", 0)),
"high24h": float(ticker.get("highPrice", 0)),
"low24h": float(ticker.get("lowPrice", 0)),
"openPrice": float(ticker.get("openPrice", 0)),
"weightedAvgPrice": float(ticker.get("weightedAvgPrice", 0)),
"trades": int(ticker.get("count", 0)),
"openTime": int(ticker.get("openTime", 0)),
"closeTime": int(ticker.get("closeTime", 0))
})
if len(prices) >= limit:
break
logger.info(f"✅ Binance Special: {len(prices)} prices fetched")
return {
"prices": prices,
"count": len(prices),
"source": "binance_special",
"enhanced": True
}
except Exception as e:
logger.error(f"❌ Binance Special failed: {e}")
raise
@staticmethod
async def _fetch_binance_single(client: httpx.AsyncClient, base_url: str, symbol: str) -> Dict[str, Any]:
"""Fetch single symbol data from Binance with multiple endpoints"""
try:
# Fetch 24h ticker
response = await client.get(
f"{base_url}/ticker/24hr",
params={"symbol": symbol}
)
response.raise_for_status()
ticker = response.json()
# Try to get book ticker (best bid/ask)
try:
book_response = await client.get(
f"{base_url}/ticker/bookTicker",
params={"symbol": symbol}
)
book_response.raise_for_status()
book_ticker = book_response.json()
            except Exception:
                book_ticker = {}
clean_symbol = symbol.replace("USDT", "")
return {
"symbol": clean_symbol,
"price": float(ticker.get("lastPrice", 0)),
"change24h": float(ticker.get("priceChange", 0)),
"changePercent24h": float(ticker.get("priceChangePercent", 0)),
"volume24h": float(ticker.get("volume", 0)),
"quoteVolume24h": float(ticker.get("quoteVolume", 0)),
"high24h": float(ticker.get("highPrice", 0)),
"low24h": float(ticker.get("lowPrice", 0)),
"weightedAvgPrice": float(ticker.get("weightedAvgPrice", 0)),
"bidPrice": float(book_ticker.get("bidPrice", 0)) if book_ticker else None,
"askPrice": float(book_ticker.get("askPrice", 0)) if book_ticker else None,
"spread": float(book_ticker.get("askPrice", 0)) - float(book_ticker.get("bidPrice", 0)) if book_ticker else None,
"trades": int(ticker.get("count", 0))
}
except Exception as e:
logger.warning(f"⚠️ Failed to fetch {symbol}: {e}")
raise
@staticmethod
async def fetch_generic(source: Dict[str, Any], **kwargs) -> Dict[str, Any]:
"""Generic price fetcher for other sources"""
source_name = source["name"]
url = source["url"]
timeout = source.get("timeout", 10)
try:
async with httpx.AsyncClient(timeout=timeout) as client:
# Different endpoints based on source
if "coinpaprika" in source_name:
response = await client.get(f"{url}/tickers")
response.raise_for_status()
data = response.json()
prices = []
for coin in data[:kwargs.get("limit", 100)]:
quotes = coin.get("quotes", {}).get("USD", {})
prices.append({
"symbol": coin.get("symbol", ""),
"name": coin.get("name", ""),
"price": quotes.get("price", 0),
"changePercent24h": quotes.get("percent_change_24h", 0),
"volume24h": quotes.get("volume_24h", 0),
"marketCap": quotes.get("market_cap", 0)
})
return {"prices": prices, "count": len(prices)}
elif "coincap" in source_name:
response = await client.get(f"{url}/assets")
response.raise_for_status()
data = response.json()
prices = []
for asset in data.get("data", [])[:kwargs.get("limit", 100)]:
prices.append({
"symbol": asset.get("symbol", ""),
"name": asset.get("name", ""),
"price": float(asset.get("priceUsd", 0)),
"changePercent24h": float(asset.get("changePercent24Hr", 0)),
"volume24h": float(asset.get("volumeUsd24Hr", 0)),
"marketCap": float(asset.get("marketCapUsd", 0))
})
return {"prices": prices, "count": len(prices)}
elif "coinmarketcap" in source_name:
headers = {"X-CMC_PRO_API_KEY": source.get("api_key", "")}
response = await client.get(
f"{url}/cryptocurrency/listings/latest",
headers=headers,
params={"limit": kwargs.get("limit", 100), "convert": "USD"}
)
response.raise_for_status()
data = response.json()
prices = []
for coin in data.get("data", []):
quote = coin.get("quote", {}).get("USD", {})
prices.append({
"symbol": coin.get("symbol", ""),
"name": coin.get("name", ""),
"price": quote.get("price", 0),
"changePercent24h": quote.get("percent_change_24h", 0),
"volume24h": quote.get("volume_24h", 0),
"marketCap": quote.get("market_cap", 0)
})
return {"prices": prices, "count": len(prices)}
else:
# Generic fallback
logger.warning(f"⚠️ No specific handler for {source_name}, using generic")
return {"prices": [], "count": 0, "error": "No specific handler"}
except Exception as e:
logger.error(f"❌ {source_name} failed: {e}")
raise
class OHLCFetcher:
"""
Fetch OHLC/candlestick data with 18+ fallback sources
Special handling for Binance klines
"""
@staticmethod
async def fetch_binance_ohlc_special(
source: Dict[str, Any],
symbol: str,
timeframe: str = "1h",
limit: int = 1000,
**kwargs
) -> Dict[str, Any]:
"""
Special Binance OHLC handler with advanced features
- Supports all timeframes
- Up to 1000 candles per request
- Automatic symbol normalization
"""
try:
base_url = source["url"].replace("/api/v3", "/api/v3")
timeout = source.get("timeout", 15)
            # Normalize symbol: strip any quote suffix, then append USDT
            # (handles "BTC", "BTCUSD" and "BTCUSDT" inputs)
            clean_symbol = symbol.upper().replace("USDT", "").replace("USD", "")
            binance_symbol = f"{clean_symbol}USDT"
# Timeframe mapping
interval_map = {
"1m": "1m", "3m": "3m", "5m": "5m", "15m": "15m", "30m": "30m",
"1h": "1h", "2h": "2h", "4h": "4h", "6h": "6h", "8h": "8h", "12h": "12h",
"1d": "1d", "3d": "3d", "1w": "1w", "1M": "1M"
}
binance_interval = interval_map.get(timeframe, "1h")
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
"https://api.binance.com/api/v3/klines",
params={
"symbol": binance_symbol,
"interval": binance_interval,
"limit": min(limit, 1000)
}
)
response.raise_for_status()
klines = response.json()
# Transform to standard OHLCV format
candles = []
for kline in klines:
candles.append({
"timestamp": int(kline[0]),
"open": float(kline[1]),
"high": float(kline[2]),
"low": float(kline[3]),
"close": float(kline[4]),
"volume": float(kline[5]),
"closeTime": int(kline[6]),
"quoteVolume": float(kline[7]),
"trades": int(kline[8]),
"takerBuyBaseVolume": float(kline[9]),
"takerBuyQuoteVolume": float(kline[10])
})
logger.info(f"✅ Binance OHLC Special: {len(candles)} candles for {binance_symbol}")
return {
"symbol": symbol,
"timeframe": timeframe,
"candles": candles,
"count": len(candles),
"source": "binance_ohlc_special",
"enhanced": True
}
except Exception as e:
logger.error(f"❌ Binance OHLC Special failed: {e}")
raise
@staticmethod
async def fetch_coingecko_ohlc(source: Dict[str, Any], symbol: str, days: int = 7, **kwargs) -> Dict[str, Any]:
"""Fetch OHLC from CoinGecko"""
try:
# Symbol to coin ID mapping
symbol_map = {
"BTC": "bitcoin", "ETH": "ethereum", "BNB": "binancecoin",
"XRP": "ripple", "ADA": "cardano", "DOGE": "dogecoin",
"SOL": "solana", "TRX": "tron", "DOT": "polkadot"
}
coin_id = symbol_map.get(symbol.upper(), symbol.lower())
base_url = source["url"]
timeout = source.get("timeout", 15)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(
f"{base_url}/coins/{coin_id}/ohlc",
params={"vs_currency": "usd", "days": days}
)
response.raise_for_status()
data = response.json()
candles = []
for item in data:
candles.append({
"timestamp": item[0],
"open": item[1],
"high": item[2],
"low": item[3],
"close": item[4],
"volume": 0 # CoinGecko OHLC doesn't include volume
})
return {"symbol": symbol, "candles": candles, "count": len(candles)}
except Exception as e:
logger.error(f"❌ CoinGecko OHLC failed: {e}")
raise
@staticmethod
async def fetch_generic_exchange(source: Dict[str, Any], symbol: str, timeframe: str = "1h", limit: int = 100, **kwargs) -> Dict[str, Any]:
"""Generic OHLC fetcher for exchanges (KuCoin, Bybit, OKX, etc.)"""
source_name = source["name"]
url = source["url"]
try:
# Add specific logic for each exchange
if "kucoin" in source_name:
# KuCoin specific implementation
pass
elif "bybit" in source_name:
# Bybit specific implementation
pass
elif "okx" in source_name:
# OKX specific implementation
pass
            # Placeholder until the exchange-specific branches above are implemented
            # (a hedged KuCoin sketch follows this class)
            return {"symbol": symbol, "candles": [], "count": 0}
except Exception as e:
logger.error(f"❌ {source_name} OHLC failed: {e}")
raise
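

# A minimal sketch of what the missing KuCoin branch in fetch_generic_exchange
# could look like. The endpoint, parameter names and row layout below are based
# on KuCoin's public klines API and are assumptions, not part of the original
# module; Bybit and OKX would need analogous mappings.
async def _kucoin_ohlc_sketch(url: str, symbol: str, timeframe: str = "1h", limit: int = 100) -> Dict[str, Any]:
    """Illustrative only: KuCoin returns newest-first rows of
    [time, open, close, high, low, volume, turnover] with time in seconds."""
    interval_map = {
        "1m": "1min", "5m": "5min", "15m": "15min", "30m": "30min",
        "1h": "1hour", "4h": "4hour", "1d": "1day", "1w": "1week"
    }
    kucoin_symbol = f"{symbol.upper().replace('USDT', '').replace('USD', '')}-USDT"
    async with httpx.AsyncClient(timeout=15) as client:
        response = await client.get(
            f"{url}/api/v1/market/candles",
            params={"symbol": kucoin_symbol, "type": interval_map.get(timeframe, "1hour")}
        )
        response.raise_for_status()
        rows = response.json().get("data", [])[:limit]
    candles = [{
        "timestamp": int(row[0]) * 1000,  # KuCoin reports seconds; convert to ms
        "open": float(row[1]),
        "close": float(row[2]),
        "high": float(row[3]),
        "low": float(row[4]),
        "volume": float(row[5])
    } for row in rows]
    return {"symbol": symbol, "candles": candles, "count": len(candles)}
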
class NewsFetcher:
"""Fetch news from 15+ sources"""
@staticmethod
async def fetch_news_api(source: Dict[str, Any], query: str = "cryptocurrency", limit: int = 20, **kwargs) -> Dict[str, Any]:
"""Fetch from news API sources"""
try:
url = source["url"]
api_key = source.get("api_key")
timeout = source.get("timeout", 10)
async with httpx.AsyncClient(timeout=timeout) as client:
if "newsapi.org" in url:
response = await client.get(
f"{url}/everything",
params={
"q": query,
"apiKey": api_key,
"language": "en",
"sortBy": "publishedAt",
"pageSize": limit
}
)
response.raise_for_status()
data = response.json()
articles = []
for article in data.get("articles", []):
articles.append({
"title": article.get("title", ""),
"description": article.get("description", ""),
"url": article.get("url", ""),
"source": article.get("source", {}).get("name", ""),
"publishedAt": article.get("publishedAt", ""),
"author": article.get("author", "")
})
return {"articles": articles, "count": len(articles)}
else:
return {"articles": [], "count": 0}
except Exception as e:
logger.error(f"❌ News API failed: {e}")
raise
@staticmethod
async def fetch_rss_feed(source: Dict[str, Any], limit: int = 20, **kwargs) -> Dict[str, Any]:
"""Fetch from RSS feeds"""
try:
feed_url = source["url"]
            # feedparser is synchronous, so run it in a worker thread
            feed = await asyncio.to_thread(feedparser.parse, feed_url)
articles = []
for entry in feed.entries[:limit]:
try:
published = entry.get("published_parsed")
if published:
dt = datetime(*published[:6])
timestamp = dt.isoformat()
else:
timestamp = datetime.utcnow().isoformat()
                except Exception:
                    timestamp = datetime.utcnow().isoformat()
articles.append({
"title": entry.get("title", ""),
"description": entry.get("summary", ""),
"url": entry.get("link", ""),
"source": source["name"],
"publishedAt": timestamp
})
logger.info(f"✅ RSS {source['name']}: {len(articles)} articles")
return {"articles": articles, "count": len(articles)}
except Exception as e:
logger.error(f"❌ RSS feed failed: {e}")
raise
class SentimentFetcher:
"""Fetch sentiment data from 12+ sources"""
@staticmethod
async def fetch_fear_greed(source: Dict[str, Any], **kwargs) -> Dict[str, Any]:
"""Fetch Fear & Greed Index"""
try:
url = source["url"]
timeout = source.get("timeout", 10)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(url, params={"limit": 1})
response.raise_for_status()
data = response.json()
if "data" in data and len(data["data"]) > 0:
fng = data["data"][0]
return {
"value": int(fng.get("value", 50)),
"classification": fng.get("value_classification", "neutral"),
"timestamp": int(fng.get("timestamp", 0))
}
return {"value": 50, "classification": "neutral", "timestamp": int(datetime.utcnow().timestamp())}
except Exception as e:
logger.error(f"❌ Fear & Greed failed: {e}")
raise
__all__ = [
"MarketPriceFetcher",
"OHLCFetcher",
"NewsFetcher",
"SentimentFetcher"
]
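

if __name__ == "__main__":
    # A small, hedged smoke test: the source dicts below are illustrative
    # examples, not the project's real source-registry entries, and running
    # this makes live calls to the public CoinGecko and Binance APIs.
    async def _demo() -> None:
        coingecko = {"name": "coingecko", "url": "https://api.coingecko.com/api/v3", "timeout": 10}
        binance = {"name": "binance", "url": "https://api.binance.com/api/v3", "timeout": 10}

        prices = await MarketPriceFetcher.fetch_coingecko_special(coingecko, symbols=["BTC", "ETH"])
        print(f"CoinGecko returned {prices['count']} prices")

        ohlc = await OHLCFetcher.fetch_binance_ohlc_special(binance, symbol="BTC", timeframe="1h", limit=10)
        print(f"Binance returned {ohlc['count']} candles")

    asyncio.run(_demo())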