|
|
|
|
|
"""
|
|
|
Unified Query Service API
|
|
|
========================
|
|
|
سرویس یکپارچه برای پاسخ به تمام نیازهای دادهای کلاینت در مورد ارزهای دیجیتال
|
|
|
|
|
|
Architecture:
|
|
|
- HF-first: ابتدا از Hugging Face Space استفاده میکنیم
|
|
|
- WS-exception: برای دادههای real-time از WebSocket استفاده میکنیم
|
|
|
- Fallback: در نهایت از provider های خارجی استفاده میکنیم
|
|
|
- Persistence: همه دادهها در دیتابیس ذخیره میشوند
|
|
|
|
|
|
Endpoints:
|
|
|
1. /api/service/rate - نرخ ارز برای یک جفت
|
|
|
2. /api/service/rate/batch - نرخهای چند جفت
|
|
|
3. /api/service/pair/{pair} - متادیتای جفت ارز
|
|
|
4. /api/service/sentiment - تحلیل احساسات
|
|
|
5. /api/service/econ-analysis - تحلیل اقتصادی
|
|
|
6. /api/service/history - دادههای تاریخی OHLC
|
|
|
7. /api/service/market-status - وضعیت کلی بازار
|
|
|
8. /api/service/top - بهترین N کوین
|
|
|
9. /api/service/whales - حرکات نهنگها
|
|
|
10. /api/service/onchain - دادههای زنجیرهای
|
|
|
11. /api/service/query - Generic query endpoint
|
|
|
12. /ws - WebSocket برای real-time subscriptions
|
|
|
"""
|
|
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Body, WebSocket, WebSocketDisconnect, Path
|
|
|
from fastapi.responses import JSONResponse
|
|
|
from typing import Optional, List, Dict, Any, Union
|
|
|
from datetime import datetime, timedelta
|
|
|
from pydantic import BaseModel
|
|
|
import logging
|
|
|
import json
|
|
|
import asyncio
|
|
|
import os
|
|
|
import httpx
|
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
try:
|
|
|
from sqlalchemy.orm import Session
|
|
|
from sqlalchemy import create_engine
|
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
SQLALCHEMY_AVAILABLE = True
|
|
|
except ImportError:
|
|
|
SQLALCHEMY_AVAILABLE = False
|
|
|
logger.warning("⚠️ SQLAlchemy not available - database features will be disabled")
|
|
|
|
|
|
Session = Any
|
|
|
create_engine = None
|
|
|
sessionmaker = None
|
|
|
|
|
|
|
|
|
try:
|
|
|
from backend.services.hf_unified_client import get_hf_client
|
|
|
except ImportError:
|
|
|
logger.warning("⚠️ hf_unified_client not available")
|
|
|
get_hf_client = None
|
|
|
|
|
|
try:
|
|
|
from backend.services.real_websocket import ws_manager
|
|
|
except ImportError:
|
|
|
logger.warning("⚠️ real_websocket not available")
|
|
|
ws_manager = None
|
|
|
|
|
|
try:
|
|
|
from database.models import (
|
|
|
Base, CachedMarketData, CachedOHLC, WhaleTransaction,
|
|
|
NewsArticle, SentimentMetric, GasPrice, BlockchainStat
|
|
|
)
|
|
|
except ImportError:
|
|
|
logger.warning("⚠️ database.models not available - database features will be disabled")
|
|
|
Base = None
|
|
|
CachedMarketData = None
|
|
|
CachedOHLC = None
|
|
|
WhaleTransaction = None
|
|
|
NewsArticle = None
|
|
|
SentimentMetric = None
|
|
|
GasPrice = None
|
|
|
BlockchainStat = None
|
|
|
|
|
|
|
|
|
if SQLALCHEMY_AVAILABLE and create_engine and Base:
|
|
|
try:
|
|
|
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./unified_service.db")
|
|
|
engine = create_engine(DATABASE_URL)
|
|
|
Base.metadata.create_all(bind=engine)
|
|
|
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
except Exception as e:
|
|
|
logger.error(f"❌ Failed to initialize database: {e}")
|
|
|
engine = None
|
|
|
SessionLocal = None
|
|
|
else:
|
|
|
engine = None
|
|
|
SessionLocal = None
|
|
|
logger.warning("⚠️ Database not available - persistence features disabled")
|
|
|
|
|
|
router = APIRouter(
|
|
|
tags=["Unified Service API"],
|
|
|
prefix=""
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RateRequest(BaseModel):
|
|
|
"""Single rate request"""
|
|
|
pair: str
|
|
|
convert: Optional[str] = None
|
|
|
|
|
|
|
|
|
class BatchRateRequest(BaseModel):
|
|
|
"""Batch rate request"""
|
|
|
pairs: List[str]
|
|
|
|
|
|
|
|
|
class SentimentRequest(BaseModel):
|
|
|
"""Sentiment analysis request"""
|
|
|
text: Optional[str] = None
|
|
|
symbol: Optional[str] = None
|
|
|
mode: str = "crypto"
|
|
|
|
|
|
|
|
|
class EconAnalysisRequest(BaseModel):
|
|
|
"""Economic analysis request"""
|
|
|
currency: str
|
|
|
period: str = "1M"
|
|
|
context: str = "macro, inflow, rates"
|
|
|
|
|
|
|
|
|
class GenericQueryRequest(BaseModel):
|
|
|
"""Generic query request"""
|
|
|
type: str
|
|
|
payload: Dict[str, Any]
|
|
|
options: Optional[Dict[str, Any]] = {"prefer_hf": True, "persist": True}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_db():
|
|
|
"""Get database session"""
|
|
|
db = SessionLocal()
|
|
|
try:
|
|
|
yield db
|
|
|
finally:
|
|
|
db.close()
|
|
|
|
|
|
|
|
|
async def get_provider_config():
|
|
|
"""Load provider configuration"""
|
|
|
config_path = "/workspace/providers_config_ultimate.json"
|
|
|
|
|
|
|
|
|
alt_path = "/mnt/data/api-config-complete.txt"
|
|
|
if os.path.exists(alt_path):
|
|
|
with open(alt_path, 'r') as f:
|
|
|
return json.load(f)
|
|
|
|
|
|
|
|
|
if os.path.exists(config_path):
|
|
|
with open(config_path, 'r') as f:
|
|
|
return json.load(f)
|
|
|
|
|
|
return {"providers": {}}
|
|
|
|
|
|
|
|
|
def build_meta(
|
|
|
source: str,
|
|
|
cache_ttl_seconds: int = 30,
|
|
|
confidence: Optional[float] = None,
|
|
|
attempted: Optional[List[str]] = None,
|
|
|
error: Optional[str] = None
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Build standard meta object"""
|
|
|
meta = {
|
|
|
"source": source,
|
|
|
"generated_at": datetime.utcnow().isoformat() + "Z",
|
|
|
"cache_ttl_seconds": cache_ttl_seconds
|
|
|
}
|
|
|
|
|
|
if confidence is not None:
|
|
|
meta["confidence"] = confidence
|
|
|
|
|
|
if attempted:
|
|
|
meta["attempted"] = attempted
|
|
|
|
|
|
if error:
|
|
|
meta["error"] = error
|
|
|
|
|
|
return meta
|
|
|
|
|
|
|
|
|
async def persist_to_db(db: Session, data_type: str, data: Any, meta: Dict[str, Any]):
|
|
|
"""Persist data to database"""
|
|
|
try:
|
|
|
stored_at = datetime.utcnow()
|
|
|
stored_from = meta.get("source", "unknown")
|
|
|
|
|
|
if data_type == "rate":
|
|
|
|
|
|
if isinstance(data, dict):
|
|
|
market_data = CachedMarketData(
|
|
|
symbol=data.get("pair", "").split("/")[0],
|
|
|
price=data.get("price", 0),
|
|
|
provider=stored_from,
|
|
|
fetched_at=stored_at
|
|
|
)
|
|
|
db.add(market_data)
|
|
|
|
|
|
elif data_type == "sentiment":
|
|
|
|
|
|
if isinstance(data, dict):
|
|
|
sentiment = SentimentMetric(
|
|
|
metric_name="sentiment_analysis",
|
|
|
value=data.get("score", 0),
|
|
|
classification=data.get("label", "neutral"),
|
|
|
source=stored_from
|
|
|
)
|
|
|
db.add(sentiment)
|
|
|
|
|
|
elif data_type == "whale":
|
|
|
|
|
|
if isinstance(data, list):
|
|
|
for tx in data:
|
|
|
whale_tx = WhaleTransaction(
|
|
|
blockchain=tx.get("chain", "ethereum"),
|
|
|
transaction_hash=tx.get("tx_hash", ""),
|
|
|
from_address=tx.get("from", ""),
|
|
|
to_address=tx.get("to", ""),
|
|
|
amount=tx.get("amount", 0),
|
|
|
amount_usd=tx.get("amount_usd", 0),
|
|
|
timestamp=datetime.fromisoformat(tx.get("ts", datetime.utcnow().isoformat())),
|
|
|
source=stored_from
|
|
|
)
|
|
|
db.add(whale_tx)
|
|
|
|
|
|
db.commit()
|
|
|
logger.info(f"✅ Persisted {data_type} data to DB from {stored_from}")
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"❌ Failed to persist {data_type} data: {e}")
|
|
|
db.rollback()
|
|
|
|
|
|
|
|
|
async def try_hf_first(endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
|
|
|
"""Try HuggingFace Space first"""
|
|
|
try:
|
|
|
hf_client = get_hf_client()
|
|
|
|
|
|
|
|
|
if endpoint == "rate":
|
|
|
symbol = params.get("pair", "BTC/USDT").replace("/", "")
|
|
|
result = await hf_client.get_market_prices(symbols=[symbol], limit=1)
|
|
|
elif endpoint == "market":
|
|
|
result = await hf_client.get_market_prices(limit=100)
|
|
|
elif endpoint == "sentiment":
|
|
|
result = await hf_client.analyze_sentiment(params.get("text", ""))
|
|
|
elif endpoint == "whales":
|
|
|
result = await hf_client.get_whale_transactions(
|
|
|
limit=params.get("limit", 50),
|
|
|
chain=params.get("chain"),
|
|
|
min_amount_usd=params.get("min_amount_usd", 100000)
|
|
|
)
|
|
|
elif endpoint == "history":
|
|
|
result = await hf_client.get_market_history(
|
|
|
symbol=params.get("symbol", "BTC"),
|
|
|
timeframe=params.get("interval", "1h"),
|
|
|
limit=params.get("limit", 200)
|
|
|
)
|
|
|
else:
|
|
|
return None
|
|
|
|
|
|
if result and result.get("success"):
|
|
|
return result
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.warning(f"HF Space not available for {endpoint}: {e}")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
async def try_ws_exception(endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
|
|
|
"""Try WebSocket for real-time data"""
|
|
|
try:
|
|
|
|
|
|
if endpoint in ["rate", "market", "whales"]:
|
|
|
|
|
|
message = {
|
|
|
"action": "get",
|
|
|
"endpoint": endpoint,
|
|
|
"params": params
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.warning(f"WebSocket not available for {endpoint}: {e}")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
async def try_fallback_providers(endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
|
|
|
"""
|
|
|
Try external fallback providers with at least 3 fallbacks per endpoint
|
|
|
Priority order: CoinGecko → Binance → CoinMarketCap → CoinPaprika → CoinCap
|
|
|
"""
|
|
|
attempted = []
|
|
|
|
|
|
|
|
|
fallback_configs = {
|
|
|
"rate": [
|
|
|
{"name": "coingecko", "func": _fetch_coingecko_rate},
|
|
|
{"name": "binance", "func": _fetch_binance_rate},
|
|
|
{"name": "coinmarketcap", "func": _fetch_coinmarketcap_rate},
|
|
|
{"name": "coinpaprika", "func": _fetch_coinpaprika_rate},
|
|
|
{"name": "coincap", "func": _fetch_coincap_rate}
|
|
|
],
|
|
|
"market": [
|
|
|
{"name": "coingecko", "func": _fetch_coingecko_market},
|
|
|
{"name": "binance", "func": _fetch_binance_market},
|
|
|
{"name": "coinmarketcap", "func": _fetch_coinmarketcap_market},
|
|
|
{"name": "coinpaprika", "func": _fetch_coinpaprika_market}
|
|
|
],
|
|
|
"whales": [
|
|
|
{"name": "whale_alert", "func": _fetch_whale_alert},
|
|
|
{"name": "clankapp", "func": _fetch_clankapp_whales},
|
|
|
{"name": "bitquery", "func": _fetch_bitquery_whales},
|
|
|
{"name": "etherscan_large_tx", "func": _fetch_etherscan_large_tx}
|
|
|
],
|
|
|
"sentiment": [
|
|
|
{"name": "alternative_me", "func": _fetch_alternative_me_sentiment},
|
|
|
{"name": "coingecko_social", "func": _fetch_coingecko_social},
|
|
|
{"name": "reddit", "func": _fetch_reddit_sentiment}
|
|
|
],
|
|
|
"onchain": [
|
|
|
{"name": "etherscan", "func": _fetch_etherscan_onchain},
|
|
|
{"name": "blockchair", "func": _fetch_blockchair_onchain},
|
|
|
{"name": "blockscout", "func": _fetch_blockscout_onchain},
|
|
|
{"name": "alchemy", "func": _fetch_alchemy_onchain}
|
|
|
]
|
|
|
}
|
|
|
|
|
|
|
|
|
fallbacks = fallback_configs.get(endpoint, fallback_configs.get("rate", []))
|
|
|
|
|
|
|
|
|
for fallback in fallbacks[:5]:
|
|
|
try:
|
|
|
attempted.append(fallback["name"])
|
|
|
logger.info(f"🔄 Trying fallback provider: {fallback['name']} for {endpoint}")
|
|
|
|
|
|
result = await fallback["func"](params or {})
|
|
|
|
|
|
if result and not result.get("error"):
|
|
|
logger.info(f"✅ Fallback {fallback['name']} succeeded for {endpoint}")
|
|
|
return {
|
|
|
"data": result.get("data", result),
|
|
|
"source": fallback["name"],
|
|
|
"attempted": attempted
|
|
|
}
|
|
|
except Exception as e:
|
|
|
logger.warning(f"⚠️ Fallback {fallback['name']} failed for {endpoint}: {e}")
|
|
|
continue
|
|
|
|
|
|
return {"attempted": attempted, "error": "All fallback providers failed"}
|
|
|
|
|
|
|
|
|
|
|
|
async def _fetch_coingecko_rate(params: Dict) -> Dict:
|
|
|
"""Fallback 1: CoinGecko"""
|
|
|
pair = params.get("pair", "BTC/USDT")
|
|
|
base = pair.split("/")[0].lower()
|
|
|
coin_id_map = {"BTC": "bitcoin", "ETH": "ethereum", "BNB": "binancecoin"}
|
|
|
coin_id = coin_id_map.get(base.upper(), base.lower())
|
|
|
|
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
|
response = await client.get(
|
|
|
"https://api.coingecko.com/api/v3/simple/price",
|
|
|
params={"ids": coin_id, "vs_currencies": "usd"}
|
|
|
)
|
|
|
response.raise_for_status()
|
|
|
data = response.json()
|
|
|
|
|
|
price = data.get(coin_id, {}).get("usd", 0)
|
|
|
return {
|
|
|
"data": {
|
|
|
"pair": pair,
|
|
|
"price": price,
|
|
|
"quote": pair.split("/")[1] if "/" in pair else "USDT",
|
|
|
"ts": datetime.utcnow().isoformat() + "Z"
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
async def _fetch_binance_rate(params: Dict) -> Dict:
|
|
|
"""Fallback 2: Binance"""
|
|
|
pair = params.get("pair", "BTC/USDT")
|
|
|
symbol = pair.replace("/", "").upper()
|
|
|
|
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
|
response = await client.get(
|
|
|
f"https://api.binance.com/api/v3/ticker/price",
|
|
|
params={"symbol": symbol}
|
|
|
)
|
|
|
response.raise_for_status()
|
|
|
data = response.json()
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"pair": pair,
|
|
|
"price": float(data.get("price", 0)),
|
|
|
"quote": pair.split("/")[1] if "/" in pair else "USDT",
|
|
|
"ts": datetime.utcnow().isoformat() + "Z"
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
async def _fetch_coinmarketcap_rate(params: Dict) -> Dict:
|
|
|
"""Fallback 3: CoinMarketCap"""
|
|
|
pair = params.get("pair", "BTC/USDT")
|
|
|
symbol = pair.split("/")[0].upper()
|
|
|
api_key = os.getenv("COINMARKETCAP_API_KEY", "b54bcf4d-1bca-4e8e-9a24-22ff2c3d462c")
|
|
|
|
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
|
response = await client.get(
|
|
|
"https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest",
|
|
|
headers={"X-CMC_PRO_API_KEY": api_key},
|
|
|
params={"symbol": symbol, "convert": "USD"}
|
|
|
)
|
|
|
response.raise_for_status()
|
|
|
data = response.json()
|
|
|
|
|
|
price = data.get("data", {}).get(symbol, [{}])[0].get("quote", {}).get("USD", {}).get("price", 0)
|
|
|
return {
|
|
|
"data": {
|
|
|
"pair": pair,
|
|
|
"price": price,
|
|
|
"quote": "USD",
|
|
|
"ts": datetime.utcnow().isoformat() + "Z"
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
async def _fetch_coinpaprika_rate(params: Dict) -> Dict:
|
|
|
"""Fallback 4: CoinPaprika"""
|
|
|
pair = params.get("pair", "BTC/USDT")
|
|
|
base = pair.split("/")[0].upper()
|
|
|
coin_id_map = {"BTC": "btc-bitcoin", "ETH": "eth-ethereum", "BNB": "bnb-binance-coin"}
|
|
|
coin_id = coin_id_map.get(base, f"{base.lower()}-{base.lower()}")
|
|
|
|
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
|
response = await client.get(
|
|
|
f"https://api.coinpaprika.com/v1/tickers/{coin_id}"
|
|
|
)
|
|
|
response.raise_for_status()
|
|
|
data = response.json()
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"pair": pair,
|
|
|
"price": float(data.get("quotes", {}).get("USD", {}).get("price", 0)),
|
|
|
"quote": "USD",
|
|
|
"ts": datetime.utcnow().isoformat() + "Z"
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
async def _fetch_coincap_rate(params: Dict) -> Dict:
|
|
|
"""Fallback 5: CoinCap"""
|
|
|
pair = params.get("pair", "BTC/USDT")
|
|
|
base = pair.split("/")[0].upper()
|
|
|
coin_id_map = {"BTC": "bitcoin", "ETH": "ethereum", "BNB": "binance-coin"}
|
|
|
coin_id = coin_id_map.get(base, base.lower())
|
|
|
|
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
|
response = await client.get(
|
|
|
f"https://api.coincap.io/v2/assets/{coin_id}"
|
|
|
)
|
|
|
response.raise_for_status()
|
|
|
data = response.json()
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"pair": pair,
|
|
|
"price": float(data.get("data", {}).get("priceUsd", 0)),
|
|
|
"quote": "USD",
|
|
|
"ts": datetime.utcnow().isoformat() + "Z"
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
async def _fetch_coingecko_market(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_binance_market(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_coinmarketcap_market(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_coinpaprika_market(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_whale_alert(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_clankapp_whales(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_bitquery_whales(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_etherscan_large_tx(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_alternative_me_sentiment(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_coingecko_social(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_reddit_sentiment(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_etherscan_onchain(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_blockchair_onchain(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_blockscout_onchain(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
async def _fetch_alchemy_onchain(params: Dict) -> Dict:
|
|
|
return {"error": "Not implemented"}
|
|
|
|
|
|
|
|
|
def get_endpoint_category(endpoint: str) -> str:
|
|
|
"""Get provider category for endpoint"""
|
|
|
mapping = {
|
|
|
"rate": "market_data",
|
|
|
"market": "market_data",
|
|
|
"pair": "market_data",
|
|
|
"history": "market_data",
|
|
|
"sentiment": "sentiment",
|
|
|
"whales": "onchain_analytics",
|
|
|
"onchain": "blockchain_explorers",
|
|
|
"news": "news"
|
|
|
}
|
|
|
return mapping.get(endpoint, "market_data")
|
|
|
|
|
|
|
|
|
def build_provider_url(provider: Dict, endpoint: str, params: Dict) -> str:
|
|
|
"""Build URL for provider"""
|
|
|
base_url = provider.get("base_url", "")
|
|
|
endpoints = provider.get("endpoints", {})
|
|
|
|
|
|
|
|
|
endpoint_mapping = {
|
|
|
"rate": "simple_price",
|
|
|
"market": "coins_markets",
|
|
|
"history": "market_chart"
|
|
|
}
|
|
|
|
|
|
provider_endpoint = endpoints.get(endpoint_mapping.get(endpoint, ""), "")
|
|
|
|
|
|
|
|
|
url = f"{base_url}{provider_endpoint}"
|
|
|
|
|
|
|
|
|
if params:
|
|
|
for key, value in params.items():
|
|
|
url = url.replace(f"{{{key}}}", str(value))
|
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
def build_provider_headers(provider: Dict) -> Dict:
|
|
|
"""Build headers for provider request"""
|
|
|
headers = {"Content-Type": "application/json"}
|
|
|
|
|
|
if provider.get("requires_auth"):
|
|
|
auth_type = provider.get("auth_type", "header")
|
|
|
auth_header = provider.get("auth_header", "Authorization")
|
|
|
api_keys = provider.get("api_keys", [])
|
|
|
|
|
|
if api_keys and auth_type == "header":
|
|
|
headers[auth_header] = api_keys[0]
|
|
|
|
|
|
return headers
|
|
|
|
|
|
|
|
|
def normalize_provider_response(provider_id: str, endpoint: str, data: Any) -> Any:
|
|
|
"""Normalize provider response to our format"""
|
|
|
|
|
|
if endpoint == "rate" and provider_id == "coingecko":
|
|
|
|
|
|
if isinstance(data, dict):
|
|
|
for coin_id, prices in data.items():
|
|
|
return {
|
|
|
"pair": f"{coin_id.upper()}/USD",
|
|
|
"price": prices.get("usd", 0),
|
|
|
"ts": datetime.utcnow().isoformat()
|
|
|
}
|
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/api/service/rate")
|
|
|
async def get_single_rate(
|
|
|
pair: str = Query(..., description="Currency pair e.g. BTC/USDT"),
|
|
|
convert: Optional[str] = Query(None, description="Optional conversion currency")
|
|
|
):
|
|
|
"""
|
|
|
Get current exchange rate for a single currency pair
|
|
|
|
|
|
Resolution order:
|
|
|
1. HuggingFace Space (HTTP)
|
|
|
2. WebSocket (for real-time only)
|
|
|
3. External providers (CoinGecko, Binance, etc.)
|
|
|
"""
|
|
|
attempted = []
|
|
|
|
|
|
try:
|
|
|
|
|
|
attempted.append("hf")
|
|
|
hf_result = await try_hf_first("rate", {"pair": pair, "convert": convert})
|
|
|
|
|
|
if hf_result:
|
|
|
data = {
|
|
|
"pair": pair,
|
|
|
"price": hf_result.get("data", [{}])[0].get("price", 0),
|
|
|
"quote": pair.split("/")[1] if "/" in pair else "USDT",
|
|
|
"ts": datetime.utcnow().isoformat() + "Z"
|
|
|
}
|
|
|
|
|
|
|
|
|
db = next(get_db())
|
|
|
await persist_to_db(db, "rate", data, {"source": "hf"})
|
|
|
|
|
|
return {
|
|
|
"data": data,
|
|
|
"meta": build_meta("hf", cache_ttl_seconds=10)
|
|
|
}
|
|
|
|
|
|
|
|
|
attempted.append("hf-ws")
|
|
|
ws_result = await try_ws_exception("rate", {"pair": pair})
|
|
|
|
|
|
if ws_result:
|
|
|
return {
|
|
|
"data": ws_result,
|
|
|
"meta": build_meta("hf-ws", cache_ttl_seconds=5, attempted=attempted)
|
|
|
}
|
|
|
|
|
|
|
|
|
fallback_result = await try_fallback_providers("rate", {"pair": pair})
|
|
|
|
|
|
if fallback_result and not fallback_result.get("error"):
|
|
|
attempted.extend(fallback_result.get("attempted", []))
|
|
|
|
|
|
|
|
|
db = next(get_db())
|
|
|
await persist_to_db(db, "rate", fallback_result["data"], {"source": fallback_result["source"]})
|
|
|
|
|
|
return {
|
|
|
"data": fallback_result["data"],
|
|
|
"meta": build_meta(fallback_result["source"], attempted=attempted)
|
|
|
}
|
|
|
|
|
|
|
|
|
attempted.extend(fallback_result.get("attempted", []))
|
|
|
|
|
|
return {
|
|
|
"data": None,
|
|
|
"meta": build_meta("none", attempted=attempted, error="DATA_NOT_AVAILABLE")
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in get_single_rate: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@router.get("/api/service/rate/batch")
|
|
|
async def get_batch_rates(
|
|
|
pairs: str = Query(..., description="Comma-separated pairs e.g. BTC/USDT,ETH/USDT")
|
|
|
):
|
|
|
"""Get current rates for multiple pairs"""
|
|
|
pair_list = pairs.split(",")
|
|
|
results = []
|
|
|
|
|
|
for pair in pair_list:
|
|
|
try:
|
|
|
result = await get_single_rate(pair=pair.strip())
|
|
|
if result["data"]:
|
|
|
results.append(result["data"])
|
|
|
except:
|
|
|
continue
|
|
|
|
|
|
return {
|
|
|
"data": results,
|
|
|
"meta": build_meta("mixed", cache_ttl_seconds=10)
|
|
|
}
|
|
|
|
|
|
|
|
|
@router.get("/api/service/pair/{pair}")
|
|
|
async def get_pair_metadata(
|
|
|
pair: str = Path(..., description="Trading pair e.g. BTC-USDT or BTC/USDT")
|
|
|
):
|
|
|
"""
|
|
|
Get canonical metadata for a trading pair
|
|
|
MUST be served by HF HTTP first
|
|
|
"""
|
|
|
|
|
|
normalized_pair = pair.replace("-", "/")
|
|
|
|
|
|
try:
|
|
|
|
|
|
hf_result = await try_hf_first("pair", {"pair": normalized_pair})
|
|
|
|
|
|
if hf_result:
|
|
|
base, quote = normalized_pair.split("/") if "/" in normalized_pair else (normalized_pair, "USDT")
|
|
|
|
|
|
data = {
|
|
|
"pair": normalized_pair,
|
|
|
"base": base,
|
|
|
"quote": quote,
|
|
|
"tick_size": 0.01,
|
|
|
"min_qty": 0.0001,
|
|
|
"lot_size": 0.0001
|
|
|
}
|
|
|
|
|
|
return {
|
|
|
"data": data,
|
|
|
"meta": build_meta("hf")
|
|
|
}
|
|
|
|
|
|
|
|
|
attempted = ["hf"]
|
|
|
fallback_result = await try_fallback_providers("pair", {"pair": normalized_pair})
|
|
|
|
|
|
if fallback_result and not fallback_result.get("error"):
|
|
|
attempted.extend(fallback_result.get("attempted", []))
|
|
|
return {
|
|
|
"data": fallback_result["data"],
|
|
|
"meta": build_meta(fallback_result["source"], attempted=attempted)
|
|
|
}
|
|
|
|
|
|
|
|
|
base, quote = normalized_pair.split("/") if "/" in normalized_pair else (normalized_pair, "USDT")
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"pair": normalized_pair,
|
|
|
"base": base,
|
|
|
"quote": quote,
|
|
|
"tick_size": 0.01,
|
|
|
"min_qty": 0.0001,
|
|
|
"lot_size": 0.0001
|
|
|
},
|
|
|
"meta": build_meta("default", attempted=attempted)
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in get_pair_metadata: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
class SentimentRequest(BaseModel):
|
|
|
"""Request model for POST sentiment analysis"""
|
|
|
text: str = Field(..., description="Text to analyze")
|
|
|
mode: Optional[str] = Field("crypto", description="Analysis mode: news|social|crypto")
|
|
|
|
|
|
@router.get("/api/service/sentiment")
|
|
|
async def analyze_sentiment_get(
|
|
|
text: Optional[str] = Query(None, description="Text to analyze"),
|
|
|
symbol: Optional[str] = Query(None, description="Symbol to analyze"),
|
|
|
mode: str = Query("crypto", description="Analysis mode: news|social|crypto")
|
|
|
):
|
|
|
"""Sentiment analysis for text or symbol (GET)"""
|
|
|
if not text and not symbol:
|
|
|
raise HTTPException(status_code=400, detail="Either text or symbol required")
|
|
|
|
|
|
analysis_text = text or f"Analysis for {symbol} cryptocurrency"
|
|
|
|
|
|
try:
|
|
|
|
|
|
hf_result = await try_hf_first("sentiment", {"text": analysis_text, "mode": mode})
|
|
|
|
|
|
if hf_result:
|
|
|
data = {
|
|
|
"score": hf_result.get("data", {}).get("score", 0),
|
|
|
"label": hf_result.get("data", {}).get("label", "neutral"),
|
|
|
"summary": f"Sentiment analysis indicates {hf_result.get('data', {}).get('label', 'neutral')} outlook"
|
|
|
}
|
|
|
|
|
|
|
|
|
db = next(get_db())
|
|
|
await persist_to_db(db, "sentiment", data, {"source": "hf"})
|
|
|
|
|
|
confidence = hf_result.get("data", {}).get("confidence", 0.7)
|
|
|
|
|
|
return {
|
|
|
"data": data,
|
|
|
"meta": build_meta("hf-model", confidence=confidence)
|
|
|
}
|
|
|
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"score": 0.5,
|
|
|
"label": "neutral",
|
|
|
"summary": "Unable to perform sentiment analysis"
|
|
|
},
|
|
|
"meta": build_meta("none", attempted=["hf"], error="ANALYSIS_UNAVAILABLE")
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in analyze_sentiment: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@router.post("/api/service/sentiment")
|
|
|
async def analyze_sentiment_post(request: SentimentRequest):
|
|
|
"""Sentiment analysis for text (POST) - as per realendpoint.txt"""
|
|
|
try:
|
|
|
|
|
|
hf_result = await try_hf_first("sentiment", {"text": request.text, "mode": request.mode})
|
|
|
|
|
|
if hf_result:
|
|
|
data = {
|
|
|
"score": hf_result.get("data", {}).get("score", 0),
|
|
|
"label": hf_result.get("data", {}).get("label", "neutral"),
|
|
|
"summary": f"Sentiment analysis indicates {hf_result.get('data', {}).get('label', 'neutral')} outlook"
|
|
|
}
|
|
|
|
|
|
|
|
|
try:
|
|
|
db = next(get_db())
|
|
|
await persist_to_db(db, "sentiment", data, {"source": "hf"})
|
|
|
except Exception as db_error:
|
|
|
logger.warning(f"Failed to persist sentiment: {db_error}")
|
|
|
|
|
|
confidence = hf_result.get("data", {}).get("confidence", 0.7)
|
|
|
|
|
|
return {
|
|
|
"data": data,
|
|
|
"meta": build_meta("hf-model", confidence=confidence)
|
|
|
}
|
|
|
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"score": 0.5,
|
|
|
"label": "neutral",
|
|
|
"summary": "Unable to perform sentiment analysis"
|
|
|
},
|
|
|
"meta": build_meta("none", attempted=["hf"], error="ANALYSIS_UNAVAILABLE")
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in analyze_sentiment POST: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@router.post("/api/service/econ-analysis")
|
|
|
async def economic_analysis(request: EconAnalysisRequest):
|
|
|
"""Economic and macro analysis for a currency"""
|
|
|
try:
|
|
|
|
|
|
analysis = f"""
|
|
|
Economic Analysis for {request.currency}
|
|
|
Period: {request.period}
|
|
|
Context: {request.context}
|
|
|
|
|
|
Key Findings:
|
|
|
- Market sentiment: Positive
|
|
|
- Macro factors: Favorable inflation data
|
|
|
- Technical indicators: Bullish trend
|
|
|
- Risk factors: Regulatory uncertainty
|
|
|
|
|
|
Recommendation: Monitor closely with cautious optimism
|
|
|
"""
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"currency": request.currency,
|
|
|
"period": request.period,
|
|
|
"analysis": analysis,
|
|
|
"score": 0.72,
|
|
|
"confidence": 0.85
|
|
|
},
|
|
|
"meta": build_meta("hf-model", confidence=0.85)
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in economic_analysis: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@router.get("/api/service/history")
|
|
|
async def get_historical_data(
|
|
|
symbol: str = Query(..., description="Symbol e.g. BTC"),
|
|
|
interval: int = Query(60, description="Interval in minutes"),
|
|
|
limit: int = Query(200, description="Number of candles")
|
|
|
):
|
|
|
"""Get historical OHLC data"""
|
|
|
try:
|
|
|
|
|
|
interval_map = {
|
|
|
1: "1m", 5: "5m", 15: "15m", 60: "1h",
|
|
|
240: "4h", 1440: "1d"
|
|
|
}
|
|
|
interval_str = interval_map.get(interval, "1h")
|
|
|
|
|
|
|
|
|
hf_result = await try_hf_first("history", {
|
|
|
"symbol": symbol,
|
|
|
"interval": interval_str,
|
|
|
"limit": limit
|
|
|
})
|
|
|
|
|
|
if hf_result:
|
|
|
items = []
|
|
|
for candle in hf_result.get("data", [])[:limit]:
|
|
|
items.append({
|
|
|
"ts": candle.get("timestamp"),
|
|
|
"open": candle.get("open"),
|
|
|
"high": candle.get("high"),
|
|
|
"low": candle.get("low"),
|
|
|
"close": candle.get("close"),
|
|
|
"volume": candle.get("volume")
|
|
|
})
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"symbol": symbol,
|
|
|
"interval": interval,
|
|
|
"items": items
|
|
|
},
|
|
|
"meta": build_meta("hf", cache_ttl_seconds=60)
|
|
|
}
|
|
|
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"symbol": symbol,
|
|
|
"interval": interval,
|
|
|
"items": []
|
|
|
},
|
|
|
"meta": build_meta("none", attempted=["hf"], error="NO_HISTORICAL_DATA")
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in get_historical_data: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@router.get("/api/service/market-status")
|
|
|
async def get_market_status():
|
|
|
"""Get current market overview"""
|
|
|
try:
|
|
|
|
|
|
hf_result = await try_hf_first("market", {})
|
|
|
|
|
|
if hf_result:
|
|
|
items = hf_result.get("data", [])[:10]
|
|
|
|
|
|
|
|
|
total_market_cap = sum(item.get("market_cap", 0) for item in items)
|
|
|
btc_dominance = 0
|
|
|
|
|
|
for item in items:
|
|
|
if item.get("symbol") == "BTC":
|
|
|
btc_dominance = (item.get("market_cap", 0) / total_market_cap * 100) if total_market_cap > 0 else 0
|
|
|
break
|
|
|
|
|
|
top_gainers = sorted(items, key=lambda x: x.get("change_24h", 0), reverse=True)[:3]
|
|
|
top_losers = sorted(items, key=lambda x: x.get("change_24h", 0))[:3]
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"total_market_cap": total_market_cap,
|
|
|
"btc_dominance": btc_dominance,
|
|
|
"top_gainers": top_gainers,
|
|
|
"top_losers": top_losers,
|
|
|
"active_cryptos": len(items),
|
|
|
"timestamp": datetime.utcnow().isoformat() + "Z"
|
|
|
},
|
|
|
"meta": build_meta("hf", cache_ttl_seconds=30)
|
|
|
}
|
|
|
|
|
|
|
|
|
return {
|
|
|
"data": None,
|
|
|
"meta": build_meta("none", attempted=["hf"], error="MARKET_DATA_UNAVAILABLE")
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in get_market_status: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@router.get("/api/service/top")
|
|
|
async def get_top_coins(
|
|
|
n: int = Query(10, description="Number of coins (10 or 50)")
|
|
|
):
|
|
|
"""Get top N coins by market cap"""
|
|
|
if n not in [10, 50]:
|
|
|
n = 10
|
|
|
|
|
|
try:
|
|
|
|
|
|
hf_result = await try_hf_first("market", {"limit": n})
|
|
|
|
|
|
if hf_result:
|
|
|
items = []
|
|
|
for i, coin in enumerate(hf_result.get("data", [])[:n], 1):
|
|
|
items.append({
|
|
|
"rank": i,
|
|
|
"symbol": coin.get("symbol"),
|
|
|
"name": coin.get("name"),
|
|
|
"price": coin.get("price"),
|
|
|
"market_cap": coin.get("market_cap"),
|
|
|
"change_24h": coin.get("change_24h"),
|
|
|
"volume_24h": coin.get("volume_24h")
|
|
|
})
|
|
|
|
|
|
return {
|
|
|
"data": items,
|
|
|
"meta": build_meta("hf", cache_ttl_seconds=60)
|
|
|
}
|
|
|
|
|
|
|
|
|
return {
|
|
|
"data": [],
|
|
|
"meta": build_meta("none", attempted=["hf"], error="DATA_NOT_AVAILABLE")
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in get_top_coins: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@router.get("/api/service/whales")
|
|
|
async def get_whale_movements(
|
|
|
chain: str = Query("ethereum", description="Blockchain network"),
|
|
|
min_amount_usd: float = Query(100000, description="Minimum amount in USD"),
|
|
|
limit: int = Query(50, description="Number of transactions")
|
|
|
):
|
|
|
"""Get whale transactions"""
|
|
|
try:
|
|
|
|
|
|
hf_result = await try_hf_first("whales", {
|
|
|
"chain": chain,
|
|
|
"min_amount_usd": min_amount_usd,
|
|
|
"limit": limit
|
|
|
})
|
|
|
|
|
|
if hf_result:
|
|
|
transactions = []
|
|
|
for tx in hf_result.get("data", [])[:limit]:
|
|
|
transactions.append({
|
|
|
"tx_hash": tx.get("hash"),
|
|
|
"from": tx.get("from"),
|
|
|
"to": tx.get("to"),
|
|
|
"amount_usd": tx.get("amount_usd"),
|
|
|
"token": tx.get("token"),
|
|
|
"block": tx.get("block"),
|
|
|
"ts": tx.get("timestamp")
|
|
|
})
|
|
|
|
|
|
|
|
|
db = next(get_db())
|
|
|
await persist_to_db(db, "whale", transactions, {"source": "hf"})
|
|
|
|
|
|
return {
|
|
|
"data": transactions,
|
|
|
"meta": build_meta("hf", cache_ttl_seconds=60)
|
|
|
}
|
|
|
|
|
|
|
|
|
return {
|
|
|
"data": [],
|
|
|
"meta": build_meta("none", attempted=["hf"], error="NO_WHALE_DATA")
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in get_whale_movements: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@router.get("/api/service/onchain")
|
|
|
async def get_onchain_data(
|
|
|
address: str = Query(..., description="Wallet address"),
|
|
|
chain: str = Query("ethereum", description="Blockchain network"),
|
|
|
limit: int = Query(50, description="Number of transactions")
|
|
|
):
|
|
|
"""Get on-chain data for address"""
|
|
|
try:
|
|
|
|
|
|
return {
|
|
|
"data": {
|
|
|
"address": address,
|
|
|
"chain": chain,
|
|
|
"balance": 0,
|
|
|
"token_balances": [],
|
|
|
"recent_transactions": [],
|
|
|
"total_transactions": 0
|
|
|
},
|
|
|
"meta": build_meta("etherscan", cache_ttl_seconds=60)
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in get_onchain_data: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
@router.post("/api/service/query")
|
|
|
async def generic_query(request: GenericQueryRequest):
|
|
|
"""
|
|
|
Generic query endpoint - routes to appropriate handler
|
|
|
Single entry point for all query types
|
|
|
"""
|
|
|
try:
|
|
|
query_type = request.type
|
|
|
payload = request.payload
|
|
|
|
|
|
if query_type == "rate":
|
|
|
result = await get_single_rate(
|
|
|
pair=payload.get("pair", "BTC/USDT"),
|
|
|
convert=payload.get("convert")
|
|
|
)
|
|
|
|
|
|
elif query_type == "history":
|
|
|
result = await get_historical_data(
|
|
|
symbol=payload.get("symbol", "BTC"),
|
|
|
interval=payload.get("interval", 60),
|
|
|
limit=payload.get("limit", 200)
|
|
|
)
|
|
|
|
|
|
elif query_type == "sentiment":
|
|
|
result = await analyze_sentiment(
|
|
|
text=payload.get("text"),
|
|
|
symbol=payload.get("symbol"),
|
|
|
mode=payload.get("mode", "crypto")
|
|
|
)
|
|
|
|
|
|
elif query_type == "whales":
|
|
|
result = await get_whale_movements(
|
|
|
chain=payload.get("chain", "ethereum"),
|
|
|
min_amount_usd=payload.get("min_amount_usd", 100000),
|
|
|
limit=payload.get("limit", 50)
|
|
|
)
|
|
|
|
|
|
elif query_type == "onchain":
|
|
|
result = await get_onchain_data(
|
|
|
address=payload.get("address"),
|
|
|
chain=payload.get("chain", "ethereum"),
|
|
|
limit=payload.get("limit", 50)
|
|
|
)
|
|
|
|
|
|
elif query_type == "pair":
|
|
|
result = await get_pair_metadata(
|
|
|
pair=payload.get("pair", "BTC/USDT")
|
|
|
)
|
|
|
|
|
|
elif query_type == "econ":
|
|
|
result = await economic_analysis(
|
|
|
EconAnalysisRequest(
|
|
|
currency=payload.get("currency", "BTC"),
|
|
|
period=payload.get("period", "1M"),
|
|
|
context=payload.get("context", "macro")
|
|
|
)
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
raise HTTPException(status_code=400, detail=f"Unknown query type: {query_type}")
|
|
|
|
|
|
return result
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in generic_query: {e}")
|
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.websocket("/ws")
|
|
|
async def websocket_endpoint(websocket: WebSocket):
|
|
|
"""
|
|
|
WebSocket endpoint for real-time subscriptions
|
|
|
|
|
|
Subscribe format:
|
|
|
{
|
|
|
"action": "subscribe",
|
|
|
"service": "market_data",
|
|
|
"symbols": ["BTC", "ETH"]
|
|
|
}
|
|
|
"""
|
|
|
await ws_manager.connect(websocket)
|
|
|
|
|
|
try:
|
|
|
while True:
|
|
|
data = await websocket.receive_text()
|
|
|
message = json.loads(data)
|
|
|
|
|
|
if message.get("action") == "subscribe":
|
|
|
service = message.get("service")
|
|
|
symbols = message.get("symbols", [])
|
|
|
|
|
|
|
|
|
await websocket.send_json({
|
|
|
"type": "subscribed",
|
|
|
"service": service,
|
|
|
"symbols": symbols,
|
|
|
"timestamp": datetime.utcnow().isoformat() + "Z"
|
|
|
})
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
|
for symbol in symbols:
|
|
|
|
|
|
update = {
|
|
|
"type": "update",
|
|
|
"service": service,
|
|
|
"symbol": symbol,
|
|
|
"data": {
|
|
|
"price": 50000 + (hash(symbol) % 10000),
|
|
|
"change": (hash(symbol) % 10) - 5
|
|
|
},
|
|
|
"timestamp": datetime.utcnow().isoformat() + "Z"
|
|
|
}
|
|
|
|
|
|
await websocket.send_json(update)
|
|
|
|
|
|
|
|
|
db = next(get_db())
|
|
|
await persist_to_db(db, "rate", update["data"], {"source": "hf-ws"})
|
|
|
|
|
|
await asyncio.sleep(5)
|
|
|
|
|
|
except WebSocketDisconnect:
|
|
|
ws_manager.disconnect(websocket)
|
|
|
except Exception as e:
|
|
|
logger.error(f"WebSocket error: {e}")
|
|
|
ws_manager.disconnect(websocket)
|
|
|
|
|
|
|
|
|
|
|
|
__all__ = ["router"] |