DOoM-lb / src /leaderboard.py
Anonumous's picture
Refactor project structure and update project version
0d67035
"""
Leaderboard data management for DeathMath benchmark.
Handles downloading, parsing, and aggregating model evaluation results.
"""
import json
import logging
import os
import time
from collections.abc import Callable
from io import BytesIO
from typing import Any
import pandas as pd
from huggingface_hub import hf_hub_download, snapshot_download
from src.config import API, DEFAULT_SYSTEM_PROMPT, H4_TOKEN, RESULTS_PATH, RESULTS_REPO
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def time_diff_wrapper(func: Callable) -> Callable:
"""Decorator to measure function execution time."""
def wrapper(*args: Any, **kwargs: Any) -> Any:
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
diff = end_time - start_time
logging.info("Time taken for %s: %s seconds", func.__name__, diff)
return result
return wrapper
@time_diff_wrapper
def download_dataset(
repo_id: str, local_dir: str, repo_type: str = "dataset", max_attempts: int = 3, backoff_factor: float = 1.5
) -> None:
"""Download dataset with exponential backoff retries."""
os.makedirs(local_dir, exist_ok=True)
attempt = 0
while attempt < max_attempts:
try:
logging.info("Downloading %s to %s", repo_id, local_dir)
snapshot_download(
repo_id=repo_id,
local_dir=local_dir,
repo_type=repo_type,
tqdm_class=None,
token=H4_TOKEN,
etag_timeout=30,
max_workers=8,
force_download=True,
local_dir_use_symlinks=False,
)
logging.info("Download successful")
return
except Exception as e:
wait_time = backoff_factor**attempt
logging.error("Error downloading %s: %s, retrying in %ss", repo_id, e, wait_time)
time.sleep(wait_time)
attempt += 1
logging.error("Failed to download %s after %s attempts", repo_id, max_attempts)
def create_safe_filename(model_name: str) -> str:
"""
Create safe filename from model name.
Args:
model_name: Full model name (e.g., "username/model-name" or "model-name")
Returns:
Safe filename (e.g., "username_model-name.json" or "model-name.json")
"""
parts = model_name.split("/")
if len(parts) >= 2:
username = parts[0]
modelname = "_".join(parts[1:])
safe_name = f"{username}_{modelname}"
else:
safe_name = model_name
safe_name = safe_name.replace("/", "_").replace(" ", "_")
return f"{safe_name}.json"
def generate_individual_files_from_leaderboard() -> None:
"""
Generate individual model files from leaderboard.json backup.
Only creates missing files, doesn't overwrite existing ones.
Uploads new files to RESULTS_REPO.
"""
try:
logging.info("Checking for leaderboard.json in RESULTS_REPO")
leaderboard_path = hf_hub_download(
repo_id=RESULTS_REPO,
filename="leaderboard.json",
repo_type="dataset",
token=H4_TOKEN,
)
with open(leaderboard_path, encoding="utf-8") as f:
leaderboard_data = json.load(f)
if not leaderboard_data:
logging.info("leaderboard.json is empty, skipping generation")
return
logging.info(f"Found leaderboard.json with {len(leaderboard_data)} models")
model_data_dir = "./m_data/model_data/"
os.makedirs(model_data_dir, exist_ok=True)
existing_files = set(os.listdir(model_data_dir))
logging.info(f"Existing files in model_data/: {len(existing_files)}")
created_count = 0
skipped_count = 0
error_count = 0
for entry in leaderboard_data:
try:
model_name = entry.get("model_name") or entry.get("model")
if not model_name:
logging.warning(f"Skipping entry without model_name: {entry}")
error_count += 1
continue
safe_filename = create_safe_filename(model_name)
if safe_filename in existing_files:
skipped_count += 1
continue
model_data = {
"model_name": model_name,
"score": float(entry.get("score", 0.0)),
"math_score": float(entry.get("math_score", 0.0)),
"physics_score": float(entry.get("physics_score", 0.0)),
"total_tokens": int(entry.get("total_tokens", 0)),
"evaluation_time": float(entry.get("evaluation_time", 0.0)),
"system_prompt": entry.get("system_prompt", DEFAULT_SYSTEM_PROMPT),
}
local_path = os.path.join(model_data_dir, safe_filename)
with open(local_path, "w", encoding="utf-8") as f:
json.dump(model_data, f, ensure_ascii=False, indent=2)
buf = BytesIO()
buf.write(json.dumps(model_data, ensure_ascii=False).encode("utf-8"))
API.upload_file(
path_or_fileobj=buf.getvalue(),
path_in_repo=f"model_data/{safe_filename}",
repo_id=RESULTS_REPO,
repo_type="dataset",
)
logging.info(f"Created: {safe_filename}")
created_count += 1
except Exception as e:
logging.error(f"Failed to process entry {entry.get('model_name', 'unknown')}: {e}")
error_count += 1
continue
logging.info(
f"Generation complete: {created_count} files created, {skipped_count} skipped, {error_count} errors"
)
except FileNotFoundError:
logging.warning("leaderboard.json not found in RESULTS_REPO, skipping generation")
except Exception as e:
logging.error(f"Failed to generate files from leaderboard.json: {e}")
def download_results() -> None:
"""Download model evaluation results from HuggingFace RESULTS_REPO."""
try:
download_dataset(RESULTS_REPO, RESULTS_PATH)
logging.info("Successfully downloaded model evaluation results")
generate_individual_files_from_leaderboard()
except Exception as e:
logging.error(f"Failed to download model evaluation results: {e}")
def build_leaderboard_df() -> pd.DataFrame:
"""
Build leaderboard dataframe from RESULTS_REPO.
Single source of truth: individual model files in m_data/model_data/
Ensures only one entry per model (with highest score).
Returns:
DataFrame with columns: model, score, math_score, physics_score, total_tokens, evaluation_time, system_prompt
"""
best_model_results: dict[str, dict[str, Any]] = {}
try:
model_data_dir = "./m_data/model_data/"
if os.path.exists(model_data_dir):
for file in os.listdir(model_data_dir):
if file.endswith(".json"):
try:
with open(os.path.join(model_data_dir, file), encoding="utf-8") as f:
data = json.load(f)
model_name = data.get("model_name", data.get("model", ""))
if not model_name:
logging.error(f"Failed to parse {file}: 'model_name' not found")
continue
model_data = {
"model": model_name,
"score": float(data.get("score", 0.0)),
"math_score": float(data.get("math_score", 0.0)),
"physics_score": float(data.get("physics_score", 0.0)),
"total_tokens": int(data.get("total_tokens", 0)),
"evaluation_time": float(data.get("evaluation_time", 0.0)),
"system_prompt": data.get("system_prompt", DEFAULT_SYSTEM_PROMPT),
}
model_base_name = model_name.split("/")[-1].split("_v")[0]
if model_base_name in best_model_results:
if model_data["score"] > best_model_results[model_base_name]["score"]:
best_model_results[model_base_name] = model_data
else:
best_model_results[model_base_name] = model_data
except Exception as e:
logging.error(f"Failed to parse {file}: {str(e)}")
continue
except Exception as e:
logging.error(f"Failed to process external model data: {e}")
results = list(best_model_results.values())
if not results:
results = [
{
"model": "example/model-1",
"score": 0.7,
"math_score": 0.8,
"physics_score": 0.6,
"total_tokens": 1000000,
"evaluation_time": 3600.0,
"system_prompt": DEFAULT_SYSTEM_PROMPT,
},
{
"model": "example/model-2",
"score": 0.6,
"math_score": 0.7,
"physics_score": 0.5,
"total_tokens": 800000,
"evaluation_time": 3000.0,
"system_prompt": DEFAULT_SYSTEM_PROMPT,
},
]
logging.warning("No model data found, using example models")
df = pd.DataFrame(results)
df.sort_values(by="score", ascending=False, inplace=True)
numeric_cols = df.select_dtypes(include=["number"]).columns
if not numeric_cols.empty:
df[numeric_cols] = df[numeric_cols].round(3)
return df