File size: 6,956 Bytes
b66240d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from __future__ import annotations
import os, time, random
from typing import Dict, Any, List, Literal, Optional
import httpx

# Hugging Face Hub REST endpoints queried by the registry.
HF_API_MODELS = "https://huggingface.co/api/models"
HF_API_DATASETS = "https://huggingface.co/api/datasets"

# Tunables, read from the environment once at import time.
REFRESH_INTERVAL_SEC = int(os.environ.get("HF_REGISTRY_REFRESH_SEC", "21600"))
HTTP_TIMEOUT = float(os.environ.get("HF_HTTP_TIMEOUT", "8.0"))

# Operating mode: "off", "public" or "auth". Anything else is treated as "off".
HF_MODE = os.environ.get("HF_MODE", "off").lower()
HF_MODE = HF_MODE if HF_MODE in ("off", "public", "auth") else "off"

# The token is only read in "auth" mode; without one the integration is
# switched off.
HF_TOKEN = os.environ.get("HF_TOKEN") if HF_MODE == "auth" else None
if HF_MODE == "auth" and not HF_TOKEN:
    HF_MODE = "off"

# Curated crypto datasets on the Hugging Face Hub, grouped by category.
# The category key is stored on each seeded registry entry.
CRYPTO_DATASETS = {
    # Price / market time series.
    "price": [
        "paperswithbacktest/Cryptocurrencies-Daily-Price",
        "linxy/CryptoCoin",
        "sebdg/crypto_data",
        "Farmaanaa/bitcoin_price_timeseries",
        "WinkingFace/CryptoLM-Bitcoin-BTC-USDT",
        "WinkingFace/CryptoLM-Ethereum-ETH-USDT",
        "WinkingFace/CryptoLM-Ripple-XRP-USDT",
    ],
    # News without labels.
    "news_raw": [
        "flowfree/crypto-news-headlines",
        "edaschau/bitcoin_news",
    ],
    # News that comes with labels.
    "news_labeled": [
        "SahandNZ/cryptonews-articles-with-price-momentum-labels",
        "tahamajs/bitcoin-individual-news-dataset",
        "tahamajs/bitcoin-enhanced-prediction-dataset-with-comprehensive-news",
        "tahamajs/bitcoin-prediction-dataset-with-local-news-summaries",
        "arad1367/Crypto_Semantic_News",
    ],
}

# Entries that are always present in the registry, even when the Hub is
# unreachable.
_SEED_MODELS = ["ElKulako/cryptobert", "kk08/CryptoBERT"]

# Flat list of every curated dataset id, in category order.
_SEED_DATASETS = []
for cat in CRYPTO_DATASETS.values():
    _SEED_DATASETS += cat

class HFRegistry:
    """In-memory catalogue of crypto-related Hugging Face models and datasets.

    Entries come from two places: the curated seed lists defined in this
    module and, when ``HF_MODE`` is ``"public"`` or ``"auth"``, search results
    from the Hugging Face Hub API. Both maps are keyed by the entry id.
    """

    def __init__(self):
        self.models: Dict[str, Dict[str, Any]] = {}
        self.datasets: Dict[str, Dict[str, Any]] = {}
        # Epoch seconds of the last completed refresh; 0.0 means "never".
        self.last_refresh = 0.0
        # Reason the most recent refresh failed, or None if it succeeded.
        self.fail_reason: Optional[str] = None

    async def _hf_json(self, url: str, params: Dict[str, Any]) -> Any:
        """GET ``url`` with ``params`` and return the decoded JSON body.

        A bearer token is attached only in ``"auth"`` mode. Transport errors
        and non-2xx responses (via ``raise_for_status``) propagate to the
        caller.
        """
        headers = {}
        if HF_MODE == "auth" and HF_TOKEN:
            headers["Authorization"] = f"Bearer {HF_TOKEN}"

        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT, headers=headers) as client:
            r = await client.get(url, params=params)
            r.raise_for_status()
            return r.json()

    @staticmethod
    def _guess_category(dataset_id: str, tags: List[str]) -> str:
        """Classify a Hub dataset from keywords in its tags and id."""
        tags_str = " ".join(tags).lower()
        name_lower = dataset_id.lower()
        if "price" in tags_str or "ohlc" in tags_str or "price" in name_lower:
            return "price"
        if "news" in tags_str or "news" in name_lower:
            if "label" in tags_str or "sentiment" in tags_str:
                return "news_labeled"
            return "news_raw"
        return "other"

    def _load_seeds(self) -> Dict[str, str]:
        """Add missing curated entries; return dataset id -> curated category."""
        for name in _SEED_MODELS:
            self.models.setdefault(
                name,
                {"id": name, "source": "seed", "pipeline_tag": "sentiment-analysis"},
            )

        curated: Dict[str, str] = {}
        for category, dataset_list in CRYPTO_DATASETS.items():
            for name in dataset_list:
                # First category wins if an id is listed twice, matching the
                # setdefault() below.
                curated.setdefault(name, category)
                self.datasets.setdefault(
                    name,
                    {"id": name, "source": "seed", "category": category, "tags": ["crypto", category]},
                )
        return curated

    async def _pull_from_hub(self, curated: Dict[str, str]) -> None:
        """Fetch crypto models and datasets from the Hub and store them.

        Hub results replace existing entries with the same id. ``curated``
        maps dataset ids to their hand-assigned category, which takes
        precedence over the keyword heuristic.
        """
        q_sent = {"pipeline_tag": "sentiment-analysis", "search": "crypto", "limit": 50}
        models = await self._hf_json(HF_API_MODELS, q_sent)
        for m in models or []:
            mid = m.get("modelId") or m.get("id") or m.get("name")
            if not mid:
                continue
            self.models[mid] = {
                "id": mid,
                "pipeline_tag": m.get("pipeline_tag"),
                "likes": m.get("likes"),
                "downloads": m.get("downloads"),
                "tags": m.get("tags") or [],
                "source": "hub",
            }

        q_crypto = {"search": "crypto", "limit": 100}
        datasets = await self._hf_json(HF_API_DATASETS, q_crypto)
        for d in datasets or []:
            did = d.get("id") or d.get("name")
            if not did:
                continue
            tags = d.get("tags") or []
            # Always run the heuristic so malformed tags still surface as an
            # error, then let the curated category override the guess.
            guessed = self._guess_category(did, tags)
            self.datasets[did] = {
                "id": did,
                "likes": d.get("likes"),
                "downloads": d.get("downloads"),
                "tags": tags,
                "category": curated.get(did, guessed),
                "source": "hub",
            }

    async def refresh(self) -> Dict[str, Any]:
        """Reload seeds and Hub search results into the registry.

        Returns:
            ``{"ok": bool, "models": int, "datasets": int}`` plus an
            ``"error"`` string when ``ok`` is False. Never raises for
            ordinary errors; failures are reported in the result and in
            ``self.fail_reason``.
        """
        if HF_MODE == "off":
            self.fail_reason = "HF_MODE=off"
            return {"ok": False, "error": "HF_MODE=off", "models": 0, "datasets": 0}

        # Each attempt starts clean; otherwise one past failure would mark
        # every later successful refresh (and health()) as failed forever.
        self.fail_reason = None
        try:
            curated = self._load_seeds()

            if HF_MODE in ("public", "auth"):
                try:
                    await self._pull_from_hub(curated)
                except Exception as e:
                    # A Hub failure is not fatal: seeds and previously
                    # fetched entries stay available.
                    error_msg = str(e)[:200]
                    if "401" in error_msg or "unauthorized" in error_msg.lower():
                        self.fail_reason = "Authentication failed"
                    else:
                        self.fail_reason = error_msg

            self.last_refresh = time.time()
            if self.fail_reason is None:
                return {"ok": True, "models": len(self.models), "datasets": len(self.datasets)}
            return {"ok": False, "error": self.fail_reason, "models": len(self.models), "datasets": len(self.datasets)}
        except Exception as e:
            self.fail_reason = str(e)[:200]
            return {"ok": False, "error": self.fail_reason, "models": len(self.models), "datasets": len(self.datasets)}

    def list(self, kind: Literal["models","datasets"]="models", category: Optional[str]=None) -> List[Dict[str, Any]]:
        """Return the stored entries of ``kind``.

        ``category`` filters datasets only; it is ignored for models.
        """
        if kind == "models":
            return list(self.models.values())
        items = list(self.datasets.values())
        if category and kind == "datasets":
            items = [d for d in items if d.get("category") == category]
        return items

    def health(self):
        """Return a status snapshot: freshness, last failure and entry counts."""
        age = time.time() - (self.last_refresh or 0)
        return {
            "ok": self.last_refresh > 0 and (self.fail_reason is None),
            "last_refresh_epoch": self.last_refresh,
            "age_sec": age,
            "fail_reason": self.fail_reason,
            "counts": {"models": len(self.models), "datasets": len(self.datasets)},
            "interval_sec": REFRESH_INTERVAL_SEC,
        }

# Module-level registry instance; periodic_refresh() below refreshes this object.
REGISTRY = HFRegistry()

async def periodic_refresh(loop_sleep: int = REFRESH_INTERVAL_SEC):
    """Refresh the module-level registry forever, pausing between rounds.

    The pause after the first refresh is a random 50-90% of ``loop_sleep``
    (truncated to whole seconds); every later pause is the full
    ``loop_sleep``. The loop has no exit condition of its own.
    """
    is_first_round = True
    while True:
        await REGISTRY.refresh()
        if is_first_round:
            is_first_round = False
            pause = int(loop_sleep * random.uniform(0.5, 0.9))
        else:
            pause = loop_sleep
        await _sleep(pause)

async def _sleep(sec: int):
    """Pause the current task for ``sec`` seconds.

    Cancellation is re-raised so a task waiting here can be stopped; a bare
    ``except`` would swallow ``asyncio.CancelledError`` and let the caller's
    loop keep running after ``cancel()``. Any other error from the sleep is
    ignored, keeping the pause best-effort.
    """
    import asyncio
    try:
        await asyncio.sleep(sec)
    except asyncio.CancelledError:
        raise
    except Exception:
        # Best-effort: a failed sleep must not break the caller.
        pass