Leet-Search / backend /app /scripts /populate_db.py
Patel aryan
refactor: update import paths to include 'backend.app' prefix for consistency
ae9d4c8
from backend.app.utils.get_paid_problems import get_paid_problems
from backend.app.utils.get_embeddings import get_embedding
import re
import logging
import os
import json
import requests
import csv
from bs4 import BeautifulSoup
logging.basicConfig(level=logging.INFO)
def generate_embeddings(data):
for problem in data:
embedding = get_embedding(problem.get('content', ''))
problem['embedding'] = embedding
def get_all_problems():
"""
Downloads the LeetCode problems JSON and saves it locally.
Returns the parsed JSON content.
"""
download_url = "https://raw.githubusercontent.com/noworneverev/leetcode" \
"-api/main/data/leetcode_questions.json"
json_path = os.path.join(os.path.dirname(
__file__), 'leetcode_questions.json')
try:
response = requests.get(download_url, timeout=10)
response.raise_for_status()
with open(json_path, 'w') as f:
f.write(response.text)
logging.info("Downloaded and saved leetcode_questions.json")
except requests.RequestException as e:
logging.error(f"Failed to download JSON: {e}")
if not os.path.exists(json_path):
raise FileNotFoundError(
"No local leetcode_questions.json file available.")
else:
logging.info("Using existing local file")
with open(json_path, 'r') as f:
return json.load(f)
def format_problem(problems=[], type=False):
formatted_problems = []
for problem in problems:
raw_html = problem.get('content', '')
soup = BeautifulSoup(raw_html, 'html.parser')
clean_text = soup.get_text(separator=" ").strip().replace("\n", " ")
clean_text = ' '.join(clean_text.split())
clean_text = re.sub(r"(?<=\s)(10|2)\s+(\d+)(?=\s)",
r"\1^\2", clean_text)
formatted_problems.append({
'id': problem['id'],
'title': problem['title'],
'url': f"https://leetcode.com/problems/{problem['slug']}",
'paidOnly': type,
'slug': problem['slug'],
'content': clean_text,
'original_content': raw_html,
'difficulty': problem['difficulty'],
'topicTags': problem.get('topicTags', []),
})
return formatted_problems
def filter_problems(problems=[]):
filtered_problems_free = []
filtered_problems_paid = []
for problem in problems:
problem = problem['data']['question']
if problem['isPaidOnly']:
filtered_problems_paid.append({
'id': problem['questionFrontendId'],
'title': problem['title'],
'difficulty': problem['difficulty'],
'slug': problem['url'].rstrip('/').split('/')[-1],
'topicTags': [tag['name'] for tag in problem['topicTags']],
})
else:
filtered_problems_free.append({
'id': problem['questionFrontendId'],
'title': problem['title'],
'slug': problem['url'].rstrip('/').split('/')[-1],
'content': problem['content'],
'difficulty': problem['difficulty'],
'topicTags': [tag['name'] for tag in problem['topicTags']],
})
return filtered_problems_free, filtered_problems_paid
def save_to_csv(data, filename='problems.csv'):
"""
Saves the provided data to a CSV file.
"""
csv_path = os.path.join(os.path.dirname(__file__), filename)
with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = ['id', 'id_num', 'url', 'title',
'paid_only', 'content', 'original_content', 'embedding', 'difficulty', 'topictags']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in data:
writer.writerow(row)
logging.info(f"Saved data to {csv_path}")
def order_data(data):
csv_data = []
for problem in data:
problem['topictags'] = '@'.join(problem.get('topictags', []))
csv_data.append({
'id': problem['id'],
'id_num': int(problem['id']),
'url': f"https://leetcode.com/problems/{problem['slug']}",
'title': problem['title'],
'paid_only': problem['paidOnly'],
'content': problem.get('content', ''),
'original_content': problem.get('original_content', ''),
'embedding': json.dumps(problem.get('embedding', [])),
'difficulty': problem['difficulty'],
'topicTags': problem['topicTags'],
})
return csv_data
def populate_db():
logging.info("Starting database population...")
problems = get_all_problems()
filtered_problems_free, filtered_problems_paid = filter_problems(problems)
problems_paid_with_content = get_paid_problems(
problems=filtered_problems_paid)
formatted_problems_paid = format_problem(problems_paid_with_content, True)
formatted_problems_free = format_problem(filtered_problems_free, False)
formatted_problems_free.extend(formatted_problems_paid)
logging.info(f"Total problems to insert: {len(formatted_problems_free)}")
generate_embeddings(formatted_problems_free)
csv_data = order_data(formatted_problems_free)
save_to_csv(csv_data)