Spaces:
Runtime error
Runtime error
| from backend.app.utils.get_paid_problems import get_paid_problems | |
| from backend.app.utils.get_embeddings import get_embedding | |
| import re | |
| import logging | |
| import os | |
| import json | |
| import requests | |
| import csv | |
| from bs4 import BeautifulSoup | |
| logging.basicConfig(level=logging.INFO) | |
def generate_embeddings(data):
    """Attach an embedding to every problem record, in place.

    For each mapping in ``data`` the text stored under ``'content'`` (an
    empty string when the key is absent) is passed to ``get_embedding`` and
    the result is stored on the same mapping under ``'embedding'``.

    Nothing is returned; callers read the embeddings from ``data`` itself.
    """
    for record in data:
        record['embedding'] = get_embedding(record.get('content', ''))
def get_all_problems():
    """
    Downloads the LeetCode problems JSON and saves it locally.
    Returns the parsed JSON content.

    The download is cached as ``leetcode_questions.json`` in the directory
    of this module. When the download fails, a previously saved copy is
    parsed instead.

    Raises:
        FileNotFoundError: the download failed and no local copy exists
            (the request error is attached as the cause).
    """
    download_url = "https://raw.githubusercontent.com/noworneverev/leetcode" \
        "-api/main/data/leetcode_questions.json"
    json_path = os.path.join(os.path.dirname(
        __file__), 'leetcode_questions.json')
    try:
        response = requests.get(download_url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Failed to download JSON: {e}")
        if not os.path.exists(json_path):
            raise FileNotFoundError(
                "No local leetcode_questions.json file available.") from e
        logging.info("Using existing local file")
    else:
        # Pin the encoding: the platform default is not UTF-8 everywhere and
        # would otherwise make writing/reading non-ASCII text fail.
        with open(json_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        logging.info("Downloaded and saved leetcode_questions.json")
    with open(json_path, 'r', encoding='utf-8') as f:
        return json.load(f)
def format_problem(problems=None, type=False):
    """Flatten raw problem records into the shape used by the export steps.

    Args:
        problems: iterable of mappings that provide 'id', 'title', 'slug'
            and 'difficulty', and optionally 'content' (HTML markup) and
            'topicTags'. ``None`` (the default) means "no problems".
        type: value stored under 'paidOnly' on every produced record. The
            name shadows the builtin but is kept so keyword callers still
            work.

    Returns:
        A new list with one dict per input record holding 'id', 'title',
        'url', 'paidOnly', 'slug', 'content' (markup removed, whitespace
        collapsed), 'original_content' (the untouched markup), 'difficulty'
        and 'topicTags' (``[]`` when absent).

    Raises:
        KeyError: a record lacks 'id', 'title', 'slug' or 'difficulty'.
    """
    if problems is None:
        problems = []
    # Rewrites e.g. "10 5" as "10^5" for bases 10 and 2 -- presumably
    # exponents whose superscript markup was lost when the HTML was
    # flattened (NOTE(review): confirm against real problem text).
    exponent_pattern = re.compile(r"(?<=\s)(10|2)\s+(\d+)(?=\s)")
    formatted_problems = []
    for problem in problems:
        # A record may carry 'content' explicitly set to None; treat it like
        # a missing key instead of handing None to the HTML parser.
        raw_html = problem.get('content') or ''
        soup = BeautifulSoup(raw_html, 'html.parser')
        # split() with no arguments already trims the ends and collapses
        # every whitespace run, newlines included.
        clean_text = ' '.join(soup.get_text(separator=" ").split())
        clean_text = exponent_pattern.sub(r"\1^\2", clean_text)
        formatted_problems.append({
            'id': problem['id'],
            'title': problem['title'],
            'url': f"https://leetcode.com/problems/{problem['slug']}",
            'paidOnly': type,
            'slug': problem['slug'],
            'content': clean_text,
            'original_content': raw_html,
            'difficulty': problem['difficulty'],
            'topicTags': problem.get('topicTags', []),
        })
    return formatted_problems
def filter_problems(problems=[]):
    """Split raw question payloads into free and paid-only summaries.

    Each element of ``problems`` is read through ``['data']['question']``.
    Questions whose 'isPaidOnly' value is truthy go to the paid list, all
    others to the free list. Both kinds of summary carry 'id' (taken from
    'questionFrontendId'), 'title', 'difficulty', 'slug' (last path segment
    of 'url') and 'topicTags' (the tag names); free summaries additionally
    carry 'content'.

    Returns:
        A ``(free, paid)`` tuple of lists.
    """
    free_records = []
    paid_records = []
    for entry in problems:
        question = entry['data']['question']
        if not question['isPaidOnly']:
            free_records.append({
                'id': question['questionFrontendId'],
                'title': question['title'],
                'slug': question['url'].rstrip('/').rsplit('/', 1)[-1],
                'content': question['content'],
                'difficulty': question['difficulty'],
                'topicTags': [t['name'] for t in question['topicTags']],
            })
            continue
        # Paid-only summaries are built without a 'content' entry.
        paid_records.append({
            'id': question['questionFrontendId'],
            'title': question['title'],
            'difficulty': question['difficulty'],
            'slug': question['url'].rstrip('/').rsplit('/', 1)[-1],
            'topicTags': [t['name'] for t in question['topicTags']],
        })
    return free_records, paid_records
def save_to_csv(data, filename='problems.csv'):
    """
    Writes the given rows to a CSV file in this module's directory.

    ``data`` is an iterable of dicts. The file is (re)created as UTF-8 with
    a header line followed by one line per row, using a fixed column order.
    Keys missing from a row are written as empty cells; a row containing a
    key outside the fixed columns makes ``csv.DictWriter`` raise
    ``ValueError``.
    """
    columns = [
        'id', 'id_num', 'url', 'title', 'paid_only', 'content',
        'original_content', 'embedding', 'difficulty', 'topictags',
    ]
    target = os.path.join(os.path.dirname(__file__), filename)
    with open(target, 'w', newline='', encoding='utf-8') as handle:
        out = csv.DictWriter(handle, fieldnames=columns)
        out.writeheader()
        out.writerows(data)
    logging.info(f"Saved data to {target}")
def order_data(data):
    """Convert formatted problem records into rows for the CSV export.

    Every returned row uses exactly the column names that ``save_to_csv``
    writes: id, id_num, url, title, paid_only, content, original_content,
    embedding, difficulty, topictags.

    Args:
        data: iterable of mappings providing 'id', 'slug', 'title',
            'paidOnly' and 'difficulty', and optionally 'content',
            'original_content', 'embedding' and 'topicTags'.

    Returns:
        A new list of dicts. The input records are not modified.

    Raises:
        KeyError: a record lacks one of the required keys.
        ValueError: a record's 'id' cannot be converted with ``int``.
    """
    return [
        {
            'id': problem['id'],
            'id_num': int(problem['id']),
            'url': f"https://leetcode.com/problems/{problem['slug']}",
            'title': problem['title'],
            'paid_only': problem['paidOnly'],
            'content': problem.get('content', ''),
            'original_content': problem.get('original_content', ''),
            # Stored as JSON text so the list fits in a single CSV cell.
            'embedding': json.dumps(problem.get('embedding', [])),
            'difficulty': problem['difficulty'],
            # The key must be spelled 'topictags' to match the CSV header;
            # the tag names themselves live under 'topicTags' on the record.
            'topictags': '@'.join(problem.get('topicTags') or []),
        }
        for problem in data
    ]
def populate_db():
    """Run the whole export pipeline from download to CSV.

    Steps, in order: load all problems, split them into free and paid-only,
    pass the paid ones through ``get_paid_problems``, format both groups
    (paid first, then free), combine them with the free problems leading,
    generate embeddings for the combined list and hand the ordered rows to
    ``save_to_csv``.

    Despite the name, the code in this function ends by writing a CSV file;
    no database call is made here.
    """
    logging.info("Starting database population...")
    free_raw, paid_raw = filter_problems(get_all_problems())
    paid_with_content = get_paid_problems(problems=paid_raw)
    paid_formatted = format_problem(paid_with_content, True)
    free_formatted = format_problem(free_raw, False)
    all_formatted = free_formatted + paid_formatted
    logging.info(f"Total problems to insert: {len(all_formatted)}")
    generate_embeddings(all_formatted)
    save_to_csv(order_data(all_formatted))