File size: 5,374 Bytes
ae9d4c8
 
c8fa3bd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6505613
 
 
c8fa3bd
 
 
 
 
 
 
 
 
 
 
 
 
6505613
 
 
 
c8fa3bd
 
 
 
 
 
6505613
 
c8fa3bd
 
 
 
 
 
 
 
 
 
 
6505613
c8fa3bd
 
 
 
 
 
 
 
 
 
6505613
c8fa3bd
 
 
 
 
 
 
 
6505613
 
 
c8fa3bd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from backend.app.utils.get_paid_problems import get_paid_problems
from backend.app.utils.get_embeddings import get_embedding
import re
import logging
import os
import json
import requests
import csv
from bs4 import BeautifulSoup

# Configure the root logger to emit INFO-level (and higher) messages. This
# sits at module top level, so it runs as a side effect of importing the file.
logging.basicConfig(level=logging.INFO)


def generate_embeddings(data):
    """
    Attaches an embedding to every problem dict in ``data``, in place.

    The text handed to ``get_embedding`` is the problem's 'content' value,
    or an empty string when that key is absent. The result is stored under
    the 'embedding' key of the same dict. Nothing is returned.
    """
    for entry in data:
        text = entry.get('content', '')
        entry['embedding'] = get_embedding(text)


def get_all_problems():
    """
    Downloads the LeetCode problems JSON, saves it locally, and returns it.

    The file is fetched over HTTP (10 second timeout) and written next to
    this module as ``leetcode_questions.json``. If the download fails and a
    previously saved copy exists, that copy is used instead.

    Returns:
        The parsed JSON content of the freshly downloaded or cached file.

    Raises:
        FileNotFoundError: If the download fails and no local copy exists.
    """
    download_url = "https://raw.githubusercontent.com/noworneverev/leetcode" \
        "-api/main/data/leetcode_questions.json"
    json_path = os.path.join(os.path.dirname(__file__),
                             'leetcode_questions.json')

    # Only the network call lives in the try body, so the handler cannot be
    # mistaken for one that also covers local file errors.
    try:
        response = requests.get(download_url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logging.error(f"Failed to download JSON: {e}")
        if not os.path.exists(json_path):
            # Chain the original error so the root cause stays visible.
            raise FileNotFoundError(
                "No local leetcode_questions.json file available.") from e
        logging.info("Using existing local file")
    else:
        # Explicit UTF-8: the platform default encoding may be unable to
        # represent non-ASCII characters in the downloaded text.
        with open(json_path, 'w', encoding='utf-8') as f:
            f.write(response.text)
        logging.info("Downloaded and saved leetcode_questions.json")

    # Read with the same encoding the file is written with above.
    with open(json_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def format_problem(problems=None, type=False):
    """
    Converts problem dicts into the normalized shape used downstream.

    For each problem the HTML in 'content' is reduced to plain text with
    whitespace collapsed to single spaces, and the original HTML is kept
    under 'original_content'.

    Args:
        problems: Iterable of dicts with 'id', 'title', 'slug' and
            'difficulty' keys, plus optional 'content' and 'topicTags'.
            ``None`` (the default) is treated as an empty list.
        type: Value stored under 'paidOnly' for every returned problem.
            The name shadows the builtin but is kept for compatibility
            with existing keyword callers.

    Returns:
        A new list of dicts with the keys 'id', 'title', 'url',
        'paidOnly', 'slug', 'content', 'original_content', 'difficulty'
        and 'topicTags'.

    Raises:
        KeyError: If a problem lacks 'id', 'title', 'slug' or 'difficulty'.
    """
    if problems is None:
        problems = []

    # Rewrites a whitespace-delimited "10 <digits>" or "2 <digits>" as
    # "10^<digits>" / "2^<digits>". Presumably this restores exponents
    # whose superscript markup is lost when the HTML is flattened to text
    # -- TODO confirm against real problem statements.
    exponent_pattern = re.compile(r"(?<=\s)(10|2)\s+(\d+)(?=\s)")

    formatted_problems = []
    for problem in problems:
        # A present-but-None 'content' is treated like a missing one so the
        # HTML parser always receives a string.
        raw_html = problem.get('content') or ''
        soup = BeautifulSoup(raw_html, 'html.parser')
        # split()/join collapses every whitespace run (newlines included)
        # to one space and trims both ends in a single pass.
        clean_text = ' '.join(soup.get_text(separator=" ").split())
        clean_text = exponent_pattern.sub(r"\1^\2", clean_text)
        formatted_problems.append({
            'id': problem['id'],
            'title': problem['title'],
            'url': f"https://leetcode.com/problems/{problem['slug']}",
            'paidOnly': type,
            'slug': problem['slug'],
            'content': clean_text,
            'original_content': raw_html,
            'difficulty': problem['difficulty'],
            'topicTags': problem.get('topicTags', []),
        })
    return formatted_problems


def filter_problems(problems=None):
    """
    Splits raw problem entries into free and paid-only problems.

    Each entry is expected to hold the question under
    ``entry['data']['question']``. The slug is taken from the last path
    segment of the question's 'url', ignoring any trailing slash.

    Args:
        problems: Iterable of raw entries. ``None`` (the default) is
            treated as an empty list.

    Returns:
        A ``(free, paid)`` tuple of lists. Every record has the keys 'id',
        'title', 'slug', 'difficulty' and 'topicTags' (a list of tag
        names); free records additionally carry 'content'.

    Raises:
        KeyError: If an entry is missing one of the fields read here.
    """
    if problems is None:
        problems = []

    filtered_problems_free = []
    filtered_problems_paid = []
    for entry in problems:
        question = entry['data']['question']
        # Fields shared by both kinds of problem are built once.
        record = {
            'id': question['questionFrontendId'],
            'title': question['title'],
            'slug': question['url'].rstrip('/').split('/')[-1],
            'difficulty': question['difficulty'],
            'topicTags': [tag['name'] for tag in question['topicTags']],
        }
        if question['isPaidOnly']:
            filtered_problems_paid.append(record)
        else:
            # Only free problems have their 'content' copied over here.
            record['content'] = question['content']
            filtered_problems_free.append(record)
    return filtered_problems_free, filtered_problems_paid


def save_to_csv(data, filename='problems.csv'):
    """
    Writes ``data`` to a CSV file located in this module's directory.

    ``data`` is an iterable of dict rows keyed by the column names listed
    below; a header row is written before the data rows. ``filename`` is
    the name of the output file (UTF-8 encoded).
    """
    columns = [
        'id', 'id_num', 'url', 'title', 'paid_only', 'content',
        'original_content', 'embedding', 'difficulty', 'topictags',
    ]
    target = os.path.join(os.path.dirname(__file__), filename)
    # newline='' lets the csv module control line endings itself.
    with open(target, 'w', newline='', encoding='utf-8') as handle:
        out = csv.DictWriter(handle, fieldnames=columns)
        out.writeheader()
        out.writerows(data)
        logging.info(f"Saved data to {target}")


def order_data(data):
    """
    Builds CSV-ready rows from formatted problem dicts.

    The row keys match the columns declared in ``save_to_csv``: 'id',
    'id_num', 'url', 'title', 'paid_only', 'content', 'original_content',
    'embedding', 'difficulty' and 'topictags'. Input dicts are not
    modified.

    Args:
        data: Iterable of problem dicts with 'id', 'slug', 'title',
            'paidOnly' and 'difficulty' keys, plus optional 'content',
            'original_content', 'embedding' and 'topicTags'.

    Returns:
        A list with one row dict per problem. 'id_num' is 'id' converted
        to ``int``, 'embedding' is JSON-encoded, and 'topictags' is the
        tag names from 'topicTags' joined with '@'.

    Raises:
        KeyError: If a required key is missing from a problem.
        ValueError: If a problem's 'id' cannot be converted to ``int``.
    """
    csv_data = []
    for problem in data:
        # Tags live under 'topicTags' on the input, while the CSV column is
        # the lowercase 'topictags'. Emitting any other key makes
        # csv.DictWriter reject the row as containing an unknown field.
        tags = problem.get('topicTags') or []
        csv_data.append({
            'id': problem['id'],
            'id_num': int(problem['id']),
            'url': f"https://leetcode.com/problems/{problem['slug']}",
            'title': problem['title'],
            'paid_only': problem['paidOnly'],
            'content': problem.get('content', ''),
            'original_content': problem.get('original_content', ''),
            # Serialized to a JSON string so it fits in a single CSV cell.
            'embedding': json.dumps(problem.get('embedding', [])),
            'difficulty': problem['difficulty'],
            'topictags': '@'.join(tags),
        })
    return csv_data


def populate_db():
    """
    Runs the whole pipeline and writes the result to a CSV file.

    Steps, in order: load all problems, split them into free and paid,
    fetch content for the paid ones via ``get_paid_problems``, format both
    groups, generate embeddings for the combined list, convert it to CSV
    rows and save them with ``save_to_csv``.
    """
    logging.info("Starting database population...")
    free_raw, paid_raw = filter_problems(get_all_problems())
    paid_with_content = get_paid_problems(problems=paid_raw)
    paid_formatted = format_problem(paid_with_content, True)
    # Free problems come first; paid ones are appended after them.
    combined = format_problem(free_raw, False)
    combined.extend(paid_formatted)
    logging.info(f"Total problems to insert: {len(combined)}")
    generate_embeddings(combined)
    save_to_csv(order_data(combined))