import json
import os
import time
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.errors import HfHubHTTPError
from dotenv import load_dotenv
import duckdb
import backoff
import requests
import requests.exceptions
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import logging
import traceback
import subprocess
import re

# Load environment variables
load_dotenv()

# =============================================================================
# CONFIGURATION
# =============================================================================

# Get script directory for relative paths
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.dirname(SCRIPT_DIR)  # Parent directory

AGENTS_REPO = "SWE-Arena/bot_data"
AGENTS_REPO_LOCAL_PATH = os.path.join(BASE_DIR, "bot_data")  # Local git clone path
DUCKDB_CACHE_FILE = os.path.join(SCRIPT_DIR, "cache.duckdb")
GHARCHIVE_DATA_LOCAL_PATH = os.path.join(BASE_DIR, "gharchive/data")
LEADERBOARD_FILENAME = f"{os.getenv('COMPOSE_PROJECT_NAME')}.json"
LEADERBOARD_REPO = "SWE-Arena/leaderboard_data"
LEADERBOARD_TIME_FRAME_DAYS = 180
LONGSTANDING_GAP_DAYS = 30  # Minimum days for an issue to be considered long-standing

# GitHub organizations and repositories to track for wanted issues
TRACKED_ORGS = [
    "apache",
    "github",
    "huggingface",
]

# Labels that indicate "patch wanted" status
PATCH_WANTED_LABELS = [
    "bug",
    "enhancement",
]

# Git sync configuration (mandatory to get latest bot data)
GIT_SYNC_TIMEOUT = 300  # 5 minutes timeout for git pull

# Streaming batch configuration
BATCH_SIZE_DAYS = 1  # Process 1 day at a time (~24 hourly files)

# Download configuration
DOWNLOAD_WORKERS = 4
DOWNLOAD_RETRY_DELAY = 2
MAX_RETRIES = 5

# Upload configuration
UPLOAD_DELAY_SECONDS = 5
UPLOAD_MAX_BACKOFF = 3600

# Scheduler configuration
SCHEDULE_ENABLED = False
SCHEDULE_DAY_OF_WEEK = 'fri'  # Friday
SCHEDULE_HOUR = 0
SCHEDULE_MINUTE = 0
SCHEDULE_TIMEZONE = 'UTC'
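# Hedged configuration sketch: the script reads two environment variables via
# load_dotenv() above - HF_TOKEN (used by get_hf_token below) and
# COMPOSE_PROJECT_NAME (used to build LEADERBOARD_FILENAME). A hypothetical .env
# file could look like this; both values are placeholders, not real settings:
#
#   COMPOSE_PROJECT_NAME=swe-arena-leaderboard
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx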
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================

def load_jsonl(filename):
    """Load a JSONL file and return a list of dictionaries."""
    if not os.path.exists(filename):
        return []
    data = []
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError as e:
                    print(f"Warning: Skipping invalid JSON line: {e}")
    return data


def save_jsonl(filename, data):
    """Save a list of dictionaries to a JSONL file."""
    with open(filename, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item) + '\n')


def normalize_date_format(date_string):
    """Convert date strings or datetime objects to standardized ISO 8601 format with a Z suffix."""
    if not date_string or date_string == 'N/A':
        return 'N/A'
    try:
        if isinstance(date_string, datetime):
            return date_string.strftime('%Y-%m-%dT%H:%M:%SZ')
        # Collapse whitespace and convert "YYYY-MM-DD HH:MM:SS" to "YYYY-MM-DDTHH:MM:SS"
        date_string = re.sub(r'\s+', ' ', date_string.strip())
        date_string = date_string.replace(' ', 'T')
        # Expand short UTC offsets like "+05" to "+05:00"
        if len(date_string) >= 3:
            if date_string[-3:-2] in ('+', '-') and ':' not in date_string[-3:]:
                date_string = date_string + ':00'
        dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
        return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
    except Exception as e:
        print(f"Warning: Could not parse date '{date_string}': {e}")
        return date_string
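# Hedged examples for normalize_date_format (illustrative values only):
#
#   normalize_date_format("2024-03-05 12:30:00")   -> "2024-03-05T12:30:00Z"
#   normalize_date_format("N/A")                   -> "N/A"
#   normalize_date_format(datetime(2024, 3, 5, 12, 30, 0, tzinfo=timezone.utc))
#                                                  -> "2024-03-05T12:30:00Z"
#
# Note: short UTC offsets such as "+05" are accepted (expanded to "+05:00"), but the
# output keeps the original clock time with a literal "Z" suffix; no timezone
# conversion is performed.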
def get_hf_token():
    """Get the HuggingFace token from environment variables."""
    token = os.getenv('HF_TOKEN')
    if not token:
        print("Warning: HF_TOKEN not found in environment variables")
    return token


# =============================================================================
# GHARCHIVE DOWNLOAD FUNCTIONS
# =============================================================================

def download_file(url):
    """Download a GHArchive file with retry logic."""
    filename = url.split("/")[-1]
    filepath = os.path.join(GHARCHIVE_DATA_LOCAL_PATH, filename)
    if os.path.exists(filepath):
        return True
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            with open(filepath, "wb") as f:
                f.write(response.content)
            return True
        except requests.exceptions.HTTPError as e:
            # 404 means the file doesn't exist in GHArchive - skip without retry
            if e.response.status_code == 404:
                if attempt == 0:  # Only log once, not for each retry
                    print(f"  ⚠ {filename}: Not available (404) - skipping")
                return False
            # Other HTTP errors (5xx, etc.) should be retried
            wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
            print(f"  ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
            time.sleep(wait_time)
        except Exception as e:
            # Network errors, timeouts, etc. should be retried
            wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
            print(f"  ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
            time.sleep(wait_time)
    return False


def download_all_gharchive_data():
    """Download all GHArchive hourly files for the last LEADERBOARD_TIME_FRAME_DAYS."""
    os.makedirs(GHARCHIVE_DATA_LOCAL_PATH, exist_ok=True)
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)

    urls = []
    current_date = start_date
    while current_date <= end_date:
        date_str = current_date.strftime("%Y-%m-%d")
        for hour in range(24):
            urls.append(f"https://data.gharchive.org/{date_str}-{hour}.json.gz")
        current_date += timedelta(days=1)

    downloads_processed = 0
    try:
        with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as executor:
            futures = [executor.submit(download_file, url) for url in urls]
            for future in as_completed(futures):
                downloads_processed += 1
        print(f"  Download complete: {downloads_processed} files")
        return True
    except Exception as e:
        print(f"Error during download: {str(e)}")
        traceback.print_exc()
        return False


# =============================================================================
# HUGGINGFACE API WRAPPERS
# =============================================================================

def is_retryable_error(e):
    """Check if an exception is retryable (rate limit or timeout error)."""
    if isinstance(e, HfHubHTTPError):
        if e.response.status_code == 429:
            return True
    if isinstance(e, (requests.exceptions.Timeout,
                      requests.exceptions.ReadTimeout,
                      requests.exceptions.ConnectTimeout)):
        return True
    if isinstance(e, Exception):
        error_str = str(e).lower()
        if 'timeout' in error_str or 'timed out' in error_str:
            return True
    return False


def log_backoff(details):
    """Print a retry notice for backoff-wrapped HuggingFace calls."""
    print(
        f"    {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes "
        f"({details['wait']:.0f}s) - attempt {details['tries']}/{MAX_RETRIES}..."
    )


@backoff.on_exception(
    backoff.expo,
    (HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
    max_tries=MAX_RETRIES,
    base=300,
    max_value=3600,
    giveup=lambda e: not is_retryable_error(e),
    on_backoff=log_backoff,
)
def list_repo_files_with_backoff(api, **kwargs):
    """Wrapper for api.list_repo_files() with exponential backoff."""
    return api.list_repo_files(**kwargs)


@backoff.on_exception(
    backoff.expo,
    (HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
    max_tries=MAX_RETRIES,
    base=300,
    max_value=3600,
    giveup=lambda e: not is_retryable_error(e),
    on_backoff=log_backoff,
)
def hf_hub_download_with_backoff(**kwargs):
    """Wrapper for hf_hub_download() with exponential backoff."""
    return hf_hub_download(**kwargs)


@backoff.on_exception(
    backoff.expo,
    (HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
    max_tries=MAX_RETRIES,
    base=300,
    max_value=3600,
    giveup=lambda e: not is_retryable_error(e),
    on_backoff=log_backoff,
)
def upload_file_with_backoff(api, **kwargs):
    """Wrapper for api.upload_file() with exponential backoff."""
    return api.upload_file(**kwargs)


@backoff.on_exception(
    backoff.expo,
    (HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
    max_tries=MAX_RETRIES,
    base=300,
    max_value=3600,
    giveup=lambda e: not is_retryable_error(e),
    on_backoff=log_backoff,
)
def upload_folder_with_backoff(api, **kwargs):
    """Wrapper for api.upload_folder() with exponential backoff."""
    return api.upload_folder(**kwargs)


def get_duckdb_connection():
    """
    Initialize a DuckDB connection with optimized memory settings.

    Uses a persistent database file and a reduced memory footprint, and
    automatically removes the cache file if a lock conflict is detected.
    """
""" try: conn = duckdb.connect(DUCKDB_CACHE_FILE) except Exception as e: # Check if it's a locking error error_msg = str(e) if "lock" in error_msg.lower() or "conflicting" in error_msg.lower(): print(f" ⚠ Lock conflict detected, removing {DUCKDB_CACHE_FILE}...") if os.path.exists(DUCKDB_CACHE_FILE): os.remove(DUCKDB_CACHE_FILE) print(f" ✓ Cache file removed, retrying connection...") # Retry connection after removing cache conn = duckdb.connect(DUCKDB_CACHE_FILE) else: # Re-raise if it's not a locking error raise # CORE MEMORY & THREADING SETTINGS conn.execute(f"SET threads TO 6;") conn.execute(f"SET max_memory = '50GB';") conn.execute("SET temp_directory = '/tmp/duckdb_temp';") # PERFORMANCE OPTIMIZATIONS conn.execute("SET preserve_insertion_order = false;") # Disable expensive ordering conn.execute("SET enable_object_cache = true;") # Cache repeatedly read files return conn def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_LOCAL_PATH): """Generate file path patterns for GHArchive data in date range (only existing files).""" file_patterns = [] missing_dates = set() current_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0) end_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0) while current_date <= end_day: date_has_files = False for hour in range(24): pattern = os.path.join(data_dir, f"{current_date.strftime('%Y-%m-%d')}-{hour}.json.gz") if os.path.exists(pattern): file_patterns.append(pattern) date_has_files = True if not date_has_files: missing_dates.add(current_date.strftime('%Y-%m-%d')) current_date += timedelta(days=1) if missing_dates: print(f" ⚠ Skipping {len(missing_dates)} date(s) with no data") return file_patterns # ============================================================================= # STREAMING BATCH PROCESSING - UNIFIED QUERY FOR ALL METADATA # ============================================================================= def fetch_all_metadata_streaming(conn, identifiers, start_date, end_date): """ QUERY: Fetch both issue and discussion metadata using streaming batch processing: - IssuesEvent, IssueCommentEvent (for assistant-assigned issues AND wanted issues) - PullRequestEvent (for wanted issue tracking) - DiscussionEvent (for discussion tracking) Args: conn: DuckDB connection instance identifiers: List of GitHub usernames/bot identifiers start_date: Start datetime (timezone-aware) end_date: End datetime (timezone-aware) Returns: Dictionary with four keys: - 'agent_issues': {agent_id: [issue_metadata]} for assistant-assigned issues - 'wanted_open': [open_wanted_issues] for long-standing open issues - 'wanted_resolved': {agent_id: [resolved_wanted]} for resolved wanted issues - 'agent_discussions': {agent_id: [discussion_metadata]} for assistant discussions """ identifier_set = set(identifiers) identifier_list = ', '.join([f"'{id}'" for id in identifiers]) tracked_orgs_list = ', '.join([f"'{org}'" for org in TRACKED_ORGS]) # Storage for assistant-assigned issues agent_issues = defaultdict(list) # agent_id -> [issue_metadata] agent_issue_urls = defaultdict(set) # agent_id -> set of issue URLs (for deduplication) # Storage for wanted issues all_issues = {} # issue_url -> issue_metadata issue_to_prs = defaultdict(set) # issue_url -> set of PR URLs pr_creators = {} # pr_url -> creator login pr_merged_at = {} # pr_url -> merged_at timestamp # Storage for discussions discussions_by_agent = defaultdict(list) # Calculate total batches total_days = (end_date - start_date).days total_batches = (total_days // 
    # Process in batches
    current_date = start_date
    batch_num = 0
    print(f"  Streaming {total_batches} batches of {BATCH_SIZE_DAYS}-day intervals...")

    while current_date <= end_date:
        batch_num += 1
        batch_end = min(current_date + timedelta(days=BATCH_SIZE_DAYS - 1), end_date)

        # Get file patterns for THIS BATCH ONLY
        file_patterns = generate_file_path_patterns(current_date, batch_end)
        if not file_patterns:
            print(f"  Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} - NO DATA")
            current_date = batch_end + timedelta(days=1)
            continue

        # Progress indicator
        print(f"  Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} "
              f"({len(file_patterns)} files)... ", end="", flush=True)

        # Build file patterns SQL for THIS BATCH
        file_patterns_sql = '[' + ', '.join([f"'{fp}'" for fp in file_patterns]) + ']'

        try:
            # UNIFIED QUERY: optimized for gzip decompression
            unified_query = f"""
                SELECT
                    type,
                    json_extract_string(repo, '$.name') as repo_name,
                    json_extract_string(repo, '$.url') as repo_url,

                    -- Issue fields
                    json_extract_string(payload, '$.issue.html_url') as issue_url,
                    json_extract_string(payload, '$.issue.title') as issue_title,
                    json_extract_string(payload, '$.issue.number') as issue_number,
                    json_extract_string(payload, '$.issue.created_at') as issue_created_at,
                    json_extract_string(payload, '$.issue.closed_at') as issue_closed_at,
                    json_extract(payload, '$.issue.labels') as issue_labels,
                    json_extract_string(payload, '$.issue.pull_request') as is_pull_request,
                    json_extract_string(payload, '$.issue.state_reason') as issue_state_reason,

                    -- Actor/assignee fields for assistant assignment
                    json_extract_string(payload, '$.issue.user.login') as issue_creator,
                    json_extract_string(payload, '$.issue.assignee.login') as issue_assignee,
                    json_extract(payload, '$.issue.assignees') as issue_assignees,
                    json_extract_string(payload, '$.comment.user.login') as commenter,

                    -- PR fields - simplified with COALESCE
                    COALESCE(
                        json_extract_string(payload, '$.issue.html_url'),
                        json_extract_string(payload, '$.pull_request.html_url')
                    ) as pr_url,
                    COALESCE(
                        json_extract_string(payload, '$.issue.user.login'),
                        json_extract_string(payload, '$.pull_request.user.login')
                    ) as pr_creator,
                    COALESCE(
                        json_extract_string(payload, '$.issue.pull_request.merged_at'),
                        json_extract_string(payload, '$.pull_request.merged_at')
                    ) as pr_merged_at,
                    COALESCE(
                        json_extract_string(payload, '$.issue.body'),
                        json_extract_string(payload, '$.pull_request.body')
                    ) as pr_body,

                    -- Discussion fields
                    json_extract_string(payload, '$.discussion.html_url') as discussion_url,
                    json_extract_string(payload, '$.discussion.user.login') as discussion_creator,
                    json_extract_string(payload, '$.discussion.created_at') as discussion_created_at,
                    json_extract_string(payload, '$.discussion.answer_chosen_at') as discussion_closed_at,
                    json_extract_string(payload, '$.discussion.state_reason') as discussion_state_reason,
                    json_extract_string(payload, '$.action') as action

                FROM read_json(
                    {file_patterns_sql},
                    union_by_name=true,
                    filename=true,
                    compression='gzip',
                    format='newline_delimited',
                    ignore_errors=true
                )
                WHERE type IN ('IssuesEvent', 'IssueCommentEvent', 'PullRequestEvent', 'DiscussionEvent')
                AND (
                    -- Assistant-assigned issues: assistant is creator, assignee, or commenter
                    (type = 'IssuesEvent' AND (
                        json_extract_string(payload, '$.issue.user.login') IN ({identifier_list})
                        OR json_extract_string(payload, '$.issue.assignee.login') IN ({identifier_list})
                        OR SPLIT_PART(json_extract_string(repo, '$.name'), '/', 1) IN ({tracked_orgs_list})
                    ))
                    -- Issue comments: assistant is commenter OR tracked org
                    OR (type = 'IssueCommentEvent' AND (
                        json_extract_string(payload, '$.comment.user.login') IN ({identifier_list})
                        OR SPLIT_PART(json_extract_string(repo, '$.name'), '/', 1) IN ({tracked_orgs_list})
                    ))
                    -- PRs: assistant is creator OR tracked org (for wanted issue tracking)
                    OR (type = 'PullRequestEvent' AND (
                        json_extract_string(payload, '$.pull_request.user.login') IN ({identifier_list})
                        OR SPLIT_PART(json_extract_string(repo, '$.name'), '/', 1) IN ({tracked_orgs_list})
                    ))
                    -- Discussions: assistant is creator AND tracked org
                    OR (type = 'DiscussionEvent'
                        AND json_extract_string(payload, '$.discussion.user.login') IN ({identifier_list})
                        AND SPLIT_PART(json_extract_string(repo, '$.name'), '/', 1) IN ({tracked_orgs_list})
                    )
                )
            """

            all_results = conn.execute(unified_query).fetchall()

            # Post-process results to separate them into different categories.
            # Row structure:
            #   [type, repo_name, repo_url, issue_url, issue_title, issue_number,
            #    issue_created_at, issue_closed_at, issue_labels, is_pull_request,
            #    issue_state_reason, issue_creator, issue_assignee, issue_assignees,
            #    commenter, pr_url, pr_creator, pr_merged_at, pr_body,
            #    discussion_url, discussion_creator, discussion_created_at,
            #    discussion_closed_at, discussion_state_reason, action]
            issue_events = []        # For wanted tracking
            pr_events = []           # For wanted tracking
            discussion_events = []   # For discussion tracking
            agent_issue_events = []  # For assistant-assigned issues

            for row in all_results:
                event_type = row[0]
                is_pr = row[9]  # is_pull_request field

                if event_type in ('IssuesEvent', 'IssueCommentEvent'):
                    if not is_pr:
                        # It's an issue, not a PR.
                        # Check if this is an assistant-assigned issue.
                        issue_creator = row[11]
                        issue_assignee = row[12]
                        issue_assignees_json = row[13]
                        commenter = row[14]

                        agent_identifier = None
                        if event_type == 'IssuesEvent':
                            # Check if issue creator, assignee, or any assignees match our identifiers
                            if issue_creator in identifier_set:
                                agent_identifier = issue_creator
                            elif issue_assignee in identifier_set:
                                agent_identifier = issue_assignee
                            else:
                                # Check the assignees array
                                try:
                                    if issue_assignees_json:
                                        if isinstance(issue_assignees_json, str):
                                            assignees_data = json.loads(issue_assignees_json)
                                        else:
                                            assignees_data = issue_assignees_json
                                        if isinstance(assignees_data, list):
                                            for assignee_obj in assignees_data:
                                                if isinstance(assignee_obj, dict):
                                                    assignee_login = assignee_obj.get('login')
                                                    if assignee_login in identifier_set:
                                                        agent_identifier = assignee_login
                                                        break
                                except (json.JSONDecodeError, TypeError):
                                    pass
                        elif event_type == 'IssueCommentEvent':
                            # Check if the commenter is an assistant
                            if commenter in identifier_set:
                                agent_identifier = commenter

                        # Add to the appropriate list
                        if agent_identifier:
                            agent_issue_events.append((row, agent_identifier))
                        # Always add to issue_events for wanted tracking (if from tracked orgs)
                        issue_events.append(row)
                    else:
                        # It's a PR
                        pr_events.append(row)
                elif event_type == 'PullRequestEvent':
                    pr_events.append(row)
                elif event_type == 'DiscussionEvent':
                    discussion_events.append(row)

            # Process assistant-assigned issues
            for row, agent_identifier in agent_issue_events:
                # Row indices: repo_url=2, issue_url=3, issue_created_at=6,
                # issue_closed_at=7, issue_state_reason=10
                repo_url = row[2]
                issue_url = row[3]
                created_at = row[6]
                closed_at = row[7]
                state_reason = row[10]

                if not issue_url or not agent_identifier:
                    continue

                # Build the full URL from repo_url if needed
                if repo_url and '/issues/' not in issue_url:
                    issue_number = row[5]
                    full_url = f"{repo_url.replace('api.github.com/repos/', 'github.com/')}/issues/{issue_number}"
                else:
                    full_url = issue_url
f"{repo_url.replace('api.github.com/repos/', 'github.com/')}/issues/{issue_number}" else: full_url = issue_url # Only include issues created within timeframe if created_at: try: created_dt = datetime.fromisoformat(created_at.replace('Z', '+00:00')) if created_dt < start_date: continue except: continue # Deduplicate: only add if we haven't seen this issue for this assistant if full_url in agent_issue_urls[agent_identifier]: continue agent_issue_urls[agent_identifier].add(full_url) issue_metadata = { 'url': full_url, 'created_at': normalize_date_format(created_at), 'closed_at': normalize_date_format(closed_at) if closed_at else None, 'state_reason': state_reason, } agent_issues[agent_identifier].append(issue_metadata) # Process issues for wanted tracking for row in issue_events: # Row indices: repo_name=1, issue_url=3, issue_title=4, issue_number=5, # issue_created_at=6, issue_closed_at=7, issue_labels=8 repo_name = row[1] issue_url = row[3] title = row[4] issue_number = row[5] created_at = row[6] closed_at = row[7] labels_json = row[8] if not issue_url or not repo_name: continue # Extract org from repo_name parts = repo_name.split('/') if len(parts) != 2: continue org = parts[0] # Filter by tracked orgs if org not in TRACKED_ORGS: continue # Parse labels try: if isinstance(labels_json, str): labels_data = json.loads(labels_json) else: labels_data = labels_json if not isinstance(labels_data, list): label_names = [] else: label_names = [label.get('name', '').lower() for label in labels_data if isinstance(label, dict)] except (json.JSONDecodeError, TypeError): label_names = [] # Determine state normalized_closed_at = normalize_date_format(closed_at) if closed_at else None state = 'closed' if (normalized_closed_at and normalized_closed_at != 'N/A') else 'open' # Store issue metadata all_issues[issue_url] = { 'url': issue_url, 'repo': repo_name, 'title': title, 'number': issue_number, 'state': state, 'created_at': normalize_date_format(created_at), 'closed_at': normalized_closed_at, 'labels': label_names } # Process PRs for wanted tracking for row in pr_events: # Row indices: pr_url=15, pr_creator=16, pr_merged_at=17, pr_body=18 pr_url = row[15] pr_creator = row[16] merged_at = row[17] pr_body = row[18] if not pr_url or not pr_creator: continue pr_creators[pr_url] = pr_creator pr_merged_at[pr_url] = merged_at # Extract linked issues from PR body if pr_body: # Match issue URLs or #number references issue_refs = re.findall(r'(?:https?://github\.com/[\w-]+/[\w-]+/issues/\d+)|(?:#\d+)', pr_body, re.IGNORECASE) for ref in issue_refs: # Convert #number to full URL if needed if ref.startswith('#'): # Extract org/repo from PR URL pr_parts = pr_url.split('/') if len(pr_parts) >= 5: org = pr_parts[-4] repo = pr_parts[-3] issue_num = ref[1:] issue_url = f"https://github.com/{org}/{repo}/issues/{issue_num}" issue_to_prs[issue_url].add(pr_url) else: issue_to_prs[ref].add(pr_url) # Process discussions for row in discussion_events: # Row indices: repo_name=1, discussion_url=19, discussion_creator=20, # discussion_created_at=21, discussion_closed_at=22, # discussion_state_reason=23, action=24 repo_name = row[1] discussion_url = row[19] discussion_creator = row[20] discussion_created_at = row[21] discussion_closed_at = row[22] discussion_state_reason = row[23] action = row[24] if not discussion_url or not repo_name: continue # Extract org from repo_name parts = repo_name.split('/') if len(parts) != 2: continue org = parts[0] # Filter by tracked orgs if org not in TRACKED_ORGS: continue # Parse discussion 
                # Parse the discussion creation date to filter by the time window
                created_dt = None
                if discussion_created_at:
                    try:
                        created_dt = datetime.fromisoformat(discussion_created_at.replace('Z', '+00:00'))
                        # Only track discussions created on or after start_date
                        if created_dt < start_date:
                            continue
                    except Exception:
                        continue

                # Group by creator (assistant identifier).
                # Only track discussions from our assistant identifiers.
                if discussion_creator not in identifier_set:
                    continue

                # Determine the discussion state.
                # A discussion is "resolved" if it has an answer chosen OR is marked answered.
                is_resolved = False
                if discussion_closed_at:
                    is_resolved = True
                elif discussion_state_reason and 'answered' in discussion_state_reason.lower():
                    is_resolved = True

                # Store discussion metadata, grouped by assistant
                discussion_meta = {
                    'url': discussion_url,
                    'repo': repo_name,
                    'created_at': normalize_date_format(discussion_created_at),
                    'closed_at': normalize_date_format(discussion_closed_at) if discussion_closed_at else None,
                    'state': 'resolved' if is_resolved else 'open',
                    'state_reason': discussion_state_reason
                }
                discussions_by_agent[discussion_creator].append(discussion_meta)

            print(f"✓ {len(agent_issue_events)} assistant issues, {len(issue_events)} wanted issues, "
                  f"{len(pr_events)} PRs, {len(discussion_events)} discussions")

        except Exception as e:
            print(f"\n  ✗ Batch {batch_num} error: {str(e)}")
            traceback.print_exc()

        # Move to the next batch
        current_date = batch_end + timedelta(days=1)

    # Post-processing: filter wanted issues and assign them to assistants
    print(f"\n  Post-processing {len(all_issues)} wanted issues...")
    wanted_open = []
    wanted_resolved = defaultdict(list)
    current_time = datetime.now(timezone.utc)

    for issue_url, issue_meta in all_issues.items():
        # Check if the issue has linked PRs
        linked_prs = issue_to_prs.get(issue_url, set())
        if not linked_prs:
            continue

        # Check if any linked PR was merged AND created by an assistant
        resolved_by = None
        for pr_url in linked_prs:
            merged_at = pr_merged_at.get(pr_url)
            if merged_at:  # PR was merged
                pr_creator = pr_creators.get(pr_url)
                if pr_creator in identifier_set:
                    resolved_by = pr_creator
                    break

        if not resolved_by:
            continue

        # Process based on issue state
        if issue_meta['state'] == 'open':
            # For open issues: check if labels match PATCH_WANTED_LABELS
            issue_labels = issue_meta.get('labels', [])
            has_patch_label = False
            for issue_label in issue_labels:
                for wanted_label in PATCH_WANTED_LABELS:
                    if wanted_label.lower() in issue_label:
                        has_patch_label = True
                        break
                if has_patch_label:
                    break
            if not has_patch_label:
                continue

            # Check if the issue is long-standing
            created_at_str = issue_meta.get('created_at')
            if created_at_str and created_at_str != 'N/A':
                try:
                    created_dt = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
                    days_open = (current_time - created_dt).days
                    if days_open >= LONGSTANDING_GAP_DAYS:
                        wanted_open.append(issue_meta)
                except Exception:
                    pass

        elif issue_meta['state'] == 'closed':
            # For closed issues: must be closed within the time frame AND open 30+ days
            closed_at_str = issue_meta.get('closed_at')
            created_at_str = issue_meta.get('created_at')
            if (closed_at_str and closed_at_str != 'N/A'
                    and created_at_str and created_at_str != 'N/A'):
                try:
                    closed_dt = datetime.fromisoformat(closed_at_str.replace('Z', '+00:00'))
                    created_dt = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
                    # Calculate how long the issue was open
                    days_open = (closed_dt - created_dt).days
                    # Only include if closed within the timeframe AND open 30+ days
                    if start_date <= closed_dt <= end_date and days_open >= LONGSTANDING_GAP_DAYS:
                        wanted_resolved[resolved_by].append(issue_meta)
                except Exception:
                    pass
    print(f"  ✓ Found {sum(len(issues) for issues in agent_issues.values())} assistant-assigned issues "
          f"across {len(agent_issues)} assistants")
    print(f"  ✓ Found {len(wanted_open)} long-standing open wanted issues")
    print(f"  ✓ Found {sum(len(issues) for issues in wanted_resolved.values())} resolved wanted issues "
          f"across {len(wanted_resolved)} assistants")
    print(f"  ✓ Found {sum(len(discussions) for discussions in discussions_by_agent.values())} discussions "
          f"across {len(discussions_by_agent)} assistants")

    return {
        'agent_issues': dict(agent_issues),
        'wanted_open': wanted_open,
        'wanted_resolved': dict(wanted_resolved),
        'agent_discussions': dict(discussions_by_agent)
    }


def sync_agents_repo():
    """
    Sync the local bot_data repository with the remote using git pull.

    This is MANDATORY to ensure we have the latest bot data.
    Raises an exception if the sync fails.
    """
    if not os.path.exists(AGENTS_REPO_LOCAL_PATH):
        error_msg = f"Local repository not found at {AGENTS_REPO_LOCAL_PATH}"
        print(f"  ✗ {error_msg}")
        print(f"    Please clone it first: git clone https://huggingface.co/datasets/{AGENTS_REPO}")
        raise FileNotFoundError(error_msg)

    if not os.path.exists(os.path.join(AGENTS_REPO_LOCAL_PATH, '.git')):
        error_msg = f"{AGENTS_REPO_LOCAL_PATH} exists but is not a git repository"
        print(f"  ✗ {error_msg}")
        raise ValueError(error_msg)

    try:
        # Run git pull with an extended timeout due to the large repository
        result = subprocess.run(
            ['git', 'pull'],
            cwd=AGENTS_REPO_LOCAL_PATH,
            capture_output=True,
            text=True,
            timeout=GIT_SYNC_TIMEOUT
        )
        if result.returncode == 0:
            output = result.stdout.strip()
            if "Already up to date" in output or "Already up-to-date" in output:
                print(f"  ✓ Repository is up to date")
            else:
                print(f"  ✓ Repository synced successfully")
                if output:
                    # Print the first few lines of output
                    for line in output.split('\n')[:5]:
                        print(f"    {line}")
            return True
        else:
            error_msg = f"Git pull failed: {result.stderr.strip()}"
            print(f"  ✗ {error_msg}")
            raise RuntimeError(error_msg)
    except subprocess.TimeoutExpired:
        error_msg = f"Git pull timed out after {GIT_SYNC_TIMEOUT} seconds"
        print(f"  ✗ {error_msg}")
        raise TimeoutError(error_msg)
    except (FileNotFoundError, ValueError, RuntimeError, TimeoutError):
        raise  # Re-raise expected exceptions
    except Exception as e:
        error_msg = f"Error syncing repository: {str(e)}"
        print(f"  ✗ {error_msg}")
        raise RuntimeError(error_msg) from e


def load_agents_from_hf():
    """
    Load all assistant metadata JSON files from the local git repository.

    ALWAYS syncs with the remote first to ensure we have the latest bot data.
    """
""" # MANDATORY: Sync with remote first to get latest bot data print(f" Syncing bot_data repository to get latest assistants...") sync_agents_repo() # Will raise exception if sync fails assistants = [] # Scan local directory for JSON files if not os.path.exists(AGENTS_REPO_LOCAL_PATH): raise FileNotFoundError(f"Local repository not found at {AGENTS_REPO_LOCAL_PATH}") # Walk through the directory to find all JSON files files_processed = 0 print(f" Loading assistant metadata from {AGENTS_REPO_LOCAL_PATH}...") for root, dirs, files in os.walk(AGENTS_REPO_LOCAL_PATH): # Skip .git directory if '.git' in root: continue for filename in files: if not filename.endswith('.json'): continue files_processed += 1 file_path = os.path.join(root, filename) try: with open(file_path, 'r', encoding='utf-8') as f: agent_data = json.load(f) # Only include active assistants if agent_data.get('status') != 'active': continue # Extract github_identifier from filename github_identifier = filename.replace('.json', '') agent_data['github_identifier'] = github_identifier assistants.append(agent_data) except Exception as e: print(f" ⚠ Error loading {filename}: {str(e)}") continue print(f" ✓ Loaded {len(assistants)} active assistants (from {files_processed} total files)") return assistants def calculate_issue_stats_from_metadata(metadata_list): """Calculate statistics from a list of issue metadata.""" total_issues = len(metadata_list) closed = sum(1 for issue_meta in metadata_list if issue_meta.get('closed_at')) resolved = sum(1 for issue_meta in metadata_list if issue_meta.get('state_reason') == 'completed') # Resolved rate = resolved / closed (not resolved / total) resolved_rate = (resolved / closed * 100) if closed > 0 else 0 return { 'total_issues': total_issues, 'closed_issues': closed, 'resolved_issues': resolved, 'resolved_rate': round(resolved_rate, 2), } def calculate_monthly_metrics_by_agent(all_metadata_dict, assistants): """Calculate monthly metrics for all assistants for visualization.""" identifier_to_name = {assistant.get('github_identifier'): assistant.get('name') for assistant in assistants if assistant.get('github_identifier')} if not all_metadata_dict: return {'assistants': [], 'months': [], 'data': {}} agent_month_data = defaultdict(lambda: defaultdict(list)) for agent_identifier, metadata_list in all_metadata_dict.items(): for issue_meta in metadata_list: created_at = issue_meta.get('created_at') if not created_at: continue agent_name = identifier_to_name.get(agent_identifier, agent_identifier) try: dt = datetime.fromisoformat(created_at.replace('Z', '+00:00')) month_key = f"{dt.year}-{dt.month:02d}" agent_month_data[agent_name][month_key].append(issue_meta) except Exception as e: print(f"Warning: Could not parse date '{created_at}': {e}") continue all_months = set() for agent_data in agent_month_data.values(): all_months.update(agent_data.keys()) months = sorted(list(all_months)) result_data = {} for agent_name, month_dict in agent_month_data.items(): resolved_rates = [] total_issues_list = [] resolved_issues_list = [] closed_issues_list = [] for month in months: issues_in_month = month_dict.get(month, []) resolved_count = sum(1 for issue in issues_in_month if issue.get('state_reason') == 'completed') closed_count = sum(1 for issue in issues_in_month if issue.get('closed_at')) total_count = len(issues_in_month) # Resolved rate = resolved / closed (not resolved / total) resolved_rate = (resolved_count / closed_count * 100) if closed_count > 0 else None resolved_rates.append(resolved_rate) 
        result_data[agent_name] = {
            'resolved_rates': resolved_rates,
            'total_issues': total_issues_list,
            'resolved_issues': resolved_issues_list,
            'closed_issues': closed_issues_list
        }

    agents_list = sorted(list(agent_month_data.keys()))
    return {
        'assistants': agents_list,
        'months': months,
        'data': result_data
    }


def calculate_discussion_stats_from_metadata(metadata_list):
    """Calculate statistics from a list of discussion metadata."""
    total_discussions = len(metadata_list)
    resolved = sum(1 for discussion_meta in metadata_list if discussion_meta.get('state') == 'resolved')

    # Resolved rate = resolved / total * 100
    resolved_rate = (resolved / total_discussions * 100) if total_discussions > 0 else 0

    return {
        'total_discussions': total_discussions,
        'resolved_discussions': resolved,
        'discussion_resolved_rate': round(resolved_rate, 2),
    }


def calculate_monthly_metrics_by_agent_discussions(all_discussions_dict, assistants):
    """Calculate monthly discussion metrics for all assistants for visualization."""
    identifier_to_name = {assistant.get('github_identifier'): assistant.get('name')
                          for assistant in assistants if assistant.get('github_identifier')}

    if not all_discussions_dict:
        return {'assistants': [], 'months': [], 'data': {}}

    agent_month_data = defaultdict(lambda: defaultdict(list))
    for agent_identifier, metadata_list in all_discussions_dict.items():
        for discussion_meta in metadata_list:
            created_at = discussion_meta.get('created_at')
            if not created_at:
                continue
            agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
            try:
                dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                month_key = f"{dt.year}-{dt.month:02d}"
                agent_month_data[agent_name][month_key].append(discussion_meta)
            except Exception as e:
                print(f"Warning: Could not parse discussion date '{created_at}': {e}")
                continue

    all_months = set()
    for agent_data in agent_month_data.values():
        all_months.update(agent_data.keys())
    months = sorted(list(all_months))

    result_data = {}
    for agent_name, month_dict in agent_month_data.items():
        resolved_rates = []
        total_discussions_list = []
        resolved_discussions_list = []
        for month in months:
            discussions_in_month = month_dict.get(month, [])
            resolved_count = sum(1 for discussion in discussions_in_month
                                 if discussion.get('state') == 'resolved')
            total_count = len(discussions_in_month)

            # Resolved rate = resolved / total * 100
            resolved_rate = (resolved_count / total_count * 100) if total_count > 0 else None
            resolved_rates.append(resolved_rate)
            total_discussions_list.append(total_count)
            resolved_discussions_list.append(resolved_count)

        result_data[agent_name] = {
            'resolved_rates': resolved_rates,
            'total_discussions': total_discussions_list,
            'resolved_discussions': resolved_discussions_list
        }

    agents_list = sorted(list(agent_month_data.keys()))
    return {
        'assistants': agents_list,
        'months': months,
        'data': result_data
    }


def construct_leaderboard_from_metadata(all_metadata_dict, assistants,
                                        wanted_resolved_dict=None, discussions_dict=None):
    """Construct the leaderboard from in-memory issue metadata and discussion metadata.

    Args:
        all_metadata_dict: Dictionary mapping assistant ID to list of issue metadata
            (assistant-assigned issues)
        assistants: List of assistant metadata
        wanted_resolved_dict: Optional dictionary mapping assistant ID to list of resolved wanted issues
        discussions_dict: Optional dictionary mapping assistant ID to list of discussion metadata
    """
    if not assistants:
        print("Error: No assistants found")
        return {}

    if wanted_resolved_dict is None:
        wanted_resolved_dict = {}
    if discussions_dict is None:
        discussions_dict = {}

    cache_dict = {}
    for assistant in assistants:
        identifier = assistant.get('github_identifier')
        agent_name = assistant.get('name', 'Unknown')
        bot_data = all_metadata_dict.get(identifier, [])
        stats = calculate_issue_stats_from_metadata(bot_data)

        # Add the wanted issues count
        resolved_wanted = len(wanted_resolved_dict.get(identifier, []))

        # Add discussion stats
        discussion_metadata = discussions_dict.get(identifier, [])
        discussion_stats = calculate_discussion_stats_from_metadata(discussion_metadata)

        cache_dict[identifier] = {
            'name': agent_name,
            'website': assistant.get('website', 'N/A'),
            'github_identifier': identifier,
            **stats,
            'resolved_wanted_issues': resolved_wanted,
            **discussion_stats
        }
    return cache_dict


def save_leaderboard_data_to_hf(leaderboard_dict, issue_monthly_metrics,
                                wanted_issues=None, discussion_monthly_metrics=None):
    """Save leaderboard data, monthly metrics, wanted issues, and discussion metrics to the HuggingFace dataset."""
    try:
        token = get_hf_token()
        if not token:
            raise Exception("No HuggingFace token found")
        api = HfApi(token=token)

        if wanted_issues is None:
            wanted_issues = []

        combined_data = {
            'metadata': {
                'last_updated': datetime.now(timezone.utc).isoformat(),
                'leaderboard_time_frame_days': LEADERBOARD_TIME_FRAME_DAYS,
                'longstanding_gap_days': LONGSTANDING_GAP_DAYS,
                'tracked_orgs': TRACKED_ORGS,
                'patch_wanted_labels': PATCH_WANTED_LABELS
            },
            'leaderboard': leaderboard_dict,
            'issue_monthly_metrics': issue_monthly_metrics,
            'wanted_issues': wanted_issues,
            'discussion_monthly_metrics': discussion_monthly_metrics
        }

        with open(LEADERBOARD_FILENAME, 'w') as f:
            json.dump(combined_data, f, indent=2)

        try:
            upload_file_with_backoff(
                api=api,
                path_or_fileobj=LEADERBOARD_FILENAME,
                path_in_repo=LEADERBOARD_FILENAME,
                repo_id=LEADERBOARD_REPO,
                repo_type="dataset"
            )
            return True
        finally:
            if os.path.exists(LEADERBOARD_FILENAME):
                os.remove(LEADERBOARD_FILENAME)
    except Exception as e:
        print(f"Error saving leaderboard data: {str(e)}")
        traceback.print_exc()
        return False


# =============================================================================
# MINING FUNCTION
# =============================================================================

def mine_all_agents():
    """
    Mine issue metadata for all assistants using STREAMING batch processing.

    Downloads GHArchive data, then uses BATCH-based DuckDB queries.
    """
""" print(f"\n[1/4] Downloading GHArchive data...") if not download_all_gharchive_data(): print("Warning: Download had errors, continuing with available data...") print(f"\n[2/4] Loading assistant metadata...") assistants = load_agents_from_hf() if not assistants: print("Error: No assistants found") return identifiers = [assistant['github_identifier'] for assistant in assistants if assistant.get('github_identifier')] if not identifiers: print("Error: No valid assistant identifiers found") return print(f"\n[3/4] Mining issue metadata ({len(identifiers)} assistants, {LEADERBOARD_TIME_FRAME_DAYS} days)...") try: conn = get_duckdb_connection() except Exception as e: print(f"Failed to initialize DuckDB connection: {str(e)}") return current_time = datetime.now(timezone.utc) end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0) start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS) try: # USE UNIFIED STREAMING FUNCTION FOR ISSUES, WANTED, AND DISCUSSIONS results = fetch_all_metadata_streaming( conn, identifiers, start_date, end_date ) agent_issues = results['agent_issues'] wanted_open = results['wanted_open'] wanted_resolved = results['wanted_resolved'] agent_discussions = results['agent_discussions'] except Exception as e: print(f"Error during DuckDB fetch: {str(e)}") traceback.print_exc() return finally: conn.close() print(f"\n[4/4] Saving leaderboard...") try: leaderboard_dict = construct_leaderboard_from_metadata( agent_issues, assistants, wanted_resolved, agent_discussions ) issue_monthly_metrics = calculate_monthly_metrics_by_agent(agent_issues, assistants) discussion_monthly_metrics = calculate_monthly_metrics_by_agent_discussions( agent_discussions, assistants ) save_leaderboard_data_to_hf( leaderboard_dict, issue_monthly_metrics, wanted_open, discussion_monthly_metrics ) except Exception as e: print(f"Error saving leaderboard: {str(e)}") traceback.print_exc() finally: # Clean up DuckDB cache file to save storage if os.path.exists(DUCKDB_CACHE_FILE): try: os.remove(DUCKDB_CACHE_FILE) print(f" ✓ Cache file removed: {DUCKDB_CACHE_FILE}") except Exception as e: print(f" ⚠ Failed to remove cache file: {str(e)}") # ============================================================================= # SCHEDULER SETUP # ============================================================================= def setup_scheduler(): """Set up APScheduler to run mining jobs periodically.""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logging.getLogger('httpx').setLevel(logging.WARNING) scheduler = BlockingScheduler(timezone=SCHEDULE_TIMEZONE) trigger = CronTrigger( day_of_week=SCHEDULE_DAY_OF_WEEK, hour=SCHEDULE_HOUR, minute=SCHEDULE_MINUTE, timezone=SCHEDULE_TIMEZONE ) scheduler.add_job( mine_all_agents, trigger=trigger, id='mine_all_agents', name='Mine GHArchive data for all assistants', replace_existing=True ) next_run = trigger.get_next_fire_time(None, datetime.now(trigger.timezone)) print(f"Scheduler: Weekly on {SCHEDULE_DAY_OF_WEEK} at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d} {SCHEDULE_TIMEZONE}") print(f"Next run: {next_run}\n") print(f"\nScheduler started") scheduler.start() # ============================================================================= # ENTRY POINT # ============================================================================= if __name__ == "__main__": if SCHEDULE_ENABLED: setup_scheduler() else: mine_all_agents()