# src/ingestion/ingestion_pipeline.py

import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..embedding.embedding_service import EmbeddingService
from ..utils.memory_utils import log_memory_checkpoint, memory_monitor
from ..vector_store.vector_db import VectorDatabase
from .document_chunker import DocumentChunker
from .document_parser import DocumentParser


class IngestionPipeline:
"""Complete ingestion pipeline for processing document corpus with embeddings"""
def __init__(
self,
chunk_size: int = 1000,
overlap: int = 200,
seed: int = 42,
store_embeddings: bool = True,
vector_db: Optional[VectorDatabase] = None,
embedding_service: Optional[EmbeddingService] = None,
):
"""
        Initialize the ingestion pipeline.

        Args:
            chunk_size: Size of each text chunk
            overlap: Overlap between consecutive chunks
            seed: Random seed for reproducibility
            store_embeddings: Whether to generate and store embeddings
            vector_db: Vector database instance for storing embeddings
            embedding_service: Embedding service for generating embeddings
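
        Example:
            A minimal sketch; the argument values are illustrative, and
            ``store_embeddings=False`` avoids loading the embedding model
            and vector database::

                pipeline = IngestionPipeline(
                    chunk_size=500, overlap=100, store_embeddings=False
                )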
"""
self.parser = DocumentParser()
self.chunker = DocumentChunker(chunk_size=chunk_size, overlap=overlap, seed=seed)
self.seed = seed
self.store_embeddings = store_embeddings
# Initialize embedding components if storing embeddings
if store_embeddings:
# Log memory before loading embedding model
log_memory_checkpoint("before_embedding_service_init")
self.embedding_service = embedding_service or EmbeddingService()
log_memory_checkpoint("after_embedding_service_init")
if vector_db is None:
from ..config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH
log_memory_checkpoint("before_vector_db_init")
self.vector_db = VectorDatabase(persist_path=VECTOR_DB_PERSIST_PATH, collection_name=COLLECTION_NAME)
log_memory_checkpoint("after_vector_db_init")
else:
self.vector_db = vector_db
else:
self.embedding_service = None
            self.vector_db = None

@memory_monitor
def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
"""
        Process all supported documents in a directory (backward compatible).

        Args:
            directory_path: Path to directory containing documents

        Returns:
            List of processed chunks with metadata
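
        Example:
            Illustrative usage; the directory path is hypothetical::

                pipeline = IngestionPipeline(store_embeddings=False)
                chunks = pipeline.process_directory("data/corpus")
                print(f"{len(chunks)} chunks produced")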
"""
directory = Path(directory_path)
if not directory.exists():
raise FileNotFoundError(f"Directory not found: {directory_path}")
all_chunks = []
# Process each supported file
log_memory_checkpoint("ingest_directory_start")
        # Sort entries for a deterministic processing order
        for file_path in sorted(directory.iterdir()):
if file_path.is_file() and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS:
try:
log_memory_checkpoint(f"before_process_file:{file_path.name}")
chunks = self.process_file(str(file_path))
all_chunks.extend(chunks)
log_memory_checkpoint(f"after_process_file:{file_path.name}")
except Exception as e:
print(f"Warning: Failed to process {file_path}: {e}")
continue
log_memory_checkpoint("ingest_directory_end")
        return all_chunks

@memory_monitor
def process_directory_with_embeddings(self, directory_path: str) -> Dict[str, Any]:
"""
        Process all supported documents in a directory, with embeddings and
        enhanced reporting.

        Args:
            directory_path: Path to directory containing documents

        Returns:
            Dictionary with processing results and statistics
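
        Example:
            Illustrative usage; the path is hypothetical, and the keys shown
            are those of the dictionary assembled below::

                result = pipeline.process_directory_with_embeddings("data/corpus")
                print(result["chunks_processed"], result["embeddings_stored"])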
"""
        start_time = time.time()
directory = Path(directory_path)
if not directory.exists():
raise FileNotFoundError(f"Directory not found: {directory_path}")
all_chunks = []
processed_files = 0
failed_files = []
embeddings_stored = 0
# Process each supported file
log_memory_checkpoint("ingest_with_embeddings_start")
        # Sort entries for a deterministic processing order
        for file_path in sorted(directory.iterdir()):
if file_path.is_file() and file_path.suffix.lower() in self.parser.SUPPORTED_FORMATS:
try:
log_memory_checkpoint(f"before_process_file:{file_path.name}")
chunks = self.process_file(str(file_path))
all_chunks.extend(chunks)
processed_files += 1
log_memory_checkpoint(f"after_process_file:{file_path.name}")
except Exception as e:
print(f"Warning: Failed to process {file_path}: {e}")
failed_files.append({"file": str(file_path), "error": str(e)})
continue
log_memory_checkpoint("files_processed")
# Generate and store embeddings if enabled
if self.store_embeddings and all_chunks and self.embedding_service and self.vector_db:
try:
log_memory_checkpoint("before_store_embeddings")
embeddings_stored = self._store_embeddings_batch(all_chunks)
log_memory_checkpoint("after_store_embeddings")
except Exception as e:
print(f"Warning: Failed to store embeddings: {e}")
return {
"status": "success",
"chunks_processed": len(all_chunks),
"files_processed": processed_files,
"failed_files": failed_files,
"embeddings_stored": embeddings_stored,
"store_embeddings": self.store_embeddings,
"processing_time_seconds": time.time() - start_time,
"chunks": all_chunks, # Include chunks for backward compatibility
        }

def process_file(self, file_path: str) -> List[Dict[str, Any]]:
"""
        Process a single file through the complete pipeline.

        Args:
            file_path: Path to the file to process

        Returns:
            List of chunks from the file
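
        Example:
            Illustrative usage; the file path and format are hypothetical::

                chunks = pipeline.process_file("data/corpus/report.txt")
                text = chunks[0]["content"] if chunks else ""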
"""
# Parse document
parsed_doc = self.parser.parse_document(file_path)
# Chunk the document
chunks = self.chunker.chunk_document(parsed_doc["content"], parsed_doc["metadata"])
        return chunks

@memory_monitor
def _store_embeddings_batch(self, chunks: List[Dict[str, Any]]) -> int:
"""
        Generate embeddings and store chunks in the vector database.

        Args:
            chunks: List of text chunks with metadata

        Returns:
            Number of embeddings stored successfully
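
        Example:
            Expected chunk shape, inferred from the field access in the loop
            below (values are illustrative)::

                {
                    "content": "chunk text",
                    "metadata": {"chunk_id": "doc-0001-chunk-0"},
                }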
"""
if not self.embedding_service or not self.vector_db:
return 0
stored_count = 0
batch_size = 32 # Process in batches for memory efficiency
log_memory_checkpoint("store_batch_start")
for i in range(0, len(chunks), batch_size):
batch = chunks[i : i + batch_size]
try:
log_memory_checkpoint(f"before_embed_batch:{i}")
# Extract texts and prepare data for vector storage
texts = [chunk["content"] for chunk in batch]
chunk_ids = [chunk["metadata"]["chunk_id"] for chunk in batch]
metadatas = [chunk["metadata"] for chunk in batch]
# Generate embeddings for the batch
embeddings = self.embedding_service.embed_texts(texts)
# Store in vector database
self.vector_db.add_embeddings(
embeddings=embeddings,
chunk_ids=chunk_ids,
documents=texts,
metadatas=metadatas,
)
log_memory_checkpoint(f"after_store_batch:{i}")
stored_count += len(batch)
print(f"Stored embeddings for batch {i // batch_size + 1}: " f"{len(batch)} chunks")
except Exception as e:
print(f"Warning: Failed to store batch {i // batch_size + 1}: {e}")
continue
log_memory_checkpoint("store_batch_end")
return stored_count
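

# Example entry point: a minimal sketch of running the pipeline end to end.
# The directory path is illustrative; store_embeddings=False keeps the example
# lightweight by skipping embedding-model and vector-database initialization.
if __name__ == "__main__":
    pipeline = IngestionPipeline(store_embeddings=False)
    chunks = pipeline.process_directory("data/corpus")
    print(f"Ingested {len(chunks)} chunks")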