"""
Background Data Collection Agent

Continuously collects data from 305+ free resources.
Runs automatically when the HuggingFace Space starts.
"""
|
|
|
|
|
|
import asyncio
|
|
|
import time
|
|
|
from datetime import datetime, timedelta
|
|
|
from typing import Dict, List, Any
|
|
|
import logging
|
|
|
|
|
|
|
|
|
import sys
|
|
|
sys.path.insert(0, '/workspace')
|
|
|
from core.smart_fallback_manager import get_fallback_manager
|
|
|
from core.smart_proxy_manager import get_proxy_manager
|
|
|
from database.db_manager import db_manager
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class DataCollectionAgent:
    """
    Background agent that continuously collects data

    - Collects from 305+ free resources
    - Stores in database cache
    - Runs 24/7 in background
    - Auto-handles failures with fallback
    """

    def __init__(self):
        """Initialize managers, stat counters, and per-category schedules."""
        # Project singletons used by every collection loop.
        self.fallback_manager = get_fallback_manager()
        self.proxy_manager = get_proxy_manager()
        # Flag toggled by start()/stop(); each loop polls it between cycles.
        self.is_running = False
        # Aggregate counters updated by the collection loops.
        self.collection_stats = {
            'total_collections': 0,
            'successful_collections': 0,
            'failed_collections': 0,
            'last_collection_time': None,
            'collections_by_category': {},
        }
        # Polling interval per resource category, in seconds.
        self.intervals = {
            'market_data_apis': 30,
            'news_apis': 300,
            'sentiment_apis': 180,
            'whale_tracking_apis': 60,
            'block_explorers': 120,
            'onchain_analytics_apis': 300,
        }
        # Timestamp of the most recent collection per category.
        self.last_collection = {}

        # NOTE(review): the original log literal contained a garbled emoji
        # split across two lines (an unterminated string); rejoined here.
        logger.info("β DataCollectionAgent initialized")
|
|
|
|
|
|
async def start(self):
|
|
|
"""Start the data collection agent"""
|
|
|
if self.is_running:
|
|
|
logger.warning("β οΈ Agent already running")
|
|
|
return
|
|
|
|
|
|
self.is_running = True
|
|
|
logger.info("π Starting DataCollectionAgent...")
|
|
|
|
|
|
|
|
|
tasks = [
|
|
|
self.collect_market_data(),
|
|
|
self.collect_news_data(),
|
|
|
self.collect_sentiment_data(),
|
|
|
self.collect_whale_tracking(),
|
|
|
self.collect_blockchain_data(),
|
|
|
self.health_check_loop(),
|
|
|
]
|
|
|
|
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
|
|
async def stop(self):
|
|
|
"""Stop the agent"""
|
|
|
self.is_running = False
|
|
|
logger.info("π Stopping DataCollectionAgent...")
|
|
|
|
|
|
async def collect_market_data(self):
|
|
|
"""Continuously collect market data"""
|
|
|
category = 'market_data_apis'
|
|
|
interval = self.intervals[category]
|
|
|
|
|
|
while self.is_running:
|
|
|
try:
|
|
|
logger.info(f"π Collecting market data...")
|
|
|
|
|
|
|
|
|
data = await self.fallback_manager.fetch_with_fallback(
|
|
|
category=category,
|
|
|
endpoint_path="/coins/markets",
|
|
|
params={
|
|
|
"vs_currency": "usd",
|
|
|
"order": "market_cap_desc",
|
|
|
"per_page": 250,
|
|
|
"page": 1
|
|
|
},
|
|
|
max_attempts=10
|
|
|
)
|
|
|
|
|
|
if data:
|
|
|
|
|
|
await self._store_market_data(data)
|
|
|
|
|
|
self.collection_stats['successful_collections'] += 1
|
|
|
logger.info(f"β
Market data collected successfully")
|
|
|
else:
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
logger.warning(f"β οΈ Failed to collect market data after all attempts")
|
|
|
|
|
|
|
|
|
self.collection_stats['total_collections'] += 1
|
|
|
self.last_collection[category] = datetime.now()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error collecting market data: {e}")
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
|
|
|
|
|
|
await asyncio.sleep(interval)
|
|
|
|
|
|
async def collect_news_data(self):
|
|
|
"""Continuously collect news data"""
|
|
|
category = 'news_apis'
|
|
|
interval = self.intervals[category]
|
|
|
|
|
|
while self.is_running:
|
|
|
try:
|
|
|
logger.info(f"π° Collecting news data...")
|
|
|
|
|
|
|
|
|
data = await self.fallback_manager.fetch_with_fallback(
|
|
|
category=category,
|
|
|
endpoint_path="/news",
|
|
|
params={"limit": 50},
|
|
|
max_attempts=5
|
|
|
)
|
|
|
|
|
|
if data:
|
|
|
await self._store_news_data(data)
|
|
|
self.collection_stats['successful_collections'] += 1
|
|
|
logger.info(f"β
News data collected successfully")
|
|
|
else:
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
|
|
|
self.collection_stats['total_collections'] += 1
|
|
|
self.last_collection[category] = datetime.now()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error collecting news: {e}")
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
|
|
|
await asyncio.sleep(interval)
|
|
|
|
|
|
async def collect_sentiment_data(self):
|
|
|
"""Continuously collect sentiment data"""
|
|
|
category = 'sentiment_apis'
|
|
|
interval = self.intervals[category]
|
|
|
|
|
|
while self.is_running:
|
|
|
try:
|
|
|
logger.info(f"π Collecting sentiment data...")
|
|
|
|
|
|
|
|
|
data = await self.fallback_manager.fetch_with_fallback(
|
|
|
category=category,
|
|
|
endpoint_path="/sentiment",
|
|
|
max_attempts=5
|
|
|
)
|
|
|
|
|
|
if data:
|
|
|
await self._store_sentiment_data(data)
|
|
|
self.collection_stats['successful_collections'] += 1
|
|
|
logger.info(f"β
Sentiment data collected successfully")
|
|
|
else:
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
|
|
|
self.collection_stats['total_collections'] += 1
|
|
|
self.last_collection[category] = datetime.now()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error collecting sentiment: {e}")
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
|
|
|
await asyncio.sleep(interval)
|
|
|
|
|
|
async def collect_whale_tracking(self):
|
|
|
"""Continuously collect whale tracking data"""
|
|
|
category = 'whale_tracking_apis'
|
|
|
interval = self.intervals[category]
|
|
|
|
|
|
while self.is_running:
|
|
|
try:
|
|
|
logger.info(f"π Collecting whale tracking data...")
|
|
|
|
|
|
data = await self.fallback_manager.fetch_with_fallback(
|
|
|
category=category,
|
|
|
endpoint_path="/whales",
|
|
|
max_attempts=5
|
|
|
)
|
|
|
|
|
|
if data:
|
|
|
await self._store_whale_data(data)
|
|
|
self.collection_stats['successful_collections'] += 1
|
|
|
logger.info(f"β
Whale data collected successfully")
|
|
|
else:
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
|
|
|
self.collection_stats['total_collections'] += 1
|
|
|
self.last_collection[category] = datetime.now()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error collecting whale data: {e}")
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
|
|
|
await asyncio.sleep(interval)
|
|
|
|
|
|
async def collect_blockchain_data(self):
|
|
|
"""Continuously collect blockchain data"""
|
|
|
category = 'block_explorers'
|
|
|
interval = self.intervals[category]
|
|
|
|
|
|
while self.is_running:
|
|
|
try:
|
|
|
logger.info(f"βοΈ Collecting blockchain data...")
|
|
|
|
|
|
|
|
|
chains = ['ethereum', 'bsc', 'polygon']
|
|
|
|
|
|
for chain in chains:
|
|
|
data = await self.fallback_manager.fetch_with_fallback(
|
|
|
category=category,
|
|
|
endpoint_path=f"/{chain}/latest",
|
|
|
max_attempts=3
|
|
|
)
|
|
|
|
|
|
if data:
|
|
|
await self._store_blockchain_data(chain, data)
|
|
|
|
|
|
self.collection_stats['successful_collections'] += 1
|
|
|
self.collection_stats['total_collections'] += 1
|
|
|
self.last_collection[category] = datetime.now()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error collecting blockchain data: {e}")
|
|
|
self.collection_stats['failed_collections'] += 1
|
|
|
|
|
|
await asyncio.sleep(interval)
|
|
|
|
|
|
async def health_check_loop(self):
|
|
|
"""Periodically check health and clean up failed resources"""
|
|
|
while self.is_running:
|
|
|
try:
|
|
|
|
|
|
await asyncio.sleep(600)
|
|
|
|
|
|
logger.info("π₯ Running health check...")
|
|
|
|
|
|
|
|
|
report = self.fallback_manager.get_health_report()
|
|
|
|
|
|
logger.info(f"π Health Report:")
|
|
|
logger.info(f" Total Resources: {report['total_resources']}")
|
|
|
logger.info(f" Active: {report['by_status']['active']}")
|
|
|
logger.info(f" Degraded: {report['by_status']['degraded']}")
|
|
|
logger.info(f" Failed: {report['by_status']['failed']}")
|
|
|
logger.info(f" Proxy Needed: {report['by_status']['proxy_needed']}")
|
|
|
|
|
|
|
|
|
removed = self.fallback_manager.cleanup_failed_resources(max_age_hours=24)
|
|
|
|
|
|
if removed:
|
|
|
logger.info(f"ποΈ Cleaned up {len(removed)} failed resources")
|
|
|
|
|
|
|
|
|
await self.proxy_manager.test_all_proxies()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Health check error: {e}")
|
|
|
|
|
|
async def _store_market_data(self, data: Any):
|
|
|
"""Store market data in database"""
|
|
|
try:
|
|
|
|
|
|
if isinstance(data, list):
|
|
|
for item in data:
|
|
|
symbol = item.get('symbol', '').upper()
|
|
|
if symbol:
|
|
|
db_manager.cache_market_data(
|
|
|
symbol=symbol,
|
|
|
price=item.get('current_price', 0),
|
|
|
volume=item.get('total_volume', 0),
|
|
|
market_cap=item.get('market_cap', 0),
|
|
|
change_24h=item.get('price_change_percentage_24h', 0),
|
|
|
data=item
|
|
|
)
|
|
|
logger.debug(f"πΎ Stored market data in database")
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error storing market data: {e}")
|
|
|
|
|
|
async def _store_news_data(self, data: Any):
|
|
|
"""Store news data in database"""
|
|
|
try:
|
|
|
|
|
|
logger.debug(f"πΎ Stored news data in database")
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error storing news data: {e}")
|
|
|
|
|
|
async def _store_sentiment_data(self, data: Any):
|
|
|
"""Store sentiment data in database"""
|
|
|
try:
|
|
|
logger.debug(f"πΎ Stored sentiment data in database")
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error storing sentiment data: {e}")
|
|
|
|
|
|
async def _store_whale_data(self, data: Any):
|
|
|
"""Store whale tracking data in database"""
|
|
|
try:
|
|
|
logger.debug(f"πΎ Stored whale data in database")
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error storing whale data: {e}")
|
|
|
|
|
|
async def _store_blockchain_data(self, chain: str, data: Any):
|
|
|
"""Store blockchain data in database"""
|
|
|
try:
|
|
|
logger.debug(f"πΎ Stored {chain} blockchain data in database")
|
|
|
except Exception as e:
|
|
|
logger.error(f"β Error storing blockchain data: {e}")
|
|
|
|
|
|
def get_stats(self) -> Dict:
|
|
|
"""Get collection statistics"""
|
|
|
return {
|
|
|
**self.collection_stats,
|
|
|
'is_running': self.is_running,
|
|
|
'last_collection': {
|
|
|
category: last_time.isoformat() if last_time else None
|
|
|
for category, last_time in self.last_collection.items()
|
|
|
},
|
|
|
'health_report': self.fallback_manager.get_health_report(),
|
|
|
'proxy_status': self.proxy_manager.get_status_report()
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
# Lazily created module-level singleton.
_agent = None


def get_data_collection_agent() -> DataCollectionAgent:
    """Return the process-wide DataCollectionAgent, creating it on first use."""
    global _agent
    if _agent is None:
        _agent = DataCollectionAgent()
    return _agent
|
|
|
|
|
|
|
|
|
async def start_data_collection_agent():
    """Create (if needed) and start the global data collection agent."""
    await get_data_collection_agent().start()
|
|
|
|