import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..embedding.embedding_service import EmbeddingService
from ..utils.memory_utils import log_memory_checkpoint, memory_monitor
from ..vector_store.vector_db import VectorDatabase
from .document_chunker import DocumentChunker
from .document_parser import DocumentParser


class IngestionPipeline:
    """Complete ingestion pipeline for processing a document corpus with embeddings.

    The pipeline is parse -> chunk -> (optionally) embed + store:
    ``DocumentParser`` extracts text/metadata, ``DocumentChunker`` splits it,
    and when ``store_embeddings`` is enabled, ``EmbeddingService`` embeds each
    chunk batch and ``VectorDatabase`` persists it.
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        overlap: int = 200,
        seed: int = 42,
        store_embeddings: bool = True,
        vector_db: Optional[VectorDatabase] = None,
        embedding_service: Optional[EmbeddingService] = None,
    ):
        """
        Initialize the ingestion pipeline.

        Args:
            chunk_size: Size of text chunks
            overlap: Overlap between chunks
            seed: Random seed for reproducibility
            store_embeddings: Whether to generate and store embeddings
            vector_db: Vector database instance for storing embeddings
            embedding_service: Embedding service for generating embeddings
        """
        self.parser = DocumentParser()
        self.chunker = DocumentChunker(chunk_size=chunk_size, overlap=overlap, seed=seed)
        self.seed = seed
        self.store_embeddings = store_embeddings

        # Only pay the (memory-heavy) model/DB startup cost when embeddings
        # are actually wanted; checkpoints bracket each init for profiling.
        if store_embeddings:
            log_memory_checkpoint("before_embedding_service_init")
            self.embedding_service = embedding_service or EmbeddingService()
            log_memory_checkpoint("after_embedding_service_init")

            if vector_db is None:
                # Deliberately lazy import: config is only needed for the
                # default-database path, and importing it eagerly could pull
                # in heavyweight settings machinery.
                from ..config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH

                log_memory_checkpoint("before_vector_db_init")
                self.vector_db = VectorDatabase(persist_path=VECTOR_DB_PERSIST_PATH, collection_name=COLLECTION_NAME)
                log_memory_checkpoint("after_vector_db_init")
            else:
                self.vector_db = vector_db
        else:
            self.embedding_service = None
            self.vector_db = None

    def _supported_files(self, directory_path: str) -> List[Path]:
        """Return the supported files in *directory_path*, sorted for determinism.

        Sorting makes processing order independent of filesystem iteration
        order, which keeps runs reproducible (consistent with the ``seed``
        parameter's intent).

        Raises:
            FileNotFoundError: If the directory does not exist.
        """
        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")
        return sorted(
            p
            for p in directory.iterdir()
            if p.is_file() and p.suffix.lower() in self.parser.SUPPORTED_FORMATS
        )

    @memory_monitor
    def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Process all supported documents in a directory (backward compatible).

        Files that fail to parse are skipped with a warning rather than
        aborting the whole run.

        Args:
            directory_path: Path to directory containing documents

        Returns:
            List of processed chunks with metadata

        Raises:
            FileNotFoundError: If the directory does not exist.
        """
        files = self._supported_files(directory_path)

        all_chunks: List[Dict[str, Any]] = []
        log_memory_checkpoint("ingest_directory_start")
        for file_path in files:
            try:
                log_memory_checkpoint(f"before_process_file:{file_path.name}")
                chunks = self.process_file(str(file_path))
                all_chunks.extend(chunks)
                log_memory_checkpoint(f"after_process_file:{file_path.name}")
            except Exception as e:
                # Best-effort ingestion: report and move on to the next file.
                print(f"Warning: Failed to process {file_path}: {e}")
                continue
        log_memory_checkpoint("ingest_directory_end")

        return all_chunks

    @memory_monitor
    def process_directory_with_embeddings(self, directory_path: str) -> Dict[str, Any]:
        """
        Process all supported documents in a directory with embeddings and enhanced reporting.

        Args:
            directory_path: Path to directory containing documents

        Returns:
            Dictionary with processing results and statistics, including the
            raw ``chunks`` list for backward compatibility.

        Raises:
            FileNotFoundError: If the directory does not exist.
        """
        start_time = time.time()

        files = self._supported_files(directory_path)

        all_chunks: List[Dict[str, Any]] = []
        processed_files = 0
        failed_files: List[Dict[str, str]] = []
        embeddings_stored = 0

        log_memory_checkpoint("ingest_with_embeddings_start")
        for file_path in files:
            try:
                log_memory_checkpoint(f"before_process_file:{file_path.name}")
                chunks = self.process_file(str(file_path))
                all_chunks.extend(chunks)
                processed_files += 1
                log_memory_checkpoint(f"after_process_file:{file_path.name}")
            except Exception as e:
                # Record the failure but keep ingesting the remaining files.
                print(f"Warning: Failed to process {file_path}: {e}")
                failed_files.append({"file": str(file_path), "error": str(e)})
                continue
        log_memory_checkpoint("files_processed")

        # Generate and store embeddings if enabled (and there is anything to store).
        if self.store_embeddings and all_chunks and self.embedding_service and self.vector_db:
            try:
                log_memory_checkpoint("before_store_embeddings")
                embeddings_stored = self._store_embeddings_batch(all_chunks)
                log_memory_checkpoint("after_store_embeddings")
            except Exception as e:
                # Embedding storage is best-effort: a failure here still
                # returns the parsed chunks to the caller.
                print(f"Warning: Failed to store embeddings: {e}")

        return {
            "status": "success",
            "chunks_processed": len(all_chunks),
            "files_processed": processed_files,
            "failed_files": failed_files,
            "embeddings_stored": embeddings_stored,
            "store_embeddings": self.store_embeddings,
            "processing_time_seconds": time.time() - start_time,
            "chunks": all_chunks,  # Include chunks for backward compatibility
        }

    def process_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Process a single file through the complete pipeline.

        Args:
            file_path: Path to the file to process

        Returns:
            List of chunks from the file
        """
        # Parse, then chunk using the parsed content and its metadata.
        parsed_doc = self.parser.parse_document(file_path)
        chunks = self.chunker.chunk_document(parsed_doc["content"], parsed_doc["metadata"])
        return chunks

    @memory_monitor
    def _store_embeddings_batch(self, chunks: List[Dict[str, Any]]) -> int:
        """
        Generate embeddings and store chunks in the vector database.

        Args:
            chunks: List of text chunks with metadata

        Returns:
            Number of embeddings stored successfully (failed batches are
            skipped with a warning, not counted).
        """
        if not self.embedding_service or not self.vector_db:
            return 0

        stored_count = 0
        batch_size = 32  # Process in batches for memory efficiency

        log_memory_checkpoint("store_batch_start")
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i : i + batch_size]
            try:
                log_memory_checkpoint(f"before_embed_batch:{i}")
                # Extract texts and prepare data for vector storage.
                texts = [chunk["content"] for chunk in batch]
                chunk_ids = [chunk["metadata"]["chunk_id"] for chunk in batch]
                metadatas = [chunk["metadata"] for chunk in batch]

                # Generate embeddings for the batch.
                embeddings = self.embedding_service.embed_texts(texts)

                # Store in vector database.
                self.vector_db.add_embeddings(
                    embeddings=embeddings,
                    chunk_ids=chunk_ids,
                    documents=texts,
                    metadatas=metadatas,
                )
                log_memory_checkpoint(f"after_store_batch:{i}")

                stored_count += len(batch)
                print(f"Stored embeddings for batch {i // batch_size + 1}: " f"{len(batch)} chunks")
            except Exception as e:
                # A failed batch should not abort the remaining batches.
                print(f"Warning: Failed to store batch {i // batch_size + 1}: {e}")
                continue
        log_memory_checkpoint("store_batch_end")

        return stored_count