#!/usr/bin/env python3
"""
Migration script to move data from ChromaDB to PostgreSQL with data optimization.

This script reduces data size to fit within Render's 1 GB PostgreSQL free tier limit.
"""
import gc
import logging
import os
import re
import sys
from typing import Any, Dict, Iterator, Optional
# Add the project root to the path so that ``src.*`` imports resolve
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from src.config import (  # noqa: E402
    COLLECTION_NAME,
    MAX_DOCUMENT_LENGTH,
    MAX_DOCUMENTS_IN_MEMORY,
    VECTOR_DB_PERSIST_PATH,
)
from src.embedding.embedding_service import EmbeddingService  # noqa: E402
from src.vector_db.postgres_vector_service import PostgresVectorService  # noqa: E402
from src.vector_store.vector_db import VectorDatabase  # noqa: E402

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
class DataOptimizer:
    """Optimizes document data to reduce storage requirements."""

    @staticmethod
    def summarize_text(text: str, max_length: int = MAX_DOCUMENT_LENGTH) -> str:
        """
        Summarize text to reduce storage while preserving key information.

        Args:
            text: Original text
            max_length: Maximum length for the summarized text

        Returns:
            Summarized text
        """
        if len(text) <= max_length:
            return text.strip()

        # Simple extractive summarization: keep the first few sentences
        sentences = re.split(r"[.!?]+", text)
        summary = ""
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
            # Stop before adding a sentence would exceed the limit
            if len(summary + sentence + ".") > max_length:
                break
            summary += sentence + ". "

        # If the summary came out too short, fall back to hard truncation
        if len(summary) < max_length // 4:
            summary = text[:max_length].strip()

        return summary.strip()
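    # Illustrative behavior (assuming MAX_DOCUMENT_LENGTH is, say, 1000): a
    # 10,000-character document comes back as its leading sentences, cut at
    # the last sentence boundary that still fits, while text with no sentence
    # punctuation at all falls back to a hard cut at the first 1000 characters.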
    @staticmethod
    def clean_metadata(metadata: Dict[str, Any]) -> Dict[str, Any]:
        """
        Clean metadata to keep only essential fields.

        Args:
            metadata: Original metadata

        Returns:
            Cleaned metadata with only essential fields
        """
        essential_fields = {
            "source",
            "title",
            "page",
            "chunk_id",
            "document_type",
            "created_at",
            "file_path",
            "section",
        }
        cleaned = {}
        for key, value in metadata.items():
            if key in essential_fields and value is not None:
                # Keep only simple types, truncating long strings to 100 chars
                if isinstance(value, str) and len(value) > 100:
                    cleaned[key] = value[:100]
                elif isinstance(value, (str, int, float, bool)):
                    cleaned[key] = value
        return cleaned
    @staticmethod
    def should_include_document(metadata: Dict[str, Any], content: str) -> bool:
        """
        Decide whether to include a document based on quality metrics.

        Args:
            metadata: Document metadata
            content: Document content

        Returns:
            True if the document should be included
        """
        # Skip very short documents (likely not useful)
        if len(content.strip()) < 50:
            return False

        # Skip documents with no meaningful content
        if not re.search(r"[a-zA-Z]{3,}", content):
            return False

        # Every document that passes the quality checks is kept; document-type
        # prioritization (e.g. policy/procedure/guideline first) could be
        # added here if the storage budget is still exceeded.
        return True
class ChromaToPostgresMigrator:
    """Migrates data from ChromaDB to PostgreSQL with optimization."""

    def __init__(self, database_url: Optional[str] = None):
        """
        Initialize the migrator.

        Args:
            database_url: PostgreSQL connection string
        """
        self.database_url = database_url or os.getenv("DATABASE_URL")
        if not self.database_url:
            raise ValueError("DATABASE_URL environment variable is required")

        self.optimizer = DataOptimizer()
        self.embedding_service = None
        self.chroma_db = None
        self.postgres_service = None
        self.total_migrated = 0
        self.total_skipped = 0
    def initialize_services(self):
        """Initialize the embedding service and database connections."""
        logger.info("Initializing services...")

        # Initialize embedding service
        self.embedding_service = EmbeddingService()

        # Initialize ChromaDB (source)
        self.chroma_db = VectorDatabase(persist_path=VECTOR_DB_PERSIST_PATH, collection_name=COLLECTION_NAME)

        # Initialize PostgreSQL (destination)
        self.postgres_service = PostgresVectorService(connection_string=self.database_url, table_name=COLLECTION_NAME)

        logger.info("Services initialized successfully")
    def get_chroma_documents(self, batch_size: int = MAX_DOCUMENTS_IN_MEMORY) -> Iterator[Dict[str, Any]]:
        """
        Retrieve all documents from ChromaDB in batches.

        Args:
            batch_size: Number of documents to yield per batch

        Yields:
            Batches of documents
        """
        try:
            total_count = self.chroma_db.get_count()
            logger.info(f"Found {total_count} documents in ChromaDB")

            if total_count == 0:
                return

            # Get all documents (ChromaDB's get() has no native pagination)
            collection = self.chroma_db.get_collection()
            all_data = collection.get(include=["documents", "metadatas", "embeddings"])

            if not all_data or not all_data.get("documents"):
                logger.warning("No documents found in ChromaDB collection")
                return

            # Process in batches
            documents = all_data["documents"]
            metadatas = all_data.get("metadatas") or [{}] * len(documents)
            # Embeddings may come back as a numpy array, so avoid bare
            # truthiness checks on them
            embeddings = all_data.get("embeddings")
            if embeddings is None:
                embeddings = []
            ids = all_data.get("ids") or []

            for i in range(0, len(documents), batch_size):
                batch_end = min(i + batch_size, len(documents))
                batch_docs = documents[i:batch_end]
                batch_metadata = metadatas[i:batch_end]
                batch_embeddings = embeddings[i:batch_end] if len(embeddings) > 0 else []
                batch_ids = ids[i:batch_end] if ids else []

                yield {
                    "documents": batch_docs,
                    "metadatas": batch_metadata,
                    "embeddings": batch_embeddings,
                    "ids": batch_ids,
                }
        except Exception as e:
            logger.error(f"Error retrieving ChromaDB documents: {e}")
            raise
    def process_batch(self, batch: Dict[str, Any]) -> Dict[str, int]:
        """
        Process a batch of documents with optimization.

        Args:
            batch: Batch of documents from ChromaDB

        Returns:
            Dictionary with processing statistics
        """
        documents = batch["documents"]
        metadatas = batch["metadatas"]
        embeddings = batch["embeddings"]

        processed_docs = []
        processed_metadata = []
        processed_embeddings = []
        stats = {"processed": 0, "skipped": 0, "reembedded": 0}

        for i, (doc, metadata) in enumerate(zip(documents, metadatas)):
            # Clean and optimize the document metadata
            cleaned_metadata = self.optimizer.clean_metadata(metadata or {})

            # Check whether we should include this document
            if not self.optimizer.should_include_document(cleaned_metadata, doc):
                stats["skipped"] += 1
                continue

            # Summarize document content
            summarized_doc = self.optimizer.summarize_text(doc)

            # Reuse the existing embedding if the document is unchanged.
            # Equal length is a cheap but sound check here: summarize_text
            # only ever shortens or strips text, so equal length means the
            # content is identical.
            if i < len(embeddings) and len(doc) == len(summarized_doc):
                embedding = embeddings[i]
            else:
                # Document changed, so it needs a new embedding
                try:
                    embedding = self.embedding_service.generate_embeddings([summarized_doc])[0]
                    stats["reembedded"] += 1
                except Exception as e:
                    logger.warning(f"Failed to generate embedding for document {i}: {e}")
                    stats["skipped"] += 1
                    continue

            processed_docs.append(summarized_doc)
            processed_metadata.append(cleaned_metadata)
            processed_embeddings.append(embedding)
            stats["processed"] += 1

        # Add processed documents to PostgreSQL
        if processed_docs:
            try:
                doc_ids = self.postgres_service.add_documents(
                    texts=processed_docs,
                    embeddings=processed_embeddings,
                    metadatas=processed_metadata,
                )
                logger.info(f"Added {len(doc_ids)} documents to PostgreSQL")
            except Exception as e:
                logger.error(f"Failed to add documents to PostgreSQL: {e}")
                raise

        # Free batch memory before the next batch
        gc.collect()

        return stats
    def migrate(self) -> Dict[str, int]:
        """
        Perform the complete migration.

        Returns:
            Migration statistics
        """
        logger.info("Starting ChromaDB to PostgreSQL migration...")

        self.initialize_services()

        # Clear existing PostgreSQL data
        logger.info("Clearing existing PostgreSQL data...")
        deleted_count = self.postgres_service.delete_all_documents()
        logger.info(f"Deleted {deleted_count} existing documents from PostgreSQL")

        total_stats = {"processed": 0, "skipped": 0, "reembedded": 0}
        batch_count = 0

        try:
            # Process documents in batches
            for batch in self.get_chroma_documents():
                batch_count += 1
                logger.info(f"Processing batch {batch_count}...")

                batch_stats = self.process_batch(batch)

                # Update totals
                for key in total_stats:
                    total_stats[key] += batch_stats[key]

                logger.info(f"Batch {batch_count} complete: {batch_stats}")

                # Memory cleanup between batches
                gc.collect()

            # Final statistics
            logger.info("Migration completed successfully!")
            logger.info(f"Final statistics: {total_stats}")

            # Verify migration
            postgres_info = self.postgres_service.get_collection_info()
            logger.info(f"PostgreSQL collection info: {postgres_info}")

            return total_stats
        except Exception as e:
            logger.error(f"Migration failed: {e}")
            raise
    def test_migration(self, test_query: str = "policy") -> Dict[str, Any]:
        """
        Test the migrated data by performing a search.

        Args:
            test_query: Query to test with

        Returns:
            Test results
        """
        logger.info(f"Testing migration with query: '{test_query}'")

        try:
            # Generate query embedding
            query_embedding = self.embedding_service.generate_embeddings([test_query])[0]

            # Search PostgreSQL
            results = self.postgres_service.similarity_search(query_embedding, k=5)

            logger.info("Test search returned %d results", len(results))
            for i, result in enumerate(results):
                logger.info(
                    "Result %d: %s... (score: %.3f)",
                    i + 1,
                    result.get("content", "")[:100],
                    result.get("similarity_score", 0),
                )

            return {
                "query": test_query,
                "results_count": len(results),
                "results": results,
            }
        except Exception as e:
            logger.error(f"Migration test failed: {e}")
            return {"error": str(e)}
def main():
    """Main migration function."""
    import argparse

    parser = argparse.ArgumentParser(description="Migrate ChromaDB to PostgreSQL")
    parser.add_argument("--database-url", help="PostgreSQL connection URL")
    parser.add_argument("--test-only", action="store_true", help="Only run the migration test")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be migrated without actually migrating",
    )

    args = parser.parse_args()

    try:
        migrator = ChromaToPostgresMigrator(database_url=args.database_url)

        if args.test_only:
            # Only test an existing migration
            migrator.initialize_services()
            results = migrator.test_migration()
            print(f"Test results: {results}")
        elif args.dry_run:
            # Show what would be migrated
            migrator.initialize_services()
            total_docs = migrator.chroma_db.get_count()
            logger.info(f"Would migrate {total_docs} documents from ChromaDB to PostgreSQL")
        else:
            # Perform the actual migration
            stats = migrator.migrate()
            logger.info(f"Migration complete: {stats}")

            # Test the migration
            test_results = migrator.test_migration()
            logger.info(f"Migration test: {test_results}")
    except Exception as e:
        logger.error(f"Migration script failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()