File size: 8,363 Bytes
ffa0f3d
a3dfc07
7793bb6
aff5d04
0a7f9b4
aff5d04
ffa0f3d
7793bb6
 
ffa0f3d
 
aff5d04
 
 
a3dfc07
 
 
aff5d04
 
 
a3dfc07
aff5d04
ffa0f3d
 
7793bb6
ffa0f3d
 
 
 
aff5d04
 
 
ffa0f3d
 
159faf0
ffa0f3d
aff5d04
a3dfc07
aff5d04
 
0a7f9b4
 
aff5d04
0a7f9b4
 
aff5d04
 
a3dfc07
0a7f9b4
159faf0
0a7f9b4
aff5d04
 
 
 
 
7793bb6
0a7f9b4
ffa0f3d
 
aff5d04
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
 
 
 
 
7793bb6
ffa0f3d
7793bb6
ffa0f3d
0a7f9b4
ffa0f3d
159faf0
ffa0f3d
0a7f9b4
ffa0f3d
 
0a7f9b4
ffa0f3d
 
 
0a7f9b4
7793bb6
ffa0f3d
7793bb6
0a7f9b4
aff5d04
 
a725155
 
aff5d04
 
 
 
 
 
 
1aedf5d
3d9d99a
1aedf5d
3d9d99a
aff5d04
 
 
 
 
 
 
 
 
 
0a7f9b4
aff5d04
159faf0
aff5d04
0a7f9b4
aff5d04
 
 
0a7f9b4
aff5d04
 
 
 
0a7f9b4
aff5d04
 
159faf0
aff5d04
0a7f9b4
aff5d04
0a7f9b4
aff5d04
 
 
 
 
 
 
 
 
 
1aedf5d
a3dfc07
aff5d04
 
ffa0f3d
 
 
7793bb6
ffa0f3d
 
7793bb6
ffa0f3d
 
 
 
 
7793bb6
ffa0f3d
159faf0
7793bb6
 
aff5d04
0a7f9b4
aff5d04
 
 
a3dfc07
aff5d04
 
a3dfc07
aff5d04
 
 
 
 
a3dfc07
aff5d04
 
a3dfc07
0a7f9b4
aff5d04
a3dfc07
 
aff5d04
0a7f9b4
aff5d04
 
 
 
a3dfc07
aff5d04
 
a3dfc07
aff5d04
 
 
 
 
a3dfc07
aff5d04
0a7f9b4
a3dfc07
aff5d04
159faf0
a3dfc07
aff5d04
 
 
a3dfc07
0a7f9b4
aff5d04
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..embedding.embedding_service import EmbeddingService
from ..utils.memory_utils import log_memory_checkpoint, memory_monitor
from ..vector_store.vector_db import VectorDatabase
from .document_chunker import DocumentChunker
from .document_parser import DocumentParser


class IngestionPipeline:
    """Complete ingestion pipeline for processing document corpus with embeddings"""

    def __init__(
        self,
        chunk_size: int = 1000,
        overlap: int = 200,
        seed: int = 42,
        store_embeddings: bool = True,
        vector_db: Optional[VectorDatabase] = None,
        embedding_service: Optional[EmbeddingService] = None,
    ):
        """
        Initialize the ingestion pipeline

        Args:
            chunk_size: Size of text chunks
            overlap: Overlap between chunks
            seed: Random seed for reproducibility
            store_embeddings: Whether to generate and store embeddings
            vector_db: Vector database instance for storing embeddings
            embedding_service: Embedding service for generating embeddings
        """
        self.parser = DocumentParser()
        self.chunker = DocumentChunker(chunk_size=chunk_size, overlap=overlap, seed=seed)
        self.seed = seed
        self.store_embeddings = store_embeddings

        # Initialize embedding components if storing embeddings
        if store_embeddings:
            # Log memory before loading embedding model
            log_memory_checkpoint("before_embedding_service_init")
            self.embedding_service = embedding_service or EmbeddingService()
            log_memory_checkpoint("after_embedding_service_init")

            if vector_db is None:
                from ..config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH

                log_memory_checkpoint("before_vector_db_init")
                self.vector_db = VectorDatabase(persist_path=VECTOR_DB_PERSIST_PATH, collection_name=COLLECTION_NAME)
                log_memory_checkpoint("after_vector_db_init")
            else:
                self.vector_db = vector_db
        else:
            self.embedding_service = None
            self.vector_db = None

    @memory_monitor
    def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Process all supported documents in a directory (backward compatible)

        Args:
            directory_path: Path to directory containing documents

        Returns:
            List of processed chunks with metadata
        """
        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        all_chunks = []

        # Process each supported file
        log_memory_checkpoint("ingest_directory_start")
        for file_path in directory.iterdir():
            if file_path.is_file() and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS:
                try:
                    log_memory_checkpoint(f"before_process_file:{file_path.name}")
                    chunks = self.process_file(str(file_path))
                    all_chunks.extend(chunks)
                    log_memory_checkpoint(f"after_process_file:{file_path.name}")
                except Exception as e:
                    print(f"Warning: Failed to process {file_path}: {e}")
                    continue
        log_memory_checkpoint("ingest_directory_end")

        return all_chunks

    @memory_monitor
    def process_directory_with_embeddings(self, directory_path: str) -> Dict[str, Any]:
        """
        Process all supported documents in a directory with embeddings and enhanced
        reporting

        Args:
            directory_path: Path to directory containing documents

        Returns:
            Dictionary with processing results and statistics
        """
        import time

        start_time = time.time()

        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        all_chunks = []
        processed_files = 0
        failed_files = []
        embeddings_stored = 0

        # Process each supported file
        log_memory_checkpoint("ingest_with_embeddings_start")
        for file_path in directory.iterdir():
            if file_path.is_file() and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS:
                try:
                    log_memory_checkpoint(f"before_process_file:{file_path.name}")
                    chunks = self.process_file(str(file_path))
                    all_chunks.extend(chunks)
                    processed_files += 1
                    log_memory_checkpoint(f"after_process_file:{file_path.name}")
                except Exception as e:
                    print(f"Warning: Failed to process {file_path}: {e}")
                    failed_files.append({"file": str(file_path), "error": str(e)})
                    continue
        log_memory_checkpoint("files_processed")

        # Generate and store embeddings if enabled
        if self.store_embeddings and all_chunks and self.embedding_service and self.vector_db:
            try:
                log_memory_checkpoint("before_store_embeddings")
                embeddings_stored = self._store_embeddings_batch(all_chunks)
                log_memory_checkpoint("after_store_embeddings")
            except Exception as e:
                print(f"Warning: Failed to store embeddings: {e}")

        return {
            "status": "success",
            "chunks_processed": len(all_chunks),
            "files_processed": processed_files,
            "failed_files": failed_files,
            "embeddings_stored": embeddings_stored,
            "store_embeddings": self.store_embeddings,
            "processing_time_seconds": time.time() - start_time,
            "chunks": all_chunks,  # Include chunks for backward compatibility
        }

    def process_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Process a single file through the complete pipeline

        Args:
            file_path: Path to the file to process

        Returns:
            List of chunks from the file
        """
        # Parse document
        parsed_doc = self.parser.parse_document(file_path)

        # Chunk the document
        chunks = self.chunker.chunk_document(parsed_doc["content"], parsed_doc["metadata"])

        return chunks

    @memory_monitor
    def _store_embeddings_batch(self, chunks: List[Dict[str, Any]]) -> int:
        """
        Generate embeddings and store chunks in vector database

        Args:
            chunks: List of text chunks with metadata

        Returns:
            Number of embeddings stored successfully
        """
        if not self.embedding_service or not self.vector_db:
            return 0

        stored_count = 0
        batch_size = 32  # Process in batches for memory efficiency

        log_memory_checkpoint("store_batch_start")
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i : i + batch_size]

            try:
                log_memory_checkpoint(f"before_embed_batch:{i}")
                # Extract texts and prepare data for vector storage
                texts = [chunk["content"] for chunk in batch]
                chunk_ids = [chunk["metadata"]["chunk_id"] for chunk in batch]
                metadatas = [chunk["metadata"] for chunk in batch]

                # Generate embeddings for the batch
                embeddings = self.embedding_service.embed_texts(texts)

                # Store in vector database
                self.vector_db.add_embeddings(
                    embeddings=embeddings,
                    chunk_ids=chunk_ids,
                    documents=texts,
                    metadatas=metadatas,
                )
                log_memory_checkpoint(f"after_store_batch:{i}")

                stored_count += len(batch)
                print(f"Stored embeddings for batch {i // batch_size + 1}: " f"{len(batch)} chunks")

            except Exception as e:
                print(f"Warning: Failed to store batch {i // batch_size + 1}: {e}")
                continue

        log_memory_checkpoint("store_batch_end")
        return stored_count