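"""Document ingestion pipeline.

Parses a corpus of documents, splits them into overlapping chunks, and
optionally generates embeddings and stores them in a vector database.
"""
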
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..embedding.embedding_service import EmbeddingService
from ..utils.memory_utils import log_memory_checkpoint, memory_monitor
from ..vector_store.vector_db import VectorDatabase
from .document_chunker import DocumentChunker
from .document_parser import DocumentParser


class IngestionPipeline:
    """Complete ingestion pipeline for processing a document corpus with embeddings."""

    def __init__(
        self,
        chunk_size: int = 1000,
        overlap: int = 200,
        seed: int = 42,
        store_embeddings: bool = True,
        vector_db: Optional[VectorDatabase] = None,
        embedding_service: Optional[EmbeddingService] = None,
    ):
        """
        Initialize the ingestion pipeline.

        Args:
            chunk_size: Target size of each text chunk
            overlap: Overlap between consecutive chunks
            seed: Random seed for reproducibility
            store_embeddings: Whether to generate and store embeddings
            vector_db: Vector database instance for storing embeddings
            embedding_service: Embedding service for generating embeddings
        """
        self.parser = DocumentParser()
        self.chunker = DocumentChunker(chunk_size=chunk_size, overlap=overlap, seed=seed)
        self.seed = seed
        self.store_embeddings = store_embeddings

        # Initialize embedding components only if embeddings will be stored
        if store_embeddings:
            # Log memory before and after loading the embedding model
            log_memory_checkpoint("before_embedding_service_init")
            self.embedding_service = embedding_service or EmbeddingService()
            log_memory_checkpoint("after_embedding_service_init")

            if vector_db is None:
                from ..config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH

                log_memory_checkpoint("before_vector_db_init")
                self.vector_db = VectorDatabase(
                    persist_path=VECTOR_DB_PERSIST_PATH, collection_name=COLLECTION_NAME
                )
                log_memory_checkpoint("after_vector_db_init")
            else:
                self.vector_db = vector_db
        else:
            self.embedding_service = None
            self.vector_db = None

    @memory_monitor
    def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Process all supported documents in a directory (backward compatible)

        Args:
            directory_path: Path to directory containing documents

        Returns:
            List of processed chunks with metadata
        """
        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        all_chunks = []

        # Process each supported file
        log_memory_checkpoint("ingest_directory_start")
        for file_path in directory.iterdir():
            if file_path.is_file() and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS:
                try:
                    log_memory_checkpoint(f"before_process_file:{file_path.name}")
                    chunks = self.process_file(str(file_path))
                    all_chunks.extend(chunks)
                    log_memory_checkpoint(f"after_process_file:{file_path.name}")
                except Exception as e:
                    print(f"Warning: Failed to process {file_path}: {e}")
                    continue
        log_memory_checkpoint("ingest_directory_end")

        return all_chunks
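
    # The method below is the richer variant of process_directory: it walks
    # the same files, but also tracks per-file failures, stores embeddings
    # when enabled, and returns a summary report instead of a bare chunk list.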

    @memory_monitor
    def process_directory_with_embeddings(self, directory_path: str) -> Dict[str, Any]:
        """
        Process all supported documents in a directory with embeddings and
        enhanced reporting

        Args:
            directory_path: Path to directory containing documents

        Returns:
            Dictionary with processing results and statistics
        """
        import time

        start_time = time.time()

        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")

        all_chunks = []
        processed_files = 0
        failed_files = []
        embeddings_stored = 0

        # Process each supported file
        log_memory_checkpoint("ingest_with_embeddings_start")
        for file_path in directory.iterdir():
            if file_path.is_file() and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS:
                try:
                    log_memory_checkpoint(f"before_process_file:{file_path.name}")
                    chunks = self.process_file(str(file_path))
                    all_chunks.extend(chunks)
                    processed_files += 1
                    log_memory_checkpoint(f"after_process_file:{file_path.name}")
                except Exception as e:
                    print(f"Warning: Failed to process {file_path}: {e}")
                    failed_files.append({"file": str(file_path), "error": str(e)})
                    continue
        log_memory_checkpoint("files_processed")

        # Generate and store embeddings if enabled
        if self.store_embeddings and all_chunks and self.embedding_service and self.vector_db:
            try:
                log_memory_checkpoint("before_store_embeddings")
                embeddings_stored = self._store_embeddings_batch(all_chunks)
                log_memory_checkpoint("after_store_embeddings")
            except Exception as e:
                print(f"Warning: Failed to store embeddings: {e}")

        return {
            "status": "success",
            "chunks_processed": len(all_chunks),
            "files_processed": processed_files,
            "failed_files": failed_files,
            "embeddings_stored": embeddings_stored,
            "store_embeddings": self.store_embeddings,
            "processing_time_seconds": time.time() - start_time,
            "chunks": all_chunks,  # Include chunks for backward compatibility
        }

    def process_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Process a single file through the complete pipeline

        Args:
            file_path: Path to the file to process

        Returns:
            List of chunk dicts from the file, each with the chunk text and
            its metadata
        """
        # Parse the document into content plus metadata
        parsed_doc = self.parser.parse_document(file_path)

        # Split the parsed content into overlapping chunks
        chunks = self.chunker.chunk_document(parsed_doc["content"], parsed_doc["metadata"])

        return chunks
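
    # Contract note: downstream code expects each chunk dict to expose
    # chunk["content"] (the text) and chunk["metadata"]["chunk_id"];
    # _store_embeddings_batch relies on exactly those keys.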

    @memory_monitor
    def _store_embeddings_batch(self, chunks: List[Dict[str, Any]]) -> int:
        """
        Generate embeddings and store chunks in vector database

        Args:
            chunks: List of text chunks with metadata

        Returns:
            Number of embeddings stored successfully
        """
        if not self.embedding_service or not self.vector_db:
            return 0

        stored_count = 0
        batch_size = 32  # Process in batches for memory efficiency

        log_memory_checkpoint("store_batch_start")
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i : i + batch_size]
            try:
                log_memory_checkpoint(f"before_embed_batch:{i}")

                # Extract texts and prepare data for vector storage
                texts = [chunk["content"] for chunk in batch]
                chunk_ids = [chunk["metadata"]["chunk_id"] for chunk in batch]
                metadatas = [chunk["metadata"] for chunk in batch]

                # Generate embeddings for the batch
                embeddings = self.embedding_service.embed_texts(texts)

                # Store in vector database
                self.vector_db.add_embeddings(
                    embeddings=embeddings,
                    chunk_ids=chunk_ids,
                    documents=texts,
                    metadatas=metadatas,
                )
                log_memory_checkpoint(f"after_store_batch:{i}")

                stored_count += len(batch)
                print(f"Stored embeddings for batch {i // batch_size + 1}: {len(batch)} chunks")
            except Exception as e:
                print(f"Warning: Failed to store batch {i // batch_size + 1}: {e}")
                continue
        log_memory_checkpoint("store_batch_end")

        return stored_count
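

if __name__ == "__main__":
    # Minimal usage sketch, not part of the pipeline itself. It assumes a
    # local "docs" directory containing files in SUPPORTED_FORMATS, and must
    # be run as part of the package (e.g. `python -m <package>.<module>`) so
    # the relative imports resolve. store_embeddings=False keeps the demo
    # cheap: no embedding model or vector database is loaded.
    pipeline = IngestionPipeline(chunk_size=1000, overlap=200, store_embeddings=False)
    chunks = pipeline.process_directory("docs")
    print(f"Produced {len(chunks)} chunks")

    # With embeddings enabled (the default), the richer entry point returns a
    # summary report instead of a bare chunk list:
    #   report = IngestionPipeline().process_directory_with_embeddings("docs")
    #   print(report["files_processed"], report["embeddings_stored"])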