|
|
"""
|
|
|
Database Query Functions for Cached Market Data
|
|
|
Provides REAL data access from cached_market_data and cached_ohlc tables
|
|
|
|
|
|
CRITICAL RULES:
|
|
|
- ONLY read from database - NEVER generate fake data
|
|
|
- Return empty list if no data found
|
|
|
- All queries must be REAL database operations
|
|
|
"""
|
|
|
|
|
|
import logging
|
|
|
from datetime import datetime, timedelta
|
|
|
from typing import Optional, List, Dict, Any
|
|
|
from sqlalchemy import desc, and_, func
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
from database.models import CachedMarketData, CachedOHLC
|
|
|
from database.db_manager import DatabaseManager
|
|
|
from utils.logger import setup_logger
|
|
|
|
|
|
# Module-level logger shared by everything in this file; created through the
# project's setup_logger helper under the name "cache_queries".
logger = setup_logger("cache_queries")
|
|
|
|
|
|
|
|
|
class CacheQueries:
    """
    Database query operations for cached market data.

    Wraps a database manager and exposes read, write and cleanup helpers for
    the CachedMarketData and CachedOHLC models.

    CRITICAL: All read methods return REAL data from the database ONLY;
    nothing is ever synthesised when the cache is empty.
    """

    def __init__(self, db_manager: "DatabaseManager"):
        """
        Store the database manager used by every query.

        Args:
            db_manager: Object whose ``get_session()`` is used as a context
                manager to obtain the session for each operation.
        """
        self.db = db_manager

    @staticmethod
    def _optional_float(value: Any) -> Optional[float]:
        """
        Convert a nullable numeric value to float, keeping None as None.

        An explicit ``is None`` check is used (rather than truthiness) so that
        legitimate zero values, e.g. a 24h change of exactly 0, are returned
        as ``0.0`` instead of being collapsed to None.
        """
        return None if value is None else float(value)

    def get_cached_market_data(
        self,
        symbols: Optional[List[str]] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get the most recent cached market data row for each symbol.

        CRITICAL RULES:
        - ONLY read from cached_market_data table
        - NEVER generate or fake data
        - Return empty list if no data found
        - The latest row per symbol is selected by joining the table against
          a MAX(fetched_at) ... GROUP BY symbol subquery

        Args:
            symbols: List of symbols to filter (e.g., ['BTC', 'ETH']).
                None or an empty list means "all symbols".
            limit: Maximum number of records, applied after ordering by
                fetched_at descending.

        Returns:
            List of dictionaries with REAL market data from database.
            Nullable numeric fields are None only when the stored value is
            NULL; a stored zero is returned as 0.0. An empty list is returned
            when nothing matches or when any exception occurs (the exception
            is logged, not raised).
        """
        try:
            with self.db.get_session() as session:
                # Newest fetched_at per symbol.
                latest = session.query(
                    CachedMarketData.symbol,
                    func.max(CachedMarketData.fetched_at).label('max_fetched_at')
                ).group_by(CachedMarketData.symbol)

                if symbols:
                    latest = latest.filter(CachedMarketData.symbol.in_(symbols))

                latest = latest.subquery()

                # Join back to the full rows that carry that newest timestamp.
                query = session.query(CachedMarketData).join(
                    latest,
                    and_(
                        CachedMarketData.symbol == latest.c.symbol,
                        CachedMarketData.fetched_at == latest.c.max_fetched_at
                    )
                ).order_by(desc(CachedMarketData.fetched_at)).limit(limit)

                results = query.all()

                if not results:
                    logger.info(f"No cached market data found for symbols={symbols}")
                    return []

                # Rows are converted while the session is still open.
                to_float = self._optional_float
                data = [
                    {
                        "symbol": row.symbol,
                        "price": float(row.price),
                        "market_cap": to_float(row.market_cap),
                        "volume_24h": to_float(row.volume_24h),
                        "change_24h": to_float(row.change_24h),
                        "high_24h": to_float(row.high_24h),
                        "low_24h": to_float(row.low_24h),
                        "provider": row.provider,
                        "fetched_at": row.fetched_at,
                    }
                    for row in results
                ]

                logger.info(f"Retrieved {len(data)} cached market records")
                return data

        except Exception as e:
            # Best-effort read: failures are logged and reported as "no data".
            logger.error(f"Database error in get_cached_market_data: {e}", exc_info=True)
            return []

    def get_cached_ohlc(
        self,
        symbol: str,
        interval: str = "1h",
        limit: int = 1000
    ) -> List[Dict[str, Any]]:
        """
        Get cached OHLC candles for one symbol and interval.

        CRITICAL RULES:
        - ONLY read from cached_ohlc table
        - NEVER generate fake candles
        - Return empty list if no data found
        - Result is ordered by timestamp ASC for chart display

        Args:
            symbol: Trading pair symbol (e.g., 'BTCUSDT')
            interval: Candle interval (e.g., '1h', '4h', '1d')
            limit: Maximum number of candles; the newest ``limit`` candles
                are selected.

        Returns:
            List of dictionaries with REAL OHLC data from database, oldest
            candle first. An empty list is returned when nothing matches or
            when any exception occurs (the exception is logged, not raised).
        """
        try:
            with self.db.get_session() as session:
                # Query newest-first so LIMIT keeps the most recent candles.
                query = session.query(CachedOHLC).filter(
                    and_(
                        CachedOHLC.symbol == symbol,
                        CachedOHLC.interval == interval
                    )
                ).order_by(desc(CachedOHLC.timestamp)).limit(limit)

                results = query.all()

                if not results:
                    logger.info(f"No cached OHLC data found for {symbol} {interval}")
                    return []

                # Flip to ascending order for the caller.
                data = [
                    {
                        "timestamp": row.timestamp,
                        "open": float(row.open),
                        "high": float(row.high),
                        "low": float(row.low),
                        "close": float(row.close),
                        "volume": float(row.volume),
                        "provider": row.provider,
                        "fetched_at": row.fetched_at,
                    }
                    for row in reversed(results)
                ]

                logger.info(f"Retrieved {len(data)} OHLC candles for {symbol} {interval}")
                return data

        except Exception as e:
            # Best-effort read: failures are logged and reported as "no data".
            logger.error(f"Database error in get_cached_ohlc: {e}", exc_info=True)
            return []

    def save_market_data(
        self,
        symbol: str,
        price: float,
        market_cap: Optional[float] = None,
        volume_24h: Optional[float] = None,
        change_24h: Optional[float] = None,
        high_24h: Optional[float] = None,
        low_24h: Optional[float] = None,
        provider: str = "unknown"
    ) -> bool:
        """
        Save one market data snapshot to the cache.

        A new CachedMarketData row is always inserted, stamped with the
        current ``datetime.utcnow()`` as fetched_at.

        CRITICAL: Only used by background workers to store REAL API data

        Args:
            symbol: Crypto symbol
            price: Current price (REAL from API)
            market_cap: Market cap (REAL from API)
            volume_24h: 24h volume (REAL from API)
            change_24h: 24h change (REAL from API)
            high_24h: 24h high (REAL from API)
            low_24h: 24h low (REAL from API)
            provider: Data provider name

        Returns:
            bool: True if saved successfully, False if any exception occurred
            (the exception is logged, not raised).
        """
        try:
            with self.db.get_session() as session:
                record = CachedMarketData(
                    symbol=symbol,
                    price=price,
                    market_cap=market_cap,
                    volume_24h=volume_24h,
                    change_24h=change_24h,
                    high_24h=high_24h,
                    low_24h=low_24h,
                    provider=provider,
                    fetched_at=datetime.utcnow()
                )

                session.add(record)
                session.commit()

                logger.info(f"Saved market data for {symbol} from {provider}")
                return True

        except Exception as e:
            logger.error(f"Error saving market data for {symbol}: {e}", exc_info=True)
            return False

    def save_ohlc_candle(
        self,
        symbol: str,
        interval: str,
        timestamp: datetime,
        open_price: float,
        high: float,
        low: float,
        close: float,
        volume: float,
        provider: str = "unknown"
    ) -> bool:
        """
        Save an OHLC candle to the cache, updating it if it already exists.

        A candle is identified by (symbol, interval, timestamp). When a row
        with that key is found its values are overwritten; otherwise a new
        row is inserted. In both cases fetched_at is set to
        ``datetime.utcnow()``.

        CRITICAL: Only used by background workers to store REAL candle data

        Args:
            symbol: Trading pair symbol
            interval: Candle interval
            timestamp: Candle open time (REAL from API)
            open_price: Open price (REAL from API)
            high: High price (REAL from API)
            low: Low price (REAL from API)
            close: Close price (REAL from API)
            volume: Volume (REAL from API)
            provider: Data provider name

        Returns:
            bool: True if saved successfully, False if any exception occurred
            (the exception is logged, not raised).
        """
        try:
            with self.db.get_session() as session:
                existing = session.query(CachedOHLC).filter(
                    and_(
                        CachedOHLC.symbol == symbol,
                        CachedOHLC.interval == interval,
                        CachedOHLC.timestamp == timestamp
                    )
                ).first()

                if existing:
                    # Refresh the stored candle in place.
                    existing.open = open_price
                    existing.high = high
                    existing.low = low
                    existing.close = close
                    existing.volume = volume
                    existing.provider = provider
                    existing.fetched_at = datetime.utcnow()
                else:
                    record = CachedOHLC(
                        symbol=symbol,
                        interval=interval,
                        timestamp=timestamp,
                        open=open_price,
                        high=high,
                        low=low,
                        close=close,
                        volume=volume,
                        provider=provider,
                        fetched_at=datetime.utcnow()
                    )
                    session.add(record)

                session.commit()

                logger.debug(f"Saved OHLC candle for {symbol} {interval} at {timestamp}")
                return True

        except Exception as e:
            logger.error(f"Error saving OHLC candle for {symbol}: {e}", exc_info=True)
            return False

    def cleanup_old_data(self, days: int = 7) -> Dict[str, int]:
        """
        Remove old cached data to manage storage.

        Rows of both cached tables whose fetched_at is older than
        ``utcnow() - days`` are deleted in a single commit.

        Args:
            days: Remove data older than N days. Must not be negative: a
                negative value would move the cutoff into the future and
                delete every cached row, so it is rejected.

        Returns:
            Dictionary with counts of deleted records under the keys
            'market_data' and 'ohlc'. An empty dictionary is returned when
            ``days`` is negative or when any exception occurs (the problem
            is logged, not raised).
        """
        try:
            if days < 0:
                logger.error(f"Refusing to clean up cache with negative days={days}")
                return {}

            with self.db.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(days=days)

                deleted_counts = {
                    'market_data': session.query(CachedMarketData).filter(
                        CachedMarketData.fetched_at < cutoff_time
                    ).delete(),
                    'ohlc': session.query(CachedOHLC).filter(
                        CachedOHLC.fetched_at < cutoff_time
                    ).delete(),
                }

                session.commit()

                total_deleted = sum(deleted_counts.values())
                logger.info(f"Cleaned up {total_deleted} old cache records (older than {days} days)")

                return deleted_counts

        except Exception as e:
            logger.error(f"Error cleaning up old data: {e}", exc_info=True)
            return {}
|
|
|
|
|
|
|
|
|
|
|
|
# Lazily created, process-wide CacheQueries instance (see get_cache_queries).
_cache_queries = None


def get_cache_queries(db_manager: Optional[DatabaseManager] = None) -> CacheQueries:
    """
    Return the shared CacheQueries instance, building it on first use.

    Args:
        db_manager: DatabaseManager to build the instance with. Only
            consulted on the call that actually creates the instance; when
            omitted, the global ``db_manager`` from ``database.db_manager``
            is used instead.

    Returns:
        The module-wide CacheQueries instance.
    """
    global _cache_queries

    # Fast path: the singleton already exists.
    if _cache_queries is not None:
        return _cache_queries

    manager = db_manager
    if manager is None:
        # Imported here, at first use, rather than at module import time.
        from database.db_manager import db_manager as default_manager
        manager = default_manager

    _cache_queries = CacheQueries(manager)
    return _cache_queries
|
|
|
|