|
|
|
|
|
"""
|
|
|
Market API Router - Implements cryptocurrency market endpoints
|
|
|
Handles GET /api/market/price, GET /api/market/ohlc, POST /api/sentiment/analyze, and WebSocket /ws
|
|
|
"""
|
|
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, WebSocket, WebSocketDisconnect
|
|
|
from fastapi.responses import JSONResponse
|
|
|
from typing import Optional, Dict, Any, List
|
|
|
from pydantic import BaseModel, Field
|
|
|
from datetime import datetime
|
|
|
import logging
|
|
|
import json
|
|
|
import asyncio
|
|
|
import time
|
|
|
|
|
|
|
|
|
from backend.services.coingecko_client import coingecko_client
|
|
|
from backend.services.binance_client import BinanceClient
|
|
|
from backend.services.ai_service_unified import UnifiedAIService
|
|
|
from backend.services.market_data_aggregator import market_data_aggregator
|
|
|
from backend.services.sentiment_aggregator import sentiment_aggregator
|
|
|
from backend.services.hf_dataset_aggregator import hf_dataset_aggregator
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
router = APIRouter(tags=["Market API"])
|
|
|
|
|
|
|
|
|
class WebSocketManager:
|
|
|
"""Manages WebSocket connections and subscriptions"""
|
|
|
|
|
|
def __init__(self):
|
|
|
self.active_connections: Dict[str, WebSocket] = {}
|
|
|
self.subscriptions: Dict[str, List[str]] = {}
|
|
|
self.price_streams: Dict[str, asyncio.Task] = {}
|
|
|
|
|
|
async def connect(self, websocket: WebSocket, client_id: str):
|
|
|
"""Accept WebSocket connection"""
|
|
|
await websocket.accept()
|
|
|
self.active_connections[client_id] = websocket
|
|
|
self.subscriptions[client_id] = []
|
|
|
logger.info(f"WebSocket client {client_id} connected")
|
|
|
|
|
|
async def disconnect(self, client_id: str):
|
|
|
"""Disconnect WebSocket client"""
|
|
|
if client_id in self.active_connections:
|
|
|
del self.active_connections[client_id]
|
|
|
if client_id in self.subscriptions:
|
|
|
del self.subscriptions[client_id]
|
|
|
if client_id in self.price_streams:
|
|
|
self.price_streams[client_id].cancel()
|
|
|
del self.price_streams[client_id]
|
|
|
logger.info(f"WebSocket client {client_id} disconnected")
|
|
|
|
|
|
async def subscribe(self, client_id: str, symbol: str):
|
|
|
"""Subscribe client to symbol updates"""
|
|
|
if client_id not in self.subscriptions:
|
|
|
self.subscriptions[client_id] = []
|
|
|
if symbol.upper() not in self.subscriptions[client_id]:
|
|
|
self.subscriptions[client_id].append(symbol.upper())
|
|
|
logger.info(f"Client {client_id} subscribed to {symbol.upper()}")
|
|
|
|
|
|
async def send_message(self, client_id: str, message: Dict[str, Any]):
|
|
|
"""Send message to specific client"""
|
|
|
if client_id in self.active_connections:
|
|
|
try:
|
|
|
await self.active_connections[client_id].send_json(message)
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error sending message to {client_id}: {e}")
|
|
|
await self.disconnect(client_id)
|
|
|
|
|
|
async def broadcast_to_subscribers(self, symbol: str, data: Dict[str, Any]):
|
|
|
"""Broadcast data to all clients subscribed to symbol"""
|
|
|
symbol_upper = symbol.upper()
|
|
|
for client_id, symbols in self.subscriptions.items():
|
|
|
if symbol_upper in symbols:
|
|
|
await self.send_message(client_id, data)
|
|
|
|
|
|
|
|
|
ws_manager = WebSocketManager()
|
|
|
|
|
|
|
|
|
binance_client = BinanceClient()
|
|
|
|
|
|
|
|
|
ai_service = UnifiedAIService()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/api/market/price")
|
|
|
async def get_market_price(
|
|
|
symbol: str = Query(..., description="Cryptocurrency symbol (e.g., BTC, ETH)")
|
|
|
):
|
|
|
"""
|
|
|
Fetch the current market price of a specific cryptocurrency.
|
|
|
Uses ALL free market data providers with intelligent fallback:
|
|
|
CoinGecko, CoinPaprika, CoinCap, Binance, CoinLore, Messari, CoinStats
|
|
|
|
|
|
Returns:
|
|
|
- If symbol is valid: current price with timestamp
|
|
|
- If symbol is invalid: 404 error
|
|
|
"""
|
|
|
try:
|
|
|
symbol_upper = symbol.upper()
|
|
|
|
|
|
|
|
|
price_data = await market_data_aggregator.get_price(symbol_upper)
|
|
|
|
|
|
return {
|
|
|
"symbol": price_data.get("symbol", symbol_upper),
|
|
|
"price": price_data.get("price", 0),
|
|
|
"source": price_data.get("source", "unknown"),
|
|
|
"timestamp": price_data.get("timestamp", int(time.time() * 1000)) // 1000
|
|
|
}
|
|
|
|
|
|
except HTTPException:
|
|
|
raise
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error fetching price for {symbol}: {e}")
|
|
|
raise HTTPException(
|
|
|
status_code=502,
|
|
|
detail=f"Error fetching price data: {str(e)}"
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/api/market/ohlc")
|
|
|
async def get_market_ohlc(
|
|
|
symbol: str = Query(..., description="Cryptocurrency symbol (e.g., BTC, ETH)"),
|
|
|
interval: Optional[str] = Query(None, description="Interval (1h, 4h, 1d) - alias for timeframe"),
|
|
|
timeframe: str = Query("1h", description="Timeframe (1h, 4h, 1d)"),
|
|
|
limit: int = Query(100, description="Number of data points to return")
|
|
|
):
|
|
|
"""
|
|
|
Fetch historical OHLC (Open, High, Low, Close) data for a cryptocurrency.
|
|
|
Uses multiple sources with fallback:
|
|
|
1. Binance Public API (real-time)
|
|
|
2. HuggingFace Datasets (linxy/CryptoCoin - 26 symbols)
|
|
|
3. HuggingFace Datasets (WinkingFace/CryptoLM - BTC, ETH, SOL, XRP)
|
|
|
|
|
|
Returns:
|
|
|
- If symbol and timeframe are valid: OHLC data array
|
|
|
- If invalid: 404 error
|
|
|
"""
|
|
|
try:
|
|
|
symbol_upper = symbol.upper()
|
|
|
|
|
|
|
|
|
actual_timeframe = interval if interval else timeframe
|
|
|
|
|
|
|
|
|
valid_timeframes = ["1m", "5m", "15m", "30m", "1h", "4h", "1d", "1w"]
|
|
|
if actual_timeframe not in valid_timeframes:
|
|
|
raise HTTPException(
|
|
|
status_code=400,
|
|
|
detail=f"Invalid timeframe '{actual_timeframe}'. Valid timeframes: {', '.join(valid_timeframes)}"
|
|
|
)
|
|
|
|
|
|
|
|
|
try:
|
|
|
ohlcv_data = await binance_client.get_ohlcv(symbol_upper, actual_timeframe, limit=limit)
|
|
|
|
|
|
if ohlcv_data and len(ohlcv_data) > 0:
|
|
|
|
|
|
ohlc_list = []
|
|
|
for item in ohlcv_data:
|
|
|
ohlc_list.append({
|
|
|
"open": item.get("open", 0),
|
|
|
"high": item.get("high", 0),
|
|
|
"low": item.get("low", 0),
|
|
|
"close": item.get("close", 0),
|
|
|
"timestamp": item.get("timestamp", int(time.time()))
|
|
|
})
|
|
|
|
|
|
logger.info(f"✅ Binance: Fetched OHLC for {symbol_upper}/{actual_timeframe}")
|
|
|
return {
|
|
|
"symbol": symbol_upper,
|
|
|
"timeframe": actual_timeframe,
|
|
|
"interval": actual_timeframe,
|
|
|
"ohlc": ohlc_list,
|
|
|
"source": "binance"
|
|
|
}
|
|
|
except Exception as e:
|
|
|
logger.warning(f"⚠️ Binance failed for {symbol_upper}/{actual_timeframe}: {e}")
|
|
|
|
|
|
|
|
|
try:
|
|
|
hf_ohlcv_data = await hf_dataset_aggregator.get_ohlcv(symbol_upper, actual_timeframe, limit=limit)
|
|
|
|
|
|
if hf_ohlcv_data and len(hf_ohlcv_data) > 0:
|
|
|
|
|
|
ohlc_list = []
|
|
|
for item in hf_ohlcv_data:
|
|
|
ohlc_list.append({
|
|
|
"open": item.get("open", 0),
|
|
|
"high": item.get("high", 0),
|
|
|
"low": item.get("low", 0),
|
|
|
"close": item.get("close", 0),
|
|
|
"timestamp": item.get("timestamp", int(time.time()))
|
|
|
})
|
|
|
|
|
|
logger.info(f"✅ HuggingFace Datasets: Fetched OHLC for {symbol_upper}/{actual_timeframe}")
|
|
|
return {
|
|
|
"symbol": symbol_upper,
|
|
|
"timeframe": actual_timeframe,
|
|
|
"interval": actual_timeframe,
|
|
|
"ohlc": ohlc_list,
|
|
|
"source": "huggingface"
|
|
|
}
|
|
|
except Exception as e:
|
|
|
logger.warning(f"⚠️ HuggingFace Datasets failed for {symbol_upper}/{actual_timeframe}: {e}")
|
|
|
|
|
|
|
|
|
raise HTTPException(
|
|
|
status_code=404,
|
|
|
detail=f"No OHLC data found for symbol '{symbol}' with timeframe '{actual_timeframe}' from any source (Binance, HuggingFace)"
|
|
|
)
|
|
|
|
|
|
except HTTPException:
|
|
|
raise
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error fetching OHLC data: {e}")
|
|
|
raise HTTPException(
|
|
|
status_code=502,
|
|
|
detail=f"Error fetching OHLC data: {str(e)}"
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SentimentAnalyzeRequest(BaseModel):
|
|
|
"""Request model for sentiment analysis"""
|
|
|
text: str = Field(..., description="Text to analyze for sentiment", min_length=1)
|
|
|
|
|
|
|
|
|
@router.post("/api/sentiment/analyze")
|
|
|
async def analyze_sentiment(request: SentimentAnalyzeRequest):
|
|
|
"""
|
|
|
Analyze the sentiment of a given text (Bullish, Bearish, Neutral).
|
|
|
|
|
|
Returns:
|
|
|
- If text is valid: sentiment analysis result
|
|
|
- If text is missing or invalid: 400 error
|
|
|
"""
|
|
|
try:
|
|
|
if not request.text or len(request.text.strip()) == 0:
|
|
|
raise HTTPException(
|
|
|
status_code=400,
|
|
|
detail="Text parameter is required and cannot be empty"
|
|
|
)
|
|
|
|
|
|
|
|
|
try:
|
|
|
result = await ai_service.analyze_sentiment(
|
|
|
text=request.text,
|
|
|
category="crypto",
|
|
|
use_ensemble=True
|
|
|
)
|
|
|
|
|
|
|
|
|
label = result.get("label", "neutral").lower()
|
|
|
confidence = result.get("confidence", 0.5)
|
|
|
|
|
|
|
|
|
if "bullish" in label or "positive" in label:
|
|
|
sentiment = "Bullish"
|
|
|
score = confidence if confidence > 0.5 else 0.6
|
|
|
elif "bearish" in label or "negative" in label:
|
|
|
sentiment = "Bearish"
|
|
|
score = 1 - confidence if confidence < 0.5 else 0.4
|
|
|
else:
|
|
|
sentiment = "Neutral"
|
|
|
score = 0.5
|
|
|
|
|
|
return {
|
|
|
"sentiment": sentiment,
|
|
|
"score": score,
|
|
|
"confidence": confidence
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error analyzing sentiment: {e}")
|
|
|
|
|
|
text_lower = request.text.lower()
|
|
|
positive_words = ['bullish', 'buy', 'moon', 'pump', 'up', 'gain', 'profit', 'good', 'great', 'strong']
|
|
|
negative_words = ['bearish', 'sell', 'dump', 'down', 'loss', 'crash', 'bad', 'fear', 'weak', 'drop']
|
|
|
|
|
|
pos_count = sum(1 for word in positive_words if word in text_lower)
|
|
|
neg_count = sum(1 for word in negative_words if word in text_lower)
|
|
|
|
|
|
if pos_count > neg_count:
|
|
|
sentiment = "Bullish"
|
|
|
elif neg_count > pos_count:
|
|
|
sentiment = "Bearish"
|
|
|
else:
|
|
|
sentiment = "Neutral"
|
|
|
|
|
|
return {
|
|
|
"sentiment": sentiment,
|
|
|
"score": 0.65 if sentiment == "Bullish" else (0.35 if sentiment == "Bearish" else 0.5),
|
|
|
"confidence": 0.6
|
|
|
}
|
|
|
|
|
|
except HTTPException:
|
|
|
raise
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in sentiment analysis: {e}")
|
|
|
raise HTTPException(
|
|
|
status_code=502,
|
|
|
detail=f"Error analyzing sentiment: {str(e)}"
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def stream_price_updates(client_id: str, symbol: str):
|
|
|
"""Stream price updates for a subscribed symbol"""
|
|
|
symbol_upper = symbol.upper()
|
|
|
|
|
|
while client_id in ws_manager.active_connections:
|
|
|
try:
|
|
|
|
|
|
try:
|
|
|
market_data = await coingecko_client.get_market_prices(symbols=[symbol_upper], limit=1)
|
|
|
if market_data and len(market_data) > 0:
|
|
|
coin = market_data[0]
|
|
|
price = coin.get("price", 0)
|
|
|
else:
|
|
|
|
|
|
ticker = await binance_client.get_ticker(f"{symbol_upper}USDT")
|
|
|
price = float(ticker.get("lastPrice", 0)) if ticker else 0
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Error fetching price for {symbol_upper}: {e}")
|
|
|
price = 0
|
|
|
|
|
|
|
|
|
await ws_manager.send_message(client_id, {
|
|
|
"symbol": symbol_upper,
|
|
|
"price": price,
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
|
|
|
|
|
|
await asyncio.sleep(5)
|
|
|
|
|
|
except asyncio.CancelledError:
|
|
|
break
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in price stream for {symbol_upper}: {e}")
|
|
|
await asyncio.sleep(5)
|
|
|
|
|
|
|
|
|
@router.websocket("/ws")
|
|
|
async def websocket_endpoint(websocket: WebSocket):
|
|
|
"""
|
|
|
WebSocket endpoint for real-time cryptocurrency data updates.
|
|
|
|
|
|
Connection:
|
|
|
- Clients connect to receive real-time data
|
|
|
- Send subscription messages to subscribe to specific symbols
|
|
|
|
|
|
Subscription Message:
|
|
|
{
|
|
|
"type": "subscribe",
|
|
|
"symbol": "BTC"
|
|
|
}
|
|
|
|
|
|
Unsubscribe Message:
|
|
|
{
|
|
|
"type": "unsubscribe",
|
|
|
"symbol": "BTC"
|
|
|
}
|
|
|
|
|
|
Ping Message:
|
|
|
{
|
|
|
"type": "ping"
|
|
|
}
|
|
|
"""
|
|
|
client_id = f"client_{int(time.time() * 1000)}_{id(websocket)}"
|
|
|
|
|
|
try:
|
|
|
await ws_manager.connect(websocket, client_id)
|
|
|
|
|
|
|
|
|
await websocket.send_json({
|
|
|
"type": "connected",
|
|
|
"client_id": client_id,
|
|
|
"message": "Connected to cryptocurrency data WebSocket",
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
|
|
|
|
|
|
while True:
|
|
|
try:
|
|
|
|
|
|
data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
|
|
|
|
|
|
try:
|
|
|
message = json.loads(data)
|
|
|
msg_type = message.get("type", "").lower()
|
|
|
|
|
|
if msg_type == "subscribe":
|
|
|
symbol = message.get("symbol", "").upper()
|
|
|
if not symbol:
|
|
|
await websocket.send_json({
|
|
|
"type": "error",
|
|
|
"error": "Symbol is required for subscription",
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
continue
|
|
|
|
|
|
await ws_manager.subscribe(client_id, symbol)
|
|
|
|
|
|
|
|
|
task_key = f"{client_id}_{symbol}"
|
|
|
if task_key not in ws_manager.price_streams:
|
|
|
task = asyncio.create_task(stream_price_updates(client_id, symbol))
|
|
|
ws_manager.price_streams[task_key] = task
|
|
|
|
|
|
await websocket.send_json({
|
|
|
"type": "subscribed",
|
|
|
"symbol": symbol,
|
|
|
"message": f"Subscribed to {symbol} updates",
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
|
|
|
elif msg_type == "unsubscribe":
|
|
|
symbol = message.get("symbol", "").upper()
|
|
|
if symbol in ws_manager.subscriptions.get(client_id, []):
|
|
|
ws_manager.subscriptions[client_id].remove(symbol)
|
|
|
task_key = f"{client_id}_{symbol}"
|
|
|
if task_key in ws_manager.price_streams:
|
|
|
ws_manager.price_streams[task_key].cancel()
|
|
|
del ws_manager.price_streams[task_key]
|
|
|
|
|
|
await websocket.send_json({
|
|
|
"type": "unsubscribed",
|
|
|
"symbol": symbol,
|
|
|
"message": f"Unsubscribed from {symbol} updates",
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
|
|
|
elif msg_type == "ping":
|
|
|
await websocket.send_json({
|
|
|
"type": "pong",
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
|
|
|
else:
|
|
|
await websocket.send_json({
|
|
|
"type": "error",
|
|
|
"error": f"Unknown message type: {msg_type}",
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
await websocket.send_json({
|
|
|
"type": "error",
|
|
|
"error": "Invalid JSON format",
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
|
await websocket.send_json({
|
|
|
"type": "heartbeat",
|
|
|
"timestamp": int(time.time()),
|
|
|
"status": "alive"
|
|
|
})
|
|
|
|
|
|
except WebSocketDisconnect:
|
|
|
logger.info(f"WebSocket client {client_id} disconnected normally")
|
|
|
await ws_manager.disconnect(client_id)
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"WebSocket error for {client_id}: {e}", exc_info=True)
|
|
|
try:
|
|
|
await websocket.send_json({
|
|
|
"type": "error",
|
|
|
"error": f"Server error: {str(e)}",
|
|
|
"timestamp": int(time.time())
|
|
|
})
|
|
|
except:
|
|
|
pass
|
|
|
await ws_manager.disconnect(client_id)
|
|
|
|
|
|
finally:
|
|
|
await ws_manager.disconnect(client_id)
|
|
|
|
|
|
|