from langchain_community.document_loaders import (
    UnstructuredWordDocumentLoader,
    TextLoader,
    CSVLoader,
    UnstructuredMarkdownLoader,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from app.core.chunks import Chunk
import nltk  # used only to download tokenizer data (see update_nltk)
from uuid import uuid4  # uuid4 is used because it generates ids from pseudo-random numbers, unlike uuid1 and others
import numpy as np
from app.settings import logging, settings
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any
import os
import fitz  # PyMuPDF

class PDFLoader:
    """Loads a PDF with PyMuPDF and returns one Document per page."""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def load(self) -> list[Document]:
        docs = []
        with fitz.open(self.file_path) as doc:
            for page in doc:
                text = page.get_text("text")
                metadata = {
                    "source": self.file_path,
                    "page": page.number,
                }
                docs.append(Document(page_content=text, metadata=metadata))
        return docs


class DocumentProcessor:
    """
    TODO: determine the most suitable chunk size

    chunks_unsaved -> recently created chunks that have not been saved to the db yet
    unprocessed -> loaded documents that have not been split into chunks yet
    max_workers -> upper bound on worker processes for parallel chunking
    text_splitter -> the text-splitting strategy
    """

    def __init__(self):
        self.chunks_unsaved: list[Chunk] = []
        self.unprocessed: list[Document] = []
        self.max_workers = min(4, os.cpu_count() or 1)
        self.text_splitter = RecursiveCharacterTextSplitter(
            **settings.text_splitter.model_dump()
        )

    """
    Measures cosine between two vectors
    """

    def cosine_similarity(self, vec1, vec2):
        return vec1 @ vec2 / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    """
    Updates a list of the most relevant chunks without interacting with db
    """

    def update_most_relevant_chunk(
        self,
        chunk: list[np.float64, Chunk],
        relevant_chunks: list[list[np.float64, Chunk]],
        mx_len=15,
    ):
        relevant_chunks.append(chunk)
        for i in range(len(relevant_chunks) - 1, 0, -1):
            if relevant_chunks[i][0] > relevant_chunks[i - 1][0]:
                relevant_chunks[i], relevant_chunks[i - 1] = (
                    relevant_chunks[i - 1],
                    relevant_chunks[i],
                )
            else:
                break

        if len(relevant_chunks) > mx_len:
            del relevant_chunks[-1]
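    # Illustrative only: with mx_len=2, inserting pairs scored 0.3, 0.9, 0.5 leaves
    # relevant_chunks as [(0.9, ...), (0.5, ...)] -- highest similarity first, with
    # the lowest-scoring entry dropped once the cap is exceeded.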

    """
    Loads one file - extracts text from file

    TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
    TODO: Play with .pdf and text from img extraction
    TODO: Try chunking with llm

    add_to_unprocessed -> used to add loaded file to the list of unprocessed(unchunked) files if true
    """

    def check_size(self, file_path: str = "") -> bool:
        try:
            size = os.path.getsize(filename=file_path)
        except Exception:
            size = 0

        if size > 1000000:
            return True
        return False

    def document_multiplexer(
        self, filepath: str, get_loader: bool = False, get_chunking_strategy: bool = False
    ):
        """
        Picks a loader and a chunking strategy based on the file extension.
        With get_loader=True it returns the loader; with get_chunking_strategy=True
        it returns whether chunking should be parallelized.
        """
        loader = None
        if filepath.endswith(".pdf"):
            # PDFLoader splits the file into pages and returns each page as a separate Document
            loader = PDFLoader(file_path=filepath)
        elif filepath.endswith((".docx", ".doc")):
            loader = UnstructuredWordDocumentLoader(file_path=filepath)
        elif filepath.endswith(".txt"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".csv"):
            loader = CSVLoader(file_path=filepath)
        elif filepath.endswith(".json"):
            loader = TextLoader(file_path=filepath)
        elif filepath.endswith(".md"):
            loader = UnstructuredMarkdownLoader(file_path=filepath)

        # PDFs are already split per page, so they are never chunked in parallel;
        # other formats are parallelized only when the file is large enough.
        if filepath.endswith(".pdf"):
            parallelization = False
        else:
            parallelization = self.check_size(file_path=filepath)

        if get_loader:
            return loader
        elif get_chunking_strategy:
            return parallelization
        else:
            raise RuntimeError(
                "document_multiplexer called without get_loader or get_chunking_strategy"
            )
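    # Example (illustrative, with a hypothetical "notes.md"):
    #   document_multiplexer(filepath="notes.md", get_loader=True)
    # returns an UnstructuredMarkdownLoader, while
    #   document_multiplexer(filepath="notes.md", get_chunking_strategy=True)
    # returns True only if the file exceeds the ~1 MB check_size threshold.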

    def load_document(
        self, filepath: str, add_to_unprocessed: bool = False
    ) -> list[Document]:
        """
        Loads one file and extracts its text.

        TODO: Replace UnstructuredWordDocumentLoader with Docx2txtLoader
        TODO: Play with .pdf and text from img extraction
        TODO: Try chunking with llm

        add_to_unprocessed -> if True, the loaded documents are added to the list of
        unprocessed (unchunked) documents
        """
        loader = self.document_multiplexer(filepath=filepath, get_loader=True)

        if loader is None:
            raise RuntimeError("Unsupported type of file")

        # A list rather than a single Document: PDFs are split into one Document per page
        documents: list[Document] = []
        try:
            documents = loader.load()
        except Exception as e:
            raise RuntimeError(f"Failed to load file {filepath}") from e

        if add_to_unprocessed:
            for doc in documents:
                self.unprocessed.append(doc)

        strategy = self.document_multiplexer(filepath=filepath, get_chunking_strategy=True)
        logging.debug("Chunking strategy for %s: parallelization=%s", filepath, strategy)
        self.generate_chunks(parallelization=strategy)
        return documents

    """
    Similar to load_document, but for multiple files

    add_to_unprocessed -> used to add loaded files to the list of unprocessed(unchunked) files if true
    """

    def load_documents(
        self, documents: list[str], add_to_unprocessed: bool = False
    ) -> list[Document]:
        extracted_documents: list[Document] = []

        for doc in documents:
            temp_storage: list[Document] = []

            try:
                temp_storage = self.load_document(
                    filepath=doc, add_to_unprocessed=True
                )
            except Exception as e:
                logging.error(
                    "Error at load_documents while loading %s", doc, exc_info=e
                )
                continue

            for extrc_doc in temp_storage:
                extracted_documents.append(extrc_doc)

                if add_to_unprocessed:
                    self.unprocessed.append(extrc_doc)

        return extracted_documents

    def split_into_groups(self, original_list: list[Any], split_by: int = 15) -> list[list[Any]]:
        output = []
        for i in range(0, len(original_list), split_by):
            new_group = original_list[i : i + split_by]
            output.append(new_group)
        return output

    def _chunkinize(self, document: Document, text: list[Document], lines: list[dict]) -> list[Chunk]:
        """Converts split Documents into Chunk objects with start/end line numbers."""
        output: list[Chunk] = []
        for chunk in text:
            start_l, end_l = self.get_start_end_lines(
                splitted_text=lines,
                start_char=chunk.metadata.get("start_index", 0),
                end_char=chunk.metadata.get("start_index", 0)
                + len(chunk.page_content),
            )

            new_chunk = Chunk(
                id=uuid4(),
                filename=document.metadata.get("source", ""),
                page_number=document.metadata.get("page", 0),
                start_index=chunk.metadata.get("start_index", 0),
                start_line=start_l,
                end_line=end_l,
                text=chunk.page_content,
            )
            output.append(new_chunk)
        return output

    def precompute_lines(self, splitted_document: list[str]) -> list[dict]:
        """Precomputes, for each line, its 1-based id and its [start, end) character span (the +1 accounts for the newline)."""
        current_start = 0
        output: list[dict] = []
        for i, line in enumerate(splitted_document):
            output.append({"id": i + 1, "start": current_start, "end": current_start + len(line) + 1, "text": line})
            current_start += len(line) + 1
        return output

    def generate_chunks(self, parallelization: bool = True):
        """Splits every unprocessed document into Chunks and stores them in chunks_unsaved."""
        intermediate = []
        for document in self.unprocessed:
            text: list[Document] = self.text_splitter.split_documents(documents=[document])
            lines: list[dict] = self.precompute_lines(splitted_document=document.page_content.splitlines())
            groups = self.split_into_groups(original_list=text, split_by=50)

            if parallelization:
                logging.debug("Applying parallel chunk generation")
                with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
                    futures = [executor.submit(self._chunkinize, document, group, lines) for group in groups]
                    for future in as_completed(futures):
                        intermediate.append(future.result())
            else:
                intermediate.append(self._chunkinize(document=document, text=text, lines=lines))

        for group in intermediate:
            for chunk in group:
                self.chunks_unsaved.append(chunk)

        self.unprocessed = []

    def find_line(self, splitted_text: list[dict], char) -> int:
        """Binary-searches for the 1-based line number containing the given character offset."""
        l, r = 0, len(splitted_text) - 1

        while l <= r:
            m = (l + r) // 2
            line = splitted_text[m]

            if line["start"] <= char < line["end"]:
                return m + 1
            elif char < line["start"]:
                r = m - 1
            else:
                l = m + 1

        # The offset falls outside every line: clamp to the nearest valid 1-based line number
        return max(r + 1, 1)

    def get_start_end_lines(
        self,
        splitted_text: list[dict],
        start_char: int,
        end_char: int,
        debug_mode: bool = False,
    ) -> tuple[int, int]:
        """Returns the 1-based (start_line, end_line) pair for a chunk's character span."""
        start = self.find_line(splitted_text=splitted_text, char=start_char)
        end = self.find_line(splitted_text=splitted_text, char=end_char)
        return (start, end)
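    # Illustrative mapping: for page text "abc\ndef", precompute_lines gives
    #   [{"id": 1, "start": 0, "end": 4, ...}, {"id": 2, "start": 4, "end": 8, ...}],
    # so get_start_end_lines(..., start_char=1, end_char=5) returns (1, 2).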

    """
    Note: it should be used only once to download tokenizers, futher usage is not recommended
    """

    def update_nltk(self) -> None:
        nltk.download("punkt")
        nltk.download("averaged_perceptron_tagger")

    """
    For now the system works as follows: we save recently loaded chunks in two arrays:
        chunks - for all chunks, even for that ones that havn't been saveed to db
        chunks_unsaved - for chunks that have been added recently
    I do not know weather we really need to store all chunks that were added in the
    current session, but chunks_unsaved are used to avoid dublications while saving to db.
    """

    def get_and_save_unsaved_chunks(self) -> list[Chunk]:
        chunks_copy: list[Chunk] = self.chunks_unsaved.copy()
        self.clear_unsaved_chunks()
        return chunks_copy

    def clear_unsaved_chunks(self):
        self.chunks_unsaved = []

    def get_all_chunks(self) -> list[Chunk]:
        return self.chunks_unsaved
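

# A minimal usage sketch (illustrative only): "sample.pdf" is a hypothetical path,
# and the block assumes app.settings / app.core.chunks are importable. It loads one
# file, chunks it, and collects the unsaved chunks for persistence elsewhere.
if __name__ == "__main__":
    processor = DocumentProcessor()
    docs = processor.load_document(filepath="sample.pdf", add_to_unprocessed=True)
    print(f"Loaded {len(docs)} page(s)")

    chunks = processor.get_and_save_unsaved_chunks()
    print(f"Generated {len(chunks)} chunk(s)")
    for chunk in chunks[:3]:
        # Chunk fields are the ones populated in _chunkinize above
        print(chunk.filename, chunk.page_number, chunk.start_line, chunk.end_line)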