Spaces:
Sleeping
Sleeping
| """ | |
| Comprehensive end-to-end tests for Phase 2B implementation. | |
| This module tests the complete pipeline from document ingestion through | |
| embedding generation to semantic search, validating both functionality | |
| and quality of results. | |
| """ | |
| import os | |
| import shutil | |
| import tempfile | |
| import time | |
| from typing import List | |
| import pytest | |
| import src.config as config | |
| from src.embedding.embedding_service import EmbeddingService | |
| from src.ingestion.ingestion_pipeline import IngestionPipeline | |
| from src.search.search_service import SearchService | |
| from src.vector_store.vector_db import VectorDatabase | |
| class TestPhase2BEndToEnd: | |
| """Comprehensive end-to-end tests for Phase 2B semantic search pipeline.""" | |
| # Test queries for search quality validation | |
| TEST_QUERIES = [ | |
| "remote work from home policy", | |
| "employee benefits and health insurance", | |
| "vacation time and PTO", | |
| "code of conduct and ethics", | |
| "information security requirements", | |
| "performance review process", | |
| "expense reimbursement", | |
| "parental leave", | |
| "workplace safety", | |
| "professional development", | |
| ] | |
| def setup_method(self): | |
| """Set up test environment with temporary database and services.""" | |
| self.test_dir = tempfile.mkdtemp() | |
| # Initialize all services | |
| self.embedding_service = EmbeddingService() | |
| self.vector_db = VectorDatabase(persist_path=self.test_dir, collection_name="test_phase2b_e2e") | |
| self.search_service = SearchService(self.vector_db, self.embedding_service) | |
| self.ingestion_pipeline = IngestionPipeline( | |
| chunk_size=config.DEFAULT_CHUNK_SIZE, | |
| overlap=config.DEFAULT_OVERLAP, | |
| seed=config.RANDOM_SEED, | |
| embedding_service=self.embedding_service, | |
| vector_db=self.vector_db, | |
| ) | |
| # Performance tracking | |
| self.performance_metrics = {} | |
| def teardown_method(self): | |
| """Clean up temporary resources.""" | |
| if hasattr(self, "test_dir"): | |
| shutil.rmtree(self.test_dir, ignore_errors=True) | |
| def test_full_pipeline_ingestion_to_search(self): | |
| """Test complete pipeline: ingest documents → generate embeddings → search.""" | |
| start_time = time.time() | |
| # Step 1: Ingest synthetic policies with embeddings | |
| synthetic_dir = "synthetic_policies" | |
| assert os.path.exists(synthetic_dir), "Synthetic policies directory required" | |
| ingestion_start = time.time() | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| ingestion_time = time.time() - ingestion_start | |
| # Validate ingestion results | |
| assert result["status"] == "success" | |
| assert result["chunks_processed"] > 0 | |
| assert "embeddings_stored" in result | |
| assert result["embeddings_stored"] > 0 | |
| assert result["chunks_processed"] == result["embeddings_stored"] | |
| # Store metrics | |
| self.performance_metrics["ingestion_time"] = ingestion_time | |
| self.performance_metrics["chunks_processed"] = result["chunks_processed"] | |
| # Step 2: Test search functionality | |
| search_start = time.time() | |
| search_results = self.search_service.search("remote work policy", top_k=5, threshold=0.2) | |
| search_time = time.time() - search_start | |
| # Validate search results | |
| assert len(search_results) > 0, "Search should return results" | |
| assert all(r["similarity_score"] >= 0.2 for r in search_results) | |
| assert all("chunk_id" in r for r in search_results) | |
| assert all("content" in r for r in search_results) | |
| assert all("metadata" in r for r in search_results) | |
| # Store metrics | |
| self.performance_metrics["search_time"] = search_time | |
| self.performance_metrics["total_pipeline_time"] = time.time() - start_time | |
| # Validate performance thresholds | |
| assert ingestion_time < 120, f"Ingestion took {ingestion_time:.2f}s, should be < 120s" | |
| assert search_time < 5, f"Search took {search_time:.2f}s, should be < 5s" | |
| def test_search_quality_validation(self): | |
| """Test search quality across different policy areas.""" | |
| # First ingest the policies | |
| synthetic_dir = "synthetic_policies" | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| assert result["status"] == "success" | |
| quality_results = {} | |
| for query in self.TEST_QUERIES: | |
| search_results = self.search_service.search(query, top_k=3, threshold=0.0) | |
| # Basic quality checks | |
| assert len(search_results) > 0, f"No results for query: {query}" | |
| # Relevance validation - relaxed threshold for testing | |
| top_result = search_results[0] | |
| print(f"Query: '{query}' - Top similarity: {top_result['similarity_score']}") | |
| assert top_result["similarity_score"] >= 0.0, ( | |
| f"Top result for '{query}' has invalid similarity: " f"{top_result['similarity_score']}" | |
| ) | |
| # Content relevance heuristics | |
| query_keywords = query.lower().split() | |
| content_lower = top_result["content"].lower() | |
| # At least one query keyword should appear in top result | |
| keyword_found = any(keyword in content_lower for keyword in query_keywords) | |
| if not keyword_found: | |
| # For semantic search, check if related terms appear | |
| related_terms = self._get_related_terms(query) | |
| semantic_match = any(term in content_lower for term in related_terms) | |
| assert semantic_match, ( | |
| f"No relevant keywords found in top result for '{query}'. " | |
| f"Content: {top_result['content'][:100]}..." | |
| ) | |
| quality_results[query] = { | |
| "results_count": len(search_results), | |
| "top_similarity": top_result["similarity_score"], | |
| "avg_similarity": sum(r["similarity_score"] for r in search_results) / len(search_results), | |
| } | |
| # Store quality metrics | |
| self.performance_metrics["search_quality"] = quality_results | |
| # Overall quality validation | |
| avg_top_similarity = sum(metrics["top_similarity"] for metrics in quality_results.values()) / len( | |
| quality_results | |
| ) | |
| assert avg_top_similarity >= 0.2, f"Average top similarity {avg_top_similarity:.3f} below threshold 0.2" | |
| def test_data_persistence_across_sessions(self): | |
| """Test that vector data persists correctly across database sessions.""" | |
| # Ingest some data | |
| synthetic_dir = "synthetic_policies" | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| assert result["status"] == "success" | |
| # Perform initial search | |
| initial_results = self.search_service.search("remote work", top_k=3) | |
| assert len(initial_results) > 0 | |
| # Simulate session restart by creating new services | |
| new_vector_db = VectorDatabase(persist_path=self.test_dir, collection_name="test_phase2b_e2e") | |
| new_search_service = SearchService(new_vector_db, self.embedding_service) | |
| # Verify data persistence | |
| persistent_results = new_search_service.search("remote work", top_k=3) | |
| assert len(persistent_results) == len(initial_results) | |
| assert persistent_results[0]["chunk_id"] == initial_results[0]["chunk_id"] | |
| assert persistent_results[0]["similarity_score"] == initial_results[0]["similarity_score"] | |
| def test_error_handling_and_recovery(self): | |
| """Test error handling scenarios and recovery mechanisms.""" | |
| # Test 1: Search before ingestion | |
| empty_results = self.search_service.search("any query", top_k=5) | |
| assert len(empty_results) == 0, "Should return empty results for empty database" | |
| # Test 2: Invalid search parameters | |
| with pytest.raises((ValueError, TypeError)): | |
| self.search_service.search("", top_k=-1) | |
| with pytest.raises((ValueError, TypeError)): | |
| self.search_service.search("valid query", top_k=0) | |
| # Test 3: Very long query | |
| long_query = "very long query " * 100 # 1500+ characters | |
| long_results = self.search_service.search(long_query, top_k=3) | |
| # Should not crash, may return 0 or valid results | |
| assert isinstance(long_results, list) | |
| # Test 4: Special characters in query | |
| special_query = "query with @#$%^&*(){}[] special characters" | |
| special_results = self.search_service.search(special_query, top_k=3) | |
| # Should not crash | |
| assert isinstance(special_results, list) | |
| def test_batch_processing_efficiency(self): | |
| """Test that batch processing works efficiently for large document sets.""" | |
| # Ingest with timing | |
| synthetic_dir = "synthetic_policies" | |
| start_time = time.time() | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| processing_time = time.time() - start_time | |
| # Validate batch processing results | |
| assert result["status"] == "success" | |
| chunks_processed = result["chunks_processed"] | |
| # Calculate processing rate | |
| processing_rate = chunks_processed / processing_time if processing_time > 0 else 0 | |
| self.performance_metrics["processing_rate"] = processing_rate | |
| # Validate reasonable processing rate (at least 1 chunk/second) | |
| assert processing_rate >= 1, f"Processing rate {processing_rate:.2f} chunks/sec too slow" | |
| # Validate memory efficiency (no excessive memory usage) | |
| # This is implicit - if the test completes without memory errors, it passes | |
| def test_search_parameter_variations(self): | |
| """Test search functionality with different parameter combinations.""" | |
| # Ingest data first | |
| synthetic_dir = "synthetic_policies" | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| assert result["status"] == "success" | |
| test_query = "employee benefits" | |
| # Test different top_k values | |
| for top_k in [1, 3, 5, 10]: | |
| results = self.search_service.search(test_query, top_k=top_k) | |
| assert len(results) <= top_k, f"Returned more than top_k={top_k} results" | |
| # Test different threshold values | |
| for threshold in [0.0, 0.2, 0.5, 0.8]: | |
| results = self.search_service.search(test_query, top_k=10, threshold=threshold) | |
| assert all(r["similarity_score"] >= threshold for r in results), f"Results below threshold {threshold}" | |
| # Test edge cases | |
| high_threshold_results = self.search_service.search(test_query, top_k=5, threshold=0.9) | |
| # May return 0 results with high threshold, which is valid | |
| assert isinstance(high_threshold_results, list) | |
| def test_concurrent_search_operations(self): | |
| """Test multiple concurrent search operations.""" | |
| # Ingest data first | |
| synthetic_dir = "synthetic_policies" | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| assert result["status"] == "success" | |
| # Perform multiple searches in sequence (simulating concurrency) | |
| queries = [ | |
| "remote work", | |
| "benefits", | |
| "security", | |
| "vacation", | |
| "training", | |
| ] | |
| results_list = [] | |
| for query in queries: | |
| results = self.search_service.search(query, top_k=3) | |
| results_list.append(results) | |
| # Validate all searches completed successfully | |
| assert len(results_list) == len(queries) | |
| assert all(isinstance(results, list) for results in results_list) | |
| def test_vector_database_performance(self): | |
| """Test vector database performance and storage efficiency.""" | |
| # Ingest data and measure | |
| synthetic_dir = "synthetic_policies" | |
| start_time = time.time() | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| ingestion_time = time.time() - start_time | |
| # Measure database size | |
| db_size = self._get_database_size() | |
| self.performance_metrics["database_size_mb"] = db_size | |
| # Performance assertions | |
| chunks_processed = result["chunks_processed"] | |
| avg_time_per_chunk = ingestion_time / chunks_processed if chunks_processed > 0 else 0 | |
| assert avg_time_per_chunk < 5, f"Average time per chunk {avg_time_per_chunk:.3f}s too slow" | |
| # Database size should be reasonable (not excessive) | |
| max_size_mb = chunks_processed * 0.1 # Conservative estimate: 0.1MB per chunk | |
| assert db_size <= max_size_mb, f"Database size {db_size:.2f}MB exceeds threshold {max_size_mb:.2f}MB" | |
| def test_search_result_consistency(self): | |
| """Test that identical searches return consistent results.""" | |
| # Ingest data | |
| synthetic_dir = "synthetic_policies" | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| assert result["status"] == "success" | |
| query = "remote work policy" | |
| # Perform same search multiple times | |
| results_1 = self.search_service.search(query, top_k=5, threshold=0.3) | |
| results_2 = self.search_service.search(query, top_k=5, threshold=0.3) | |
| results_3 = self.search_service.search(query, top_k=5, threshold=0.3) | |
| # Validate consistency | |
| assert len(results_1) == len(results_2) == len(results_3) | |
| for i in range(len(results_1)): | |
| assert results_1[i]["chunk_id"] == results_2[i]["chunk_id"] == results_3[i]["chunk_id"] | |
| assert abs(results_1[i]["similarity_score"] - results_2[i]["similarity_score"]) < 0.001 | |
| assert abs(results_1[i]["similarity_score"] - results_3[i]["similarity_score"]) < 0.001 | |
| def test_comprehensive_pipeline_validation(self): | |
| """Comprehensive validation of the entire Phase 2B pipeline.""" | |
| # Complete pipeline test with detailed validation | |
| synthetic_dir = "synthetic_policies" | |
| # Step 1: Validate directory exists and has content | |
| assert os.path.exists(synthetic_dir) | |
| policy_files = [f for f in os.listdir(synthetic_dir) if f.endswith(".md")] | |
| assert len(policy_files) > 0, "No policy files found" | |
| # Step 2: Full ingestion with comprehensive validation | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| assert result["status"] == "success" | |
| assert result["chunks_processed"] >= len(policy_files) # At least one chunk per file | |
| assert result["embeddings_stored"] == result["chunks_processed"] | |
| assert "processing_time_seconds" in result | |
| assert result["processing_time_seconds"] > 0 | |
| # Step 3: Comprehensive search validation | |
| for query in self.TEST_QUERIES[:5]: # Test first 5 queries | |
| results = self.search_service.search(query, top_k=3, threshold=0.0) | |
| # Validate result structure | |
| for result_item in results: | |
| assert "chunk_id" in result_item | |
| assert "content" in result_item | |
| assert "similarity_score" in result_item | |
| assert "metadata" in result_item | |
| # Validate content quality | |
| assert result_item["content"] is not None, "Content should not be None" | |
| assert isinstance(result_item["content"], str), "Content should be a string" | |
| assert len(result_item["content"].strip()) > 0, "Content should not be empty" | |
| assert result_item["similarity_score"] >= 0.0 | |
| assert isinstance(result_item["metadata"], dict) | |
| # Step 4: Performance validation | |
| search_start = time.time() | |
| for _ in range(10): # 10 consecutive searches | |
| self.search_service.search("employee policy", top_k=3) | |
| avg_search_time = (time.time() - search_start) / 10 | |
| assert avg_search_time < 1, f"Average search time {avg_search_time:.3f}s exceeds 1s threshold" | |
| def _get_related_terms(self, query: str) -> List[str]: | |
| """Get related terms for semantic matching validation.""" | |
| related_terms_map = { | |
| "remote work": ["telecommute", "home office", "wfh", "flexible"], | |
| "benefits": ["health insurance", "medical", "dental", "retirement"], | |
| "vacation": ["pto", "time off", "leave", "holiday"], | |
| "security": ["password", "access", "data protection", "privacy"], | |
| "performance": ["review", "evaluation", "feedback", "assessment"], | |
| } | |
| query_lower = query.lower() | |
| for key, terms in related_terms_map.items(): | |
| if key in query_lower: | |
| return terms | |
| return [] | |
| def _get_database_size(self) -> float: | |
| """Get approximate database size in MB.""" | |
| total_size = 0 | |
| for root, _, files in os.walk(self.test_dir): | |
| for file in files: | |
| file_path = os.path.join(root, file) | |
| if os.path.exists(file_path): | |
| total_size += os.path.getsize(file_path) | |
| return total_size / (1024 * 1024) # Convert to MB | |
| def test_performance_benchmarks(self): | |
| """Generate and validate performance benchmarks.""" | |
| # Run complete pipeline with timing | |
| synthetic_dir = "synthetic_policies" | |
| start_time = time.time() | |
| result = self.ingestion_pipeline.process_directory_with_embeddings(synthetic_dir) | |
| total_time = time.time() - start_time | |
| # Collect comprehensive metrics | |
| benchmarks = { | |
| "ingestion_total_time": total_time, | |
| "chunks_processed": result["chunks_processed"], | |
| "processing_rate_chunks_per_second": result["chunks_processed"] / total_time, | |
| "database_size_mb": self._get_database_size(), | |
| } | |
| # Search performance benchmarks | |
| search_times = [] | |
| for query in self.TEST_QUERIES[:5]: | |
| start = time.time() | |
| self.search_service.search(query, top_k=5) | |
| search_times.append(time.time() - start) | |
| benchmarks["avg_search_time"] = sum(search_times) / len(search_times) | |
| benchmarks["max_search_time"] = max(search_times) | |
| benchmarks["min_search_time"] = min(search_times) | |
| # Store benchmarks for reporting | |
| self.performance_metrics.update(benchmarks) | |
| # Validate benchmarks meet thresholds | |
| assert benchmarks["processing_rate_chunks_per_second"] >= 1 | |
| assert benchmarks["avg_search_time"] <= 2 | |
| assert benchmarks["max_search_time"] <= 5 | |
| # Print benchmarks for documentation | |
| print("\n=== Phase 2B Performance Benchmarks ===") | |
| for metric, value in benchmarks.items(): | |
| if "time" in metric: | |
| print(f"{metric}: {value:.3f}s") | |
| elif "rate" in metric: | |
| print(f"{metric}: {value:.2f}") | |
| elif "size" in metric: | |
| print(f"{metric}: {value:.2f}MB") | |
| else: | |
| print(f"{metric}: {value}") | |
if __name__ == "__main__":
    # Run tests with verbose output (-v) and without output capture (-s) so
    # the printed benchmarks are visible. Propagate pytest's exit code so a
    # failing run is not reported as success when the file is run as a script.
    raise SystemExit(pytest.main([__file__, "-v", "-s"]))