Spaces:
Runtime error
Runtime error
File size: 5,374 Bytes
ae9d4c8 c8fa3bd 6505613 c8fa3bd 6505613 c8fa3bd 6505613 c8fa3bd 6505613 c8fa3bd 6505613 c8fa3bd 6505613 c8fa3bd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
from backend.app.utils.get_paid_problems import get_paid_problems
from backend.app.utils.get_embeddings import get_embedding
import re
import logging
import os
import json
import requests
import csv
from bs4 import BeautifulSoup
# Configure the root logger at INFO level; this runs when the module is imported.
logging.basicConfig(level=logging.INFO)
def generate_embeddings(data):
    """
    Attaches an embedding to every problem dict in ``data``, in place.

    For each entry, the text under its 'content' key (an empty string when
    the key is absent) is passed to ``get_embedding`` and the result is
    stored under the entry's 'embedding' key. Nothing is returned.
    """
    for entry in data:
        text = entry.get('content', '')
        entry['embedding'] = get_embedding(text)
def get_all_problems():
    """
    Downloads the LeetCode problems JSON and saves it locally.
    Returns the parsed JSON content.

    The file is cached as 'leetcode_questions.json' next to this module.
    When the download fails, a previously cached copy is used instead.

    Raises:
        FileNotFoundError: the download failed and no cached copy exists.
    """
    download_url = "https://raw.githubusercontent.com/noworneverev/leetcode" \
        "-api/main/data/leetcode_questions.json"
    json_path = os.path.join(os.path.dirname(
        __file__), 'leetcode_questions.json')
    try:
        # Only the network calls live in the try body, so a failure while
        # writing the cache is not mistaken for a download failure.
        response = requests.get(download_url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Failed to download JSON: {e}")
        if not os.path.exists(json_path):
            raise FileNotFoundError(
                "No local leetcode_questions.json file available.") from e
        logging.info("Using existing local file")
    else:
        # Explicit UTF-8: the platform default encoding may be unable to
        # represent non-ASCII characters present in the downloaded text.
        with open(json_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        logging.info("Downloaded and saved leetcode_questions.json")
    with open(json_path, 'r', encoding='utf-8') as f:
        return json.load(f)
def format_problem(problems=None, type=False):
    """
    Builds the normalized problem records used by the rest of the pipeline.

    Args:
        problems: iterable of dicts with 'id', 'title', 'slug', 'difficulty'
            and optionally 'content' (HTML) and 'topicTags'. ``None`` or an
            empty iterable yields an empty list.
        type: value stored under 'paidOnly' for every returned record. The
            name shadows the builtin but is kept so keyword callers still
            work.

    Returns:
        A new list of dicts with the keys 'id', 'title', 'url', 'paidOnly',
        'slug', 'content' (plain text), 'original_content' (raw HTML),
        'difficulty' and 'topicTags'.
    """
    formatted_problems = []
    for problem in problems or []:
        # Treat a missing or null 'content' as empty text.
        raw_html = problem.get('content') or ''
        soup = BeautifulSoup(raw_html, 'html.parser')
        # split()/join collapses every whitespace run (newlines included)
        # to a single space and trims both ends.
        clean_text = ' '.join(soup.get_text(separator=" ").split())
        # Rejoin a base of 10 or 2 with the number that follows it, e.g.
        # "10 5" -> "10^5" (presumably exponents separated by the get_text
        # separator). Matches only when whitespace surrounds the pair.
        clean_text = re.sub(r"(?<=\s)(10|2)\s+(\d+)(?=\s)",
                            r"\1^\2", clean_text)
        formatted_problems.append({
            'id': problem['id'],
            'title': problem['title'],
            'url': f"https://leetcode.com/problems/{problem['slug']}",
            'paidOnly': type,
            'slug': problem['slug'],
            'content': clean_text,
            'original_content': raw_html,
            'difficulty': problem['difficulty'],
            'topicTags': problem.get('topicTags', []),
        })
    return formatted_problems
def filter_problems(problems=None):
    """
    Splits raw problem entries into free and paid-only summaries.

    Args:
        problems: iterable of dicts shaped like
            ``{'data': {'question': {...}}}`` where the question holds
            'questionFrontendId', 'title', 'url', 'difficulty',
            'isPaidOnly', 'topicTags' (dicts with a 'name') and, for free
            problems, 'content'. ``None`` or an empty iterable yields two
            empty lists.

    Returns:
        A ``(free, paid)`` tuple of lists. Free entries carry 'id',
        'title', 'slug', 'content', 'difficulty' and 'topicTags'; paid
        entries carry the same keys except 'content'.
    """
    free_problems = []
    paid_problems = []
    for entry in problems or []:
        question = entry['data']['question']
        # The slug is the last path segment of the problem URL.
        slug = question['url'].rstrip('/').split('/')[-1]
        tag_names = [tag['name'] for tag in question['topicTags']]
        if question['isPaidOnly']:
            paid_problems.append({
                'id': question['questionFrontendId'],
                'title': question['title'],
                'difficulty': question['difficulty'],
                'slug': slug,
                'topicTags': tag_names,
            })
        else:
            free_problems.append({
                'id': question['questionFrontendId'],
                'title': question['title'],
                'slug': slug,
                'content': question['content'],
                'difficulty': question['difficulty'],
                'topicTags': tag_names,
            })
    return free_problems, paid_problems
def save_to_csv(data, filename='problems.csv'):
    """
    Writes the given rows to a CSV file located next to this module.

    ``data`` is an iterable of dicts keyed by the column names listed
    below; a header row is written first. With ``csv.DictWriter``'s default
    settings, a row containing a key outside these columns raises
    ValueError, and a missing key is written as an empty field.
    """
    columns = ['id', 'id_num', 'url', 'title',
               'paid_only', 'content', 'original_content', 'embedding',
               'difficulty', 'topictags']
    target = os.path.join(os.path.dirname(__file__), filename)
    # newline='' lets the csv module control line endings itself.
    with open(target, 'w', newline='', encoding='utf-8') as out:
        dict_writer = csv.DictWriter(out, fieldnames=columns)
        dict_writer.writeheader()
        dict_writer.writerows(data)
    logging.info(f"Saved data to {target}")
def order_data(data):
    """
    Converts formatted problem dicts into rows for the CSV export.

    Each returned row has exactly the columns that ``save_to_csv`` writes:
    'id', 'id_num', 'url', 'title', 'paid_only', 'content',
    'original_content', 'embedding', 'difficulty' and 'topictags'.

    Args:
        data: iterable of dicts with 'id', 'slug', 'title', 'paidOnly' and
            'difficulty', and optionally 'content', 'original_content',
            'embedding' and 'topicTags' (a list of tag-name strings).

    Returns:
        A new list of row dicts; the input dicts are not modified.

    Raises:
        ValueError: if an 'id' cannot be converted to ``int``.
    """
    csv_data = []
    for problem in data:
        # Tags live under 'topicTags' on the input, but the CSV column is
        # the lowercase 'topictags', stored as one '@'-separated string.
        tags = problem.get('topicTags') or []
        csv_data.append({
            'id': problem['id'],
            'id_num': int(problem['id']),
            'url': f"https://leetcode.com/problems/{problem['slug']}",
            'title': problem['title'],
            'paid_only': problem['paidOnly'],
            'content': problem.get('content', ''),
            'original_content': problem.get('original_content', ''),
            # Serialized to JSON text so the vector fits in one CSV cell.
            'embedding': json.dumps(problem.get('embedding', [])),
            'difficulty': problem['difficulty'],
            'topictags': '@'.join(tags),
        })
    return csv_data
def populate_db():
    """
    Runs the full export pipeline from download to CSV.

    Steps: fetch the raw problem list, split it into free and paid-only
    entries, fetch content for the paid ones via ``get_paid_problems``,
    format both groups, compute embeddings, and write the ordered rows with
    ``save_to_csv``.
    """
    logging.info("Starting database population...")
    free_raw, paid_raw = filter_problems(get_all_problems())
    paid_with_content = get_paid_problems(problems=paid_raw)
    paid_formatted = format_problem(paid_with_content, True)
    free_formatted = format_problem(free_raw, False)
    # Free problems come first, followed by the paid ones.
    all_problems = free_formatted + paid_formatted
    logging.info(f"Total problems to insert: {len(all_problems)}")
    generate_embeddings(all_problems)
    save_to_csv(order_data(all_problems))
|