import time
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..embedding.embedding_service import EmbeddingService
from ..utils.memory_utils import log_memory_checkpoint, memory_monitor
from ..vector_store.vector_db import VectorDatabase

from .document_chunker import DocumentChunker
from .document_parser import DocumentParser
class IngestionPipeline:
    """Complete ingestion pipeline for processing a document corpus with embeddings.

    Parses documents, chunks them, and (optionally) embeds and stores the
    chunks in a vector database. Memory checkpoints are logged around the
    expensive steps so peak usage can be traced in the memory log.
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        overlap: int = 200,
        seed: int = 42,
        store_embeddings: bool = True,
        vector_db: Optional[VectorDatabase] = None,
        embedding_service: Optional[EmbeddingService] = None,
    ):
        """
        Initialize the ingestion pipeline.

        Args:
            chunk_size: Size of text chunks.
            overlap: Overlap between consecutive chunks.
            seed: Random seed for reproducibility.
            store_embeddings: Whether to generate and store embeddings.
            vector_db: Vector database instance for storing embeddings;
                created from config defaults when omitted.
            embedding_service: Embedding service for generating embeddings;
                a default ``EmbeddingService`` is created when omitted.
        """
        self.parser = DocumentParser()
        self.chunker = DocumentChunker(chunk_size=chunk_size, overlap=overlap, seed=seed)
        self.seed = seed
        self.store_embeddings = store_embeddings

        if store_embeddings:
            # Loading the embedding model is the memory-heavy step; bracket it
            # with checkpoints so regressions show up in the memory log.
            log_memory_checkpoint("before_embedding_service_init")
            self.embedding_service = embedding_service or EmbeddingService()
            log_memory_checkpoint("after_embedding_service_init")

            if vector_db is None:
                # Imported lazily so merely importing this module does not pull
                # in the config module (and whatever it initializes).
                from ..config import COLLECTION_NAME, VECTOR_DB_PERSIST_PATH

                log_memory_checkpoint("before_vector_db_init")
                self.vector_db = VectorDatabase(
                    persist_path=VECTOR_DB_PERSIST_PATH,
                    collection_name=COLLECTION_NAME,
                )
                log_memory_checkpoint("after_vector_db_init")
            else:
                self.vector_db = vector_db
        else:
            # Embedding disabled: keep the attributes present but inert so
            # downstream guards (`self.embedding_service and self.vector_db`)
            # short-circuit cleanly.
            self.embedding_service = None
            self.vector_db = None

    def _supported_files(self, directory_path: str) -> List[Path]:
        """Return the supported files in a directory, in sorted order.

        Sorting makes ingestion order deterministic, matching the pipeline's
        reproducibility goal (``seed``); ``Path.iterdir`` order is otherwise
        filesystem-dependent.

        Args:
            directory_path: Path to directory containing documents.

        Returns:
            Sorted list of files whose suffix is in the parser's
            ``SUPPORTED_FORMATS``.

        Raises:
            FileNotFoundError: If ``directory_path`` does not exist.
        """
        directory = Path(directory_path)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory_path}")
        return sorted(
            p
            for p in directory.iterdir()
            if p.is_file() and p.suffix.lower() in self.parser.SUPPORTED_FORMATS
        )

    def process_directory(self, directory_path: str) -> List[Dict[str, Any]]:
        """
        Process all supported documents in a directory (backward compatible).

        Args:
            directory_path: Path to directory containing documents.

        Returns:
            List of processed chunks with metadata.

        Raises:
            FileNotFoundError: If the directory does not exist.
        """
        files = self._supported_files(directory_path)
        all_chunks: List[Dict[str, Any]] = []

        log_memory_checkpoint("ingest_directory_start")
        for file_path in files:
            try:
                log_memory_checkpoint(f"before_process_file:{file_path.name}")
                all_chunks.extend(self.process_file(str(file_path)))
                log_memory_checkpoint(f"after_process_file:{file_path.name}")
            except Exception as e:
                # Best-effort ingestion: one bad file must not abort the run.
                print(f"Warning: Failed to process {file_path}: {e}")
        log_memory_checkpoint("ingest_directory_end")
        return all_chunks

    def process_directory_with_embeddings(self, directory_path: str) -> Dict[str, Any]:
        """
        Process all supported documents in a directory with embeddings and
        enhanced reporting.

        Args:
            directory_path: Path to directory containing documents.

        Returns:
            Dictionary with processing results and statistics; the chunks
            themselves are included under ``"chunks"`` for backward
            compatibility.

        Raises:
            FileNotFoundError: If the directory does not exist.
        """
        start_time = time.time()
        files = self._supported_files(directory_path)

        all_chunks: List[Dict[str, Any]] = []
        processed_files = 0
        failed_files: List[Dict[str, str]] = []
        embeddings_stored = 0

        log_memory_checkpoint("ingest_with_embeddings_start")
        for file_path in files:
            try:
                log_memory_checkpoint(f"before_process_file:{file_path.name}")
                all_chunks.extend(self.process_file(str(file_path)))
                processed_files += 1
                log_memory_checkpoint(f"after_process_file:{file_path.name}")
            except Exception as e:
                print(f"Warning: Failed to process {file_path}: {e}")
                failed_files.append({"file": str(file_path), "error": str(e)})
        log_memory_checkpoint("files_processed")

        # Generate and store embeddings only when enabled, something was
        # chunked, and both embedding components are available.
        if self.store_embeddings and all_chunks and self.embedding_service and self.vector_db:
            try:
                log_memory_checkpoint("before_store_embeddings")
                embeddings_stored = self._store_embeddings_batch(all_chunks)
                log_memory_checkpoint("after_store_embeddings")
            except Exception as e:
                # Embedding failure downgrades to chunks-only output rather
                # than failing the whole ingestion.
                print(f"Warning: Failed to store embeddings: {e}")

        # NOTE(review): status stays "success" even when some files failed;
        # callers are expected to inspect failed_files.
        return {
            "status": "success",
            "chunks_processed": len(all_chunks),
            "files_processed": processed_files,
            "failed_files": failed_files,
            "embeddings_stored": embeddings_stored,
            "store_embeddings": self.store_embeddings,
            "processing_time_seconds": time.time() - start_time,
            "chunks": all_chunks,  # Include chunks for backward compatibility
        }

    def process_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Process a single file through the complete pipeline.

        Args:
            file_path: Path to the file to process.

        Returns:
            List of chunks from the file.
        """
        # Parse, then chunk the parsed content together with its metadata.
        parsed_doc = self.parser.parse_document(file_path)
        return self.chunker.chunk_document(parsed_doc["content"], parsed_doc["metadata"])

    def _store_embeddings_batch(self, chunks: List[Dict[str, Any]]) -> int:
        """
        Generate embeddings and store chunks in the vector database.

        Args:
            chunks: List of text chunks with metadata; each chunk must carry
                ``"content"`` and ``metadata["chunk_id"]``.

        Returns:
            Number of chunks whose embeddings were stored successfully.
        """
        if not self.embedding_service or not self.vector_db:
            return 0

        stored_count = 0
        batch_size = 32  # Process in batches for memory efficiency.

        log_memory_checkpoint("store_batch_start")
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i : i + batch_size]
            batch_no = i // batch_size + 1
            try:
                log_memory_checkpoint(f"before_embed_batch:{i}")
                # Extract texts and prepare data for vector storage.
                texts = [chunk["content"] for chunk in batch]
                chunk_ids = [chunk["metadata"]["chunk_id"] for chunk in batch]
                metadatas = [chunk["metadata"] for chunk in batch]

                # Embed the batch, then persist embeddings + originals.
                embeddings = self.embedding_service.embed_texts(texts)
                self.vector_db.add_embeddings(
                    embeddings=embeddings,
                    chunk_ids=chunk_ids,
                    documents=texts,
                    metadatas=metadatas,
                )
                log_memory_checkpoint(f"after_store_batch:{i}")
                stored_count += len(batch)
                print(f"Stored embeddings for batch {batch_no}: " f"{len(batch)} chunks")
            except Exception as e:
                # A failed batch is skipped; remaining batches still stored.
                print(f"Warning: Failed to store batch {batch_no}: {e}")
        log_memory_checkpoint("store_batch_end")
        return stored_count