SWE-Issue / msr.py
zhimin-z
fix
9d8b26b
import json
import os
import time
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.errors import HfHubHTTPError
from dotenv import load_dotenv
import duckdb
import backoff
import requests
import requests.exceptions
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import logging
import traceback
import subprocess
import re
# Load environment variables
load_dotenv()
# =============================================================================
# CONFIGURATION
# =============================================================================
# Get script directory for relative paths
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.dirname(SCRIPT_DIR) # Parent directory
AGENTS_REPO = "SWE-Arena/bot_data"
AGENTS_REPO_LOCAL_PATH = os.path.join(BASE_DIR, "bot_data") # Local git clone path
DUCKDB_CACHE_FILE = os.path.join(SCRIPT_DIR, "cache.duckdb")
GHARCHIVE_DATA_LOCAL_PATH = os.path.join(BASE_DIR, "gharchive/data")
LEADERBOARD_FILENAME = f"{os.getenv('COMPOSE_PROJECT_NAME')}.json"
LEADERBOARD_REPO = "SWE-Arena/leaderboard_data"
LEADERBOARD_TIME_FRAME_DAYS = 180
LONGSTANDING_GAP_DAYS = 30 # Minimum days for an issue to be considered long-standing
# GitHub organizations and repositories to track for wanted issues
TRACKED_ORGS = [
"apache",
"github",
"huggingface",
]
# Labels that indicate "patch wanted" status
PATCH_WANTED_LABELS = [
"bug",
"enhancement",
]
# Git sync configuration (mandatory to get latest bot data)
GIT_SYNC_TIMEOUT = 300 # 5 minutes timeout for git pull
# Streaming batch configuration
BATCH_SIZE_DAYS = 1 # Process 1 day at a time (~24 hourly files)
# Download configuration
DOWNLOAD_WORKERS = 4
DOWNLOAD_RETRY_DELAY = 2
MAX_RETRIES = 5
# Upload configuration
UPLOAD_DELAY_SECONDS = 5
UPLOAD_MAX_BACKOFF = 3600
# Scheduler configuration
SCHEDULE_ENABLED = False
SCHEDULE_DAY_OF_WEEK = 'fri' # Friday
SCHEDULE_HOUR = 0
SCHEDULE_MINUTE = 0
SCHEDULE_TIMEZONE = 'UTC'
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def load_jsonl(filename):
"""Load JSONL file and return list of dictionaries."""
if not os.path.exists(filename):
return []
data = []
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
try:
data.append(json.loads(line))
except json.JSONDecodeError as e:
print(f"Warning: Skipping invalid JSON line: {e}")
return data
def save_jsonl(filename, data):
"""Save list of dictionaries to JSONL file."""
with open(filename, 'w', encoding='utf-8') as f:
for item in data:
f.write(json.dumps(item) + '\n')
def normalize_date_format(date_string):
"""Convert date strings or datetime objects to standardized ISO 8601 format with Z suffix."""
if not date_string or date_string == 'N/A':
return 'N/A'
try:
if isinstance(date_string, datetime):
return date_string.strftime('%Y-%m-%dT%H:%M:%SZ')
date_string = re.sub(r'\\s+', ' ', date_string.strip())
date_string = date_string.replace(' ', 'T')
if len(date_string) >= 3:
if date_string[-3:-2] in ('+', '-') and ':' not in date_string[-3:]:
date_string = date_string + ':00'
dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
except Exception as e:
print(f"Warning: Could not parse date '{date_string}': {e}")
return date_string
def get_hf_token():
"""Get HuggingFace token from environment variables."""
token = os.getenv('HF_TOKEN')
if not token:
print("Warning: HF_TOKEN not found in environment variables")
return token
# =============================================================================
# GHARCHIVE DOWNLOAD FUNCTIONS
# =============================================================================
def download_file(url):
"""Download a GHArchive file with retry logic."""
filename = url.split("/")[-1]
filepath = os.path.join(GHARCHIVE_DATA_LOCAL_PATH, filename)
if os.path.exists(filepath):
return True
for attempt in range(MAX_RETRIES):
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
with open(filepath, "wb") as f:
f.write(response.content)
return True
except requests.exceptions.HTTPError as e:
# 404 means the file doesn't exist in GHArchive - skip without retry
if e.response.status_code == 404:
if attempt == 0: # Only log once, not for each retry
print(f" ⚠ {filename}: Not available (404) - skipping")
return False
# Other HTTP errors (5xx, etc.) should be retried
wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
print(f" ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
time.sleep(wait_time)
except Exception as e:
# Network errors, timeouts, etc. should be retried
wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
print(f" ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
time.sleep(wait_time)
return False
def download_all_gharchive_data():
"""Download all GHArchive data files for the last LEADERBOARD_TIME_FRAME_DAYS."""
os.makedirs(GHARCHIVE_DATA_LOCAL_PATH, exist_ok=True)
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
urls = []
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
for hour in range(24):
url = f"https://data.gharchive.org/{date_str}-{hour}.json.gz"
urls.append(url)
current_date += timedelta(days=1)
downloads_processed = 0
try:
with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as executor:
futures = [executor.submit(download_file, url) for url in urls]
for future in as_completed(futures):
downloads_processed += 1
print(f" Download complete: {downloads_processed} files")
return True
except Exception as e:
print(f"Error during download: {str(e)}")
traceback.print_exc()
return False
# =============================================================================
# HUGGINGFACE API WRAPPERS
# =============================================================================
def is_retryable_error(e):
"""Check if exception is retryable (rate limit or timeout error)."""
if isinstance(e, HfHubHTTPError):
if e.response.status_code == 429:
return True
if isinstance(e, (requests.exceptions.Timeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectTimeout)):
return True
if isinstance(e, Exception):
error_str = str(e).lower()
if 'timeout' in error_str or 'timed out' in error_str:
return True
return False
@backoff.on_exception(
backoff.expo,
(HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
max_tries=MAX_RETRIES,
base=300,
max_value=3600,
giveup=lambda e: not is_retryable_error(e),
on_backoff=lambda details: print(
f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..."
)
)
def list_repo_files_with_backoff(api, **kwargs):
"""Wrapper for api.list_repo_files() with exponential backoff."""
return api.list_repo_files(**kwargs)
@backoff.on_exception(
backoff.expo,
(HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
max_tries=MAX_RETRIES,
base=300,
max_value=3600,
giveup=lambda e: not is_retryable_error(e),
on_backoff=lambda details: print(
f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..."
)
)
def hf_hub_download_with_backoff(**kwargs):
"""Wrapper for hf_hub_download() with exponential backoff."""
return hf_hub_download(**kwargs)
@backoff.on_exception(
backoff.expo,
(HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
max_tries=MAX_RETRIES,
base=300,
max_value=3600,
giveup=lambda e: not is_retryable_error(e),
on_backoff=lambda details: print(
f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..."
)
)
def upload_file_with_backoff(api, **kwargs):
"""Wrapper for api.upload_file() with exponential backoff."""
return api.upload_file(**kwargs)
@backoff.on_exception(
backoff.expo,
(HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
max_tries=MAX_RETRIES,
base=300,
max_value=3600,
giveup=lambda e: not is_retryable_error(e),
on_backoff=lambda details: print(
f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..."
)
)
def upload_folder_with_backoff(api, **kwargs):
"""Wrapper for api.upload_folder() with exponential backoff."""
return api.upload_folder(**kwargs)
def get_duckdb_connection():
"""
Initialize DuckDB connection with OPTIMIZED memory settings.
Uses persistent database and reduced memory footprint.
Automatically removes cache file if lock conflict is detected.
"""
try:
conn = duckdb.connect(DUCKDB_CACHE_FILE)
except Exception as e:
# Check if it's a locking error
error_msg = str(e)
if "lock" in error_msg.lower() or "conflicting" in error_msg.lower():
print(f" ⚠ Lock conflict detected, removing {DUCKDB_CACHE_FILE}...")
if os.path.exists(DUCKDB_CACHE_FILE):
os.remove(DUCKDB_CACHE_FILE)
print(f" ✓ Cache file removed, retrying connection...")
# Retry connection after removing cache
conn = duckdb.connect(DUCKDB_CACHE_FILE)
else:
# Re-raise if it's not a locking error
raise
# CORE MEMORY & THREADING SETTINGS
conn.execute(f"SET threads TO 6;")
conn.execute(f"SET max_memory = '50GB';")
conn.execute("SET temp_directory = '/tmp/duckdb_temp';")
# PERFORMANCE OPTIMIZATIONS
conn.execute("SET preserve_insertion_order = false;") # Disable expensive ordering
conn.execute("SET enable_object_cache = true;") # Cache repeatedly read files
return conn
def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_LOCAL_PATH):
"""Generate file path patterns for GHArchive data in date range (only existing files)."""
file_patterns = []
missing_dates = set()
current_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
end_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
while current_date <= end_day:
date_has_files = False
for hour in range(24):
pattern = os.path.join(data_dir, f"{current_date.strftime('%Y-%m-%d')}-{hour}.json.gz")
if os.path.exists(pattern):
file_patterns.append(pattern)
date_has_files = True
if not date_has_files:
missing_dates.add(current_date.strftime('%Y-%m-%d'))
current_date += timedelta(days=1)
if missing_dates:
print(f" ⚠ Skipping {len(missing_dates)} date(s) with no data")
return file_patterns
# =============================================================================
# STREAMING BATCH PROCESSING - UNIFIED QUERY FOR ALL METADATA
# =============================================================================
def fetch_all_metadata_streaming(conn, identifiers, start_date, end_date):
"""
QUERY: Fetch both issue and discussion metadata using streaming batch processing:
- IssuesEvent, IssueCommentEvent (for assistant-assigned issues AND wanted issues)
- PullRequestEvent (for wanted issue tracking)
- DiscussionEvent (for discussion tracking)
Args:
conn: DuckDB connection instance
identifiers: List of GitHub usernames/bot identifiers
start_date: Start datetime (timezone-aware)
end_date: End datetime (timezone-aware)
Returns:
Dictionary with four keys:
- 'agent_issues': {agent_id: [issue_metadata]} for assistant-assigned issues
- 'wanted_open': [open_wanted_issues] for long-standing open issues
- 'wanted_resolved': {agent_id: [resolved_wanted]} for resolved wanted issues
- 'agent_discussions': {agent_id: [discussion_metadata]} for assistant discussions
"""
identifier_set = set(identifiers)
identifier_list = ', '.join([f"'{id}'" for id in identifiers])
tracked_orgs_list = ', '.join([f"'{org}'" for org in TRACKED_ORGS])
# Storage for assistant-assigned issues
agent_issues = defaultdict(list) # agent_id -> [issue_metadata]
agent_issue_urls = defaultdict(set) # agent_id -> set of issue URLs (for deduplication)
# Storage for wanted issues
all_issues = {} # issue_url -> issue_metadata
issue_to_prs = defaultdict(set) # issue_url -> set of PR URLs
pr_creators = {} # pr_url -> creator login
pr_merged_at = {} # pr_url -> merged_at timestamp
# Storage for discussions
discussions_by_agent = defaultdict(list)
# Calculate total batches
total_days = (end_date - start_date).days
total_batches = (total_days // BATCH_SIZE_DAYS) + 1
# Process in batches
current_date = start_date
batch_num = 0
print(f" Streaming {total_batches} batches of {BATCH_SIZE_DAYS}-day intervals...")
while current_date <= end_date:
batch_num += 1
batch_end = min(current_date + timedelta(days=BATCH_SIZE_DAYS - 1), end_date)
# Get file patterns for THIS BATCH ONLY
file_patterns = generate_file_path_patterns(current_date, batch_end)
if not file_patterns:
print(f" Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} - NO DATA")
current_date = batch_end + timedelta(days=1)
continue
# Progress indicator
print(f" Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} ({len(file_patterns)} files)... ", end="", flush=True)
# Build file patterns SQL for THIS BATCH
file_patterns_sql = '[' + ', '.join([f"'{fp}'" for fp in file_patterns]) + ']'
try:
# UNIFIED QUERY: Optimized for gzip decompression
unified_query = f"""
SELECT
type,
json_extract_string(repo, '$.name') as repo_name,
json_extract_string(repo, '$.url') as repo_url,
-- Issue fields
json_extract_string(payload, '$.issue.html_url') as issue_url,
json_extract_string(payload, '$.issue.title') as issue_title,
json_extract_string(payload, '$.issue.number') as issue_number,
json_extract_string(payload, '$.issue.created_at') as issue_created_at,
json_extract_string(payload, '$.issue.closed_at') as issue_closed_at,
json_extract(payload, '$.issue.labels') as issue_labels,
json_extract_string(payload, '$.issue.pull_request') as is_pull_request,
json_extract_string(payload, '$.issue.state_reason') as issue_state_reason,
-- Actor/assignee fields for assistant assignment
json_extract_string(payload, '$.issue.user.login') as issue_creator,
json_extract_string(payload, '$.issue.assignee.login') as issue_assignee,
json_extract(payload, '$.issue.assignees') as issue_assignees,
json_extract_string(payload, '$.comment.user.login') as commenter,
-- PR fields - simplified with COALESCE
COALESCE(
json_extract_string(payload, '$.issue.html_url'),
json_extract_string(payload, '$.pull_request.html_url')
) as pr_url,
COALESCE(
json_extract_string(payload, '$.issue.user.login'),
json_extract_string(payload, '$.pull_request.user.login')
) as pr_creator,
COALESCE(
json_extract_string(payload, '$.issue.pull_request.merged_at'),
json_extract_string(payload, '$.pull_request.merged_at')
) as pr_merged_at,
COALESCE(
json_extract_string(payload, '$.issue.body'),
json_extract_string(payload, '$.pull_request.body')
) as pr_body,
-- Discussion fields
json_extract_string(payload, '$.discussion.html_url') as discussion_url,
json_extract_string(payload, '$.discussion.user.login') as discussion_creator,
json_extract_string(payload, '$.discussion.created_at') as discussion_created_at,
json_extract_string(payload, '$.discussion.answer_chosen_at') as discussion_closed_at,
json_extract_string(payload, '$.discussion.state_reason') as discussion_state_reason,
json_extract_string(payload, '$.action') as action
FROM read_json(
{file_patterns_sql},
union_by_name=true,
filename=true,
compression='gzip',
format='newline_delimited',
ignore_errors=true
)
WHERE
type IN ('IssuesEvent', 'IssueCommentEvent', 'PullRequestEvent', 'DiscussionEvent')
AND (
-- Assistant-assigned issues: assistant is creator, assignee, or commenter
(type = 'IssuesEvent' AND (
json_extract_string(payload, '$.issue.user.login') IN ({identifier_list})
OR json_extract_string(payload, '$.issue.assignee.login') IN ({identifier_list})
OR SPLIT_PART(json_extract_string(repo, '$.name'), '/', 1) IN ({tracked_orgs_list})
))
-- Issue comments: assistant is commenter OR tracked org
OR (type = 'IssueCommentEvent' AND (
json_extract_string(payload, '$.comment.user.login') IN ({identifier_list})
OR SPLIT_PART(json_extract_string(repo, '$.name'), '/', 1) IN ({tracked_orgs_list})
))
-- PRs: assistant is creator OR tracked org (for wanted issue tracking)
OR (type = 'PullRequestEvent' AND (
json_extract_string(payload, '$.pull_request.user.login') IN ({identifier_list})
OR SPLIT_PART(json_extract_string(repo, '$.name'), '/', 1) IN ({tracked_orgs_list})
))
-- Discussions: assistant is creator AND tracked org
OR (type = 'DiscussionEvent'
AND json_extract_string(payload, '$.discussion.user.login') IN ({identifier_list})
AND SPLIT_PART(json_extract_string(repo, '$.name'), '/', 1) IN ({tracked_orgs_list})
)
)
"""
all_results = conn.execute(unified_query).fetchall()
# Post-process results to separate into different categories
# Row structure: [type, repo_name, repo_url, issue_url, issue_title, issue_number,
# issue_created_at, issue_closed_at, issue_labels, is_pull_request,
# issue_state_reason, issue_creator, issue_assignee, issue_assignees,
# commenter, pr_url, pr_creator, pr_merged_at, pr_body,
# discussion_url, discussion_creator, discussion_created_at,
# discussion_closed_at, discussion_state_reason, action]
issue_events = [] # For wanted tracking
pr_events = [] # For wanted tracking
discussion_events = [] # For discussion tracking
agent_issue_events = [] # For assistant-assigned issues
for row in all_results:
event_type = row[0]
is_pr = row[9] # is_pull_request field
if event_type in ('IssuesEvent', 'IssueCommentEvent'):
if not is_pr: # It's an issue, not a PR
# Check if this is an assistant-assigned issue
issue_creator = row[11]
issue_assignee = row[12]
issue_assignees_json = row[13]
commenter = row[14]
agent_identifier = None
if event_type == 'IssuesEvent':
# Check if issue creator, assignee, or any assignees match our identifiers
if issue_creator in identifier_set:
agent_identifier = issue_creator
elif issue_assignee in identifier_set:
agent_identifier = issue_assignee
else:
# Check assignees array
try:
if issue_assignees_json:
if isinstance(issue_assignees_json, str):
assignees_data = json.loads(issue_assignees_json)
else:
assignees_data = issue_assignees_json
if isinstance(assignees_data, list):
for assignee_obj in assignees_data:
if isinstance(assignee_obj, dict):
assignee_login = assignee_obj.get('login')
if assignee_login in identifier_set:
agent_identifier = assignee_login
break
except (json.JSONDecodeError, TypeError):
pass
elif event_type == 'IssueCommentEvent':
# Check if commenter is an assistant
if commenter in identifier_set:
agent_identifier = commenter
# Add to appropriate list
if agent_identifier:
agent_issue_events.append((row, agent_identifier))
# Always add to issue_events for wanted tracking (if from tracked orgs)
issue_events.append(row)
else:
# It's a PR
pr_events.append(row)
elif event_type == 'PullRequestEvent':
pr_events.append(row)
elif event_type == 'DiscussionEvent':
discussion_events.append(row)
# Process assistant-assigned issues
for row, agent_identifier in agent_issue_events:
# Row indices: repo_url=2, issue_url=3, issue_created_at=6, issue_closed_at=7, issue_state_reason=10
repo_url = row[2]
issue_url = row[3]
created_at = row[6]
closed_at = row[7]
state_reason = row[10]
if not issue_url or not agent_identifier:
continue
# Build full URL from repo_url if needed
if repo_url and '/issues/' not in issue_url:
issue_number = row[5]
full_url = f"{repo_url.replace('api.github.com/repos/', 'github.com/')}/issues/{issue_number}"
else:
full_url = issue_url
# Only include issues created within timeframe
if created_at:
try:
created_dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
if created_dt < start_date:
continue
except:
continue
# Deduplicate: only add if we haven't seen this issue for this assistant
if full_url in agent_issue_urls[agent_identifier]:
continue
agent_issue_urls[agent_identifier].add(full_url)
issue_metadata = {
'url': full_url,
'created_at': normalize_date_format(created_at),
'closed_at': normalize_date_format(closed_at) if closed_at else None,
'state_reason': state_reason,
}
agent_issues[agent_identifier].append(issue_metadata)
# Process issues for wanted tracking
for row in issue_events:
# Row indices: repo_name=1, issue_url=3, issue_title=4, issue_number=5,
# issue_created_at=6, issue_closed_at=7, issue_labels=8
repo_name = row[1]
issue_url = row[3]
title = row[4]
issue_number = row[5]
created_at = row[6]
closed_at = row[7]
labels_json = row[8]
if not issue_url or not repo_name:
continue
# Extract org from repo_name
parts = repo_name.split('/')
if len(parts) != 2:
continue
org = parts[0]
# Filter by tracked orgs
if org not in TRACKED_ORGS:
continue
# Parse labels
try:
if isinstance(labels_json, str):
labels_data = json.loads(labels_json)
else:
labels_data = labels_json
if not isinstance(labels_data, list):
label_names = []
else:
label_names = [label.get('name', '').lower() for label in labels_data if isinstance(label, dict)]
except (json.JSONDecodeError, TypeError):
label_names = []
# Determine state
normalized_closed_at = normalize_date_format(closed_at) if closed_at else None
state = 'closed' if (normalized_closed_at and normalized_closed_at != 'N/A') else 'open'
# Store issue metadata
all_issues[issue_url] = {
'url': issue_url,
'repo': repo_name,
'title': title,
'number': issue_number,
'state': state,
'created_at': normalize_date_format(created_at),
'closed_at': normalized_closed_at,
'labels': label_names
}
# Process PRs for wanted tracking
for row in pr_events:
# Row indices: pr_url=15, pr_creator=16, pr_merged_at=17, pr_body=18
pr_url = row[15]
pr_creator = row[16]
merged_at = row[17]
pr_body = row[18]
if not pr_url or not pr_creator:
continue
pr_creators[pr_url] = pr_creator
pr_merged_at[pr_url] = merged_at
# Extract linked issues from PR body
if pr_body:
# Match issue URLs or #number references
issue_refs = re.findall(r'(?:https?://github\.com/[\w-]+/[\w-]+/issues/\d+)|(?:#\d+)', pr_body, re.IGNORECASE)
for ref in issue_refs:
# Convert #number to full URL if needed
if ref.startswith('#'):
# Extract org/repo from PR URL
pr_parts = pr_url.split('/')
if len(pr_parts) >= 5:
org = pr_parts[-4]
repo = pr_parts[-3]
issue_num = ref[1:]
issue_url = f"https://github.com/{org}/{repo}/issues/{issue_num}"
issue_to_prs[issue_url].add(pr_url)
else:
issue_to_prs[ref].add(pr_url)
# Process discussions
for row in discussion_events:
# Row indices: repo_name=1, discussion_url=19, discussion_creator=20,
# discussion_created_at=21, discussion_closed_at=22,
# discussion_state_reason=23, action=24
repo_name = row[1]
discussion_url = row[19]
discussion_creator = row[20]
discussion_created_at = row[21]
discussion_closed_at = row[22]
discussion_state_reason = row[23]
action = row[24]
if not discussion_url or not repo_name:
continue
# Extract org from repo_name
parts = repo_name.split('/')
if len(parts) != 2:
continue
org = parts[0]
# Filter by tracked orgs
if org not in TRACKED_ORGS:
continue
# Parse discussion creation date to filter by time window
created_dt = None
if discussion_created_at:
try:
created_dt = datetime.fromisoformat(discussion_created_at.replace('Z', '+00:00'))
# Only track discussions created on or after start_date
if created_dt < start_date:
continue
except:
continue
# Group by creator (assistant identifier)
# Only track discussions from our assistant identifiers
if discussion_creator not in identifier_set:
continue
# Determine discussion state
# A discussion is "resolved" if it has an answer chosen OR is marked answered
is_resolved = False
if discussion_closed_at:
is_resolved = True
elif discussion_state_reason and 'answered' in discussion_state_reason.lower():
is_resolved = True
# Store discussion metadata
discussion_meta = {
'url': discussion_url,
'repo': repo_name,
'created_at': normalize_date_format(discussion_created_at),
'closed_at': normalize_date_format(discussion_closed_at) if discussion_closed_at else None,
'state': 'resolved' if is_resolved else 'open',
'state_reason': discussion_state_reason
}
# Group by assistant
if discussion_creator not in discussions_by_agent:
discussions_by_agent[discussion_creator] = []
discussions_by_agent[discussion_creator].append(discussion_meta)
print(f"✓ {len(agent_issue_events)} assistant issues, {len(issue_events)} wanted issues, {len(pr_events)} PRs, {len(discussion_events)} discussions")
except Exception as e:
print(f"\n ✗ Batch {batch_num} error: {str(e)}")
traceback.print_exc()
# Move to next batch
current_date = batch_end + timedelta(days=1)
# Post-processing: Filter issues and assign to assistants
print(f"\n Post-processing {len(all_issues)} wanted issues...")
wanted_open = []
wanted_resolved = defaultdict(list)
current_time = datetime.now(timezone.utc)
for issue_url, issue_meta in all_issues.items():
# Check if issue has linked PRs
linked_prs = issue_to_prs.get(issue_url, set())
if not linked_prs:
continue
# Check if any linked PR was merged AND created by an assistant
resolved_by = None
for pr_url in linked_prs:
merged_at = pr_merged_at.get(pr_url)
if merged_at: # PR was merged
pr_creator = pr_creators.get(pr_url)
if pr_creator in identifier_set:
resolved_by = pr_creator
break
if not resolved_by:
continue
# Process based on issue state
if issue_meta['state'] == 'open':
# For open issues: check if labels match PATCH_WANTED_LABELS
issue_labels = issue_meta.get('labels', [])
has_patch_label = False
for issue_label in issue_labels:
for wanted_label in PATCH_WANTED_LABELS:
if wanted_label.lower() in issue_label:
has_patch_label = True
break
if has_patch_label:
break
if not has_patch_label:
continue
# Check if long-standing
created_at_str = issue_meta.get('created_at')
if created_at_str and created_at_str != 'N/A':
try:
created_dt = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
days_open = (current_time - created_dt).days
if days_open >= LONGSTANDING_GAP_DAYS:
wanted_open.append(issue_meta)
except:
pass
elif issue_meta['state'] == 'closed':
# For closed issues: must be closed within time frame AND open 30+ days
closed_at_str = issue_meta.get('closed_at')
created_at_str = issue_meta.get('created_at')
if closed_at_str and closed_at_str != 'N/A' and created_at_str and created_at_str != 'N/A':
try:
closed_dt = datetime.fromisoformat(closed_at_str.replace('Z', '+00:00'))
created_dt = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
# Calculate how long the issue was open
days_open = (closed_dt - created_dt).days
# Only include if closed within timeframe AND was open 30+ days
if start_date <= closed_dt <= end_date and days_open >= LONGSTANDING_GAP_DAYS:
wanted_resolved[resolved_by].append(issue_meta)
except:
pass
print(f" ✓ Found {sum(len(issues) for issues in agent_issues.values())} assistant-assigned issues across {len(agent_issues)} assistants")
print(f" ✓ Found {len(wanted_open)} long-standing open wanted issues")
print(f" ✓ Found {sum(len(issues) for issues in wanted_resolved.values())} resolved wanted issues across {len(wanted_resolved)} assistants")
print(f" ✓ Found {sum(len(discussions) for discussions in discussions_by_agent.values())} discussions across {len(discussions_by_agent)} assistants")
return {
'agent_issues': dict(agent_issues),
'wanted_open': wanted_open,
'wanted_resolved': dict(wanted_resolved),
'agent_discussions': dict(discussions_by_agent)
}
def sync_agents_repo():
"""
Sync local bot_data repository with remote using git pull.
This is MANDATORY to ensure we have the latest bot data.
Raises exception if sync fails.
"""
if not os.path.exists(AGENTS_REPO_LOCAL_PATH):
error_msg = f"Local repository not found at {AGENTS_REPO_LOCAL_PATH}"
print(f" ✗ {error_msg}")
print(f" Please clone it first: git clone https://huggingface.co/datasets/{AGENTS_REPO}")
raise FileNotFoundError(error_msg)
if not os.path.exists(os.path.join(AGENTS_REPO_LOCAL_PATH, '.git')):
error_msg = f"{AGENTS_REPO_LOCAL_PATH} exists but is not a git repository"
print(f" ✗ {error_msg}")
raise ValueError(error_msg)
try:
# Run git pull with extended timeout due to large repository
result = subprocess.run(
['git', 'pull'],
cwd=AGENTS_REPO_LOCAL_PATH,
capture_output=True,
text=True,
timeout=GIT_SYNC_TIMEOUT
)
if result.returncode == 0:
output = result.stdout.strip()
if "Already up to date" in output or "Already up-to-date" in output:
print(f" ✓ Repository is up to date")
else:
print(f" ✓ Repository synced successfully")
if output:
# Print first few lines of output
lines = output.split('\n')[:5]
for line in lines:
print(f" {line}")
return True
else:
error_msg = f"Git pull failed: {result.stderr.strip()}"
print(f" ✗ {error_msg}")
raise RuntimeError(error_msg)
except subprocess.TimeoutExpired:
error_msg = f"Git pull timed out after {GIT_SYNC_TIMEOUT} seconds"
print(f" ✗ {error_msg}")
raise TimeoutError(error_msg)
except (FileNotFoundError, ValueError, RuntimeError, TimeoutError):
raise # Re-raise expected exceptions
except Exception as e:
error_msg = f"Error syncing repository: {str(e)}"
print(f" ✗ {error_msg}")
raise RuntimeError(error_msg) from e
def load_agents_from_hf():
"""
Load all assistant metadata JSON files from local git repository.
ALWAYS syncs with remote first to ensure we have the latest bot data.
"""
# MANDATORY: Sync with remote first to get latest bot data
print(f" Syncing bot_data repository to get latest assistants...")
sync_agents_repo() # Will raise exception if sync fails
assistants = []
# Scan local directory for JSON files
if not os.path.exists(AGENTS_REPO_LOCAL_PATH):
raise FileNotFoundError(f"Local repository not found at {AGENTS_REPO_LOCAL_PATH}")
# Walk through the directory to find all JSON files
files_processed = 0
print(f" Loading assistant metadata from {AGENTS_REPO_LOCAL_PATH}...")
for root, dirs, files in os.walk(AGENTS_REPO_LOCAL_PATH):
# Skip .git directory
if '.git' in root:
continue
for filename in files:
if not filename.endswith('.json'):
continue
files_processed += 1
file_path = os.path.join(root, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
agent_data = json.load(f)
# Only include active assistants
if agent_data.get('status') != 'active':
continue
# Extract github_identifier from filename
github_identifier = filename.replace('.json', '')
agent_data['github_identifier'] = github_identifier
assistants.append(agent_data)
except Exception as e:
print(f" ⚠ Error loading {filename}: {str(e)}")
continue
print(f" ✓ Loaded {len(assistants)} active assistants (from {files_processed} total files)")
return assistants
def calculate_issue_stats_from_metadata(metadata_list):
"""Calculate statistics from a list of issue metadata."""
total_issues = len(metadata_list)
closed = sum(1 for issue_meta in metadata_list if issue_meta.get('closed_at'))
resolved = sum(1 for issue_meta in metadata_list
if issue_meta.get('state_reason') == 'completed')
# Resolved rate = resolved / closed (not resolved / total)
resolved_rate = (resolved / closed * 100) if closed > 0 else 0
return {
'total_issues': total_issues,
'closed_issues': closed,
'resolved_issues': resolved,
'resolved_rate': round(resolved_rate, 2),
}
def calculate_monthly_metrics_by_agent(all_metadata_dict, assistants):
"""Calculate monthly metrics for all assistants for visualization."""
identifier_to_name = {assistant.get('github_identifier'): assistant.get('name') for assistant in assistants if assistant.get('github_identifier')}
if not all_metadata_dict:
return {'assistants': [], 'months': [], 'data': {}}
agent_month_data = defaultdict(lambda: defaultdict(list))
for agent_identifier, metadata_list in all_metadata_dict.items():
for issue_meta in metadata_list:
created_at = issue_meta.get('created_at')
if not created_at:
continue
agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
try:
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
month_key = f"{dt.year}-{dt.month:02d}"
agent_month_data[agent_name][month_key].append(issue_meta)
except Exception as e:
print(f"Warning: Could not parse date '{created_at}': {e}")
continue
all_months = set()
for agent_data in agent_month_data.values():
all_months.update(agent_data.keys())
months = sorted(list(all_months))
result_data = {}
for agent_name, month_dict in agent_month_data.items():
resolved_rates = []
total_issues_list = []
resolved_issues_list = []
closed_issues_list = []
for month in months:
issues_in_month = month_dict.get(month, [])
resolved_count = sum(1 for issue in issues_in_month if issue.get('state_reason') == 'completed')
closed_count = sum(1 for issue in issues_in_month if issue.get('closed_at'))
total_count = len(issues_in_month)
# Resolved rate = resolved / closed (not resolved / total)
resolved_rate = (resolved_count / closed_count * 100) if closed_count > 0 else None
resolved_rates.append(resolved_rate)
total_issues_list.append(total_count)
resolved_issues_list.append(resolved_count)
closed_issues_list.append(closed_count)
result_data[agent_name] = {
'resolved_rates': resolved_rates,
'total_issues': total_issues_list,
'resolved_issues': resolved_issues_list,
'closed_issues': closed_issues_list
}
agents_list = sorted(list(agent_month_data.keys()))
return {
'assistants': agents_list,
'months': months,
'data': result_data
}
def calculate_discussion_stats_from_metadata(metadata_list):
"""Calculate statistics from a list of discussion metadata."""
total_discussions = len(metadata_list)
resolved = sum(1 for discussion_meta in metadata_list if discussion_meta.get('state') == 'resolved')
# Resolved rate = resolved / total * 100
resolved_rate = (resolved / total_discussions * 100) if total_discussions > 0 else 0
return {
'total_discussions': total_discussions,
'resolved_discussions': resolved,
'discussion_resolved_rate': round(resolved_rate, 2),
}
def calculate_monthly_metrics_by_agent_discussions(all_discussions_dict, assistants):
"""Calculate monthly metrics for discussions for all assistants for visualization."""
identifier_to_name = {assistant.get('github_identifier'): assistant.get('name') for assistant in assistants if assistant.get('github_identifier')}
if not all_discussions_dict:
return {'assistants': [], 'months': [], 'data': {}}
agent_month_data = defaultdict(lambda: defaultdict(list))
for agent_identifier, metadata_list in all_discussions_dict.items():
for discussion_meta in metadata_list:
created_at = discussion_meta.get('created_at')
if not created_at:
continue
agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
try:
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
month_key = f"{dt.year}-{dt.month:02d}"
agent_month_data[agent_name][month_key].append(discussion_meta)
except Exception as e:
print(f"Warning: Could not parse discussion date '{created_at}': {e}")
continue
all_months = set()
for agent_data in agent_month_data.values():
all_months.update(agent_data.keys())
months = sorted(list(all_months))
result_data = {}
for agent_name, month_dict in agent_month_data.items():
resolved_rates = []
total_discussions_list = []
resolved_discussions_list = []
for month in months:
discussions_in_month = month_dict.get(month, [])
resolved_count = sum(1 for discussion in discussions_in_month if discussion.get('state') == 'resolved')
total_count = len(discussions_in_month)
# Resolved rate = resolved / total * 100
resolved_rate = (resolved_count / total_count * 100) if total_count > 0 else None
resolved_rates.append(resolved_rate)
total_discussions_list.append(total_count)
resolved_discussions_list.append(resolved_count)
result_data[agent_name] = {
'resolved_rates': resolved_rates,
'total_discussions': total_discussions_list,
'resolved_discussions': resolved_discussions_list
}
agents_list = sorted(list(agent_month_data.keys()))
return {
'assistants': agents_list,
'months': months,
'data': result_data
}
def construct_leaderboard_from_metadata(all_metadata_dict, assistants, wanted_resolved_dict=None, discussions_dict=None):
"""Construct leaderboard from in-memory issue metadata and discussion metadata.
Args:
all_metadata_dict: Dictionary mapping assistant ID to list of issue metadata (assistant-assigned issues)
assistants: List of assistant metadata
wanted_resolved_dict: Optional dictionary mapping assistant ID to list of resolved wanted issues
discussions_dict: Optional dictionary mapping assistant ID to list of discussion metadata
"""
if not assistants:
print("Error: No assistants found")
return {}
if wanted_resolved_dict is None:
wanted_resolved_dict = {}
if discussions_dict is None:
discussions_dict = {}
cache_dict = {}
for assistant in assistants:
identifier = assistant.get('github_identifier')
agent_name = assistant.get('name', 'Unknown')
bot_data = all_metadata_dict.get(identifier, [])
stats = calculate_issue_stats_from_metadata(bot_data)
# Add wanted issues count
resolved_wanted = len(wanted_resolved_dict.get(identifier, []))
# Add discussion stats
discussion_metadata = discussions_dict.get(identifier, [])
discussion_stats = calculate_discussion_stats_from_metadata(discussion_metadata)
cache_dict[identifier] = {
'name': agent_name,
'website': assistant.get('website', 'N/A'),
'github_identifier': identifier,
**stats,
'resolved_wanted_issues': resolved_wanted,
**discussion_stats
}
return cache_dict
def save_leaderboard_data_to_hf(leaderboard_dict, issue_monthly_metrics, wanted_issues=None, discussion_monthly_metrics=None):
"""Save leaderboard data, monthly metrics, wanted issues, and discussion metrics to HuggingFace dataset."""
try:
token = get_hf_token()
if not token:
raise Exception("No HuggingFace token found")
api = HfApi(token=token)
if wanted_issues is None:
wanted_issues = []
combined_data = {
'metadata': {
'last_updated': datetime.now(timezone.utc).isoformat(),
'leaderboard_time_frame_days': LEADERBOARD_TIME_FRAME_DAYS,
'longstanding_gap_days': LONGSTANDING_GAP_DAYS,
'tracked_orgs': TRACKED_ORGS,
'patch_wanted_labels': PATCH_WANTED_LABELS
},
'leaderboard': leaderboard_dict,
'issue_monthly_metrics': issue_monthly_metrics,
'wanted_issues': wanted_issues,
'discussion_monthly_metrics': discussion_monthly_metrics
}
with open(LEADERBOARD_FILENAME, 'w') as f:
json.dump(combined_data, f, indent=2)
try:
upload_file_with_backoff(
api=api,
path_or_fileobj=LEADERBOARD_FILENAME,
path_in_repo=LEADERBOARD_FILENAME,
repo_id=LEADERBOARD_REPO,
repo_type="dataset"
)
return True
finally:
if os.path.exists(LEADERBOARD_FILENAME):
os.remove(LEADERBOARD_FILENAME)
except Exception as e:
print(f"Error saving leaderboard data: {str(e)}")
traceback.print_exc()
return False
# =============================================================================
# MINING FUNCTION
# =============================================================================
def mine_all_agents():
"""
Mine issue metadata for all assistants using STREAMING batch processing.
Downloads GHArchive data, then uses BATCH-based DuckDB queries.
"""
print(f"\n[1/4] Downloading GHArchive data...")
if not download_all_gharchive_data():
print("Warning: Download had errors, continuing with available data...")
print(f"\n[2/4] Loading assistant metadata...")
assistants = load_agents_from_hf()
if not assistants:
print("Error: No assistants found")
return
identifiers = [assistant['github_identifier'] for assistant in assistants if assistant.get('github_identifier')]
if not identifiers:
print("Error: No valid assistant identifiers found")
return
print(f"\n[3/4] Mining issue metadata ({len(identifiers)} assistants, {LEADERBOARD_TIME_FRAME_DAYS} days)...")
try:
conn = get_duckdb_connection()
except Exception as e:
print(f"Failed to initialize DuckDB connection: {str(e)}")
return
current_time = datetime.now(timezone.utc)
end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
try:
# USE UNIFIED STREAMING FUNCTION FOR ISSUES, WANTED, AND DISCUSSIONS
results = fetch_all_metadata_streaming(
conn, identifiers, start_date, end_date
)
agent_issues = results['agent_issues']
wanted_open = results['wanted_open']
wanted_resolved = results['wanted_resolved']
agent_discussions = results['agent_discussions']
except Exception as e:
print(f"Error during DuckDB fetch: {str(e)}")
traceback.print_exc()
return
finally:
conn.close()
print(f"\n[4/4] Saving leaderboard...")
try:
leaderboard_dict = construct_leaderboard_from_metadata(
agent_issues, assistants, wanted_resolved, agent_discussions
)
issue_monthly_metrics = calculate_monthly_metrics_by_agent(agent_issues, assistants)
discussion_monthly_metrics = calculate_monthly_metrics_by_agent_discussions(
agent_discussions, assistants
)
save_leaderboard_data_to_hf(
leaderboard_dict, issue_monthly_metrics, wanted_open, discussion_monthly_metrics
)
except Exception as e:
print(f"Error saving leaderboard: {str(e)}")
traceback.print_exc()
finally:
# Clean up DuckDB cache file to save storage
if os.path.exists(DUCKDB_CACHE_FILE):
try:
os.remove(DUCKDB_CACHE_FILE)
print(f" ✓ Cache file removed: {DUCKDB_CACHE_FILE}")
except Exception as e:
print(f" ⚠ Failed to remove cache file: {str(e)}")
# =============================================================================
# SCHEDULER SETUP
# =============================================================================
def setup_scheduler():
"""Set up APScheduler to run mining jobs periodically."""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logging.getLogger('httpx').setLevel(logging.WARNING)
scheduler = BlockingScheduler(timezone=SCHEDULE_TIMEZONE)
trigger = CronTrigger(
day_of_week=SCHEDULE_DAY_OF_WEEK,
hour=SCHEDULE_HOUR,
minute=SCHEDULE_MINUTE,
timezone=SCHEDULE_TIMEZONE
)
scheduler.add_job(
mine_all_agents,
trigger=trigger,
id='mine_all_agents',
name='Mine GHArchive data for all assistants',
replace_existing=True
)
next_run = trigger.get_next_fire_time(None, datetime.now(trigger.timezone))
print(f"Scheduler: Weekly on {SCHEDULE_DAY_OF_WEEK} at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d} {SCHEDULE_TIMEZONE}")
print(f"Next run: {next_run}\n")
print(f"\nScheduler started")
scheduler.start()
# =============================================================================
# ENTRY POINT
# =============================================================================
if __name__ == "__main__":
if SCHEDULE_ENABLED:
setup_scheduler()
else:
mine_all_agents()