Spaces:
Sleeping
Sleeping
"""Tests for the enhanced ingestion pipeline with embeddings."""
import shutil
import tempfile
import unittest
from pathlib import Path
from unittest.mock import Mock, patch

from src.ingestion.ingestion_pipeline import IngestionPipeline
class TestEnhancedIngestionPipeline(unittest.TestCase):
    """Test cases for enhanced IngestionPipeline with embeddings.

    Every test runs against a temporary directory that ``setUp`` creates and
    ``tearDown`` removes.  Tests that need embedding components build ``Mock``
    stand-ins and hand them to the pipeline through its ``embedding_service``
    and ``vector_db`` constructor arguments, so no ``@patch`` decorator is
    required for the tests to run.
    """

    @staticmethod
    def _build_mock_components(embedding_service_class=None, vector_db_class=None):
        """Create mock embedding-service and vector-database instances.

        Args:
            embedding_service_class: Optional patched class mock.  When given,
                its ``return_value`` is pointed at the new instance mock.
            vector_db_class: Optional patched class mock, wired the same way.

        Returns:
            A ``(embedding_service, vector_db)`` tuple of ``Mock`` objects.
        """
        embedding_service = Mock()
        vector_db = Mock()
        # Keep working if a caller (e.g. a @patch decorator) still supplies
        # class-level mocks: constructing the patched class yields our mocks.
        if embedding_service_class is not None:
            embedding_service_class.return_value = embedding_service
        if vector_db_class is not None:
            vector_db_class.return_value = vector_db
        return embedding_service, vector_db

    def setUp(self):
        """Set up test fixtures: a temp directory with three small files."""
        self.temp_dir = tempfile.mkdtemp()
        self.test_dir = Path(self.temp_dir)

        # Create test files
        self.test_file1 = self.test_dir / "test1.md"
        self.test_file1.write_text(
            "# Test Document 1\n\nThis is test content for document 1.",
            encoding="utf-8",
        )

        self.test_file2 = self.test_dir / "test2.txt"
        self.test_file2.write_text(
            "This is test content for document 2.", encoding="utf-8"
        )

        # Create an unsupported file (should be skipped)
        self.test_file3 = self.test_dir / "test3.pdf"
        self.test_file3.write_text("PDF content", encoding="utf-8")

    def test_initialization_without_embeddings(self):
        """Test pipeline initialization without embeddings."""
        pipeline = IngestionPipeline(store_embeddings=False)

        self.assertIsNotNone(pipeline.parser)
        self.assertIsNotNone(pipeline.chunker)
        self.assertFalse(pipeline.store_embeddings)
        self.assertIsNone(pipeline.embedding_service)
        self.assertIsNone(pipeline.vector_db)

    def test_initialization_with_embeddings(self):
        """Test pipeline initialization with embeddings."""
        pipeline = IngestionPipeline(store_embeddings=True)

        self.assertIsNotNone(pipeline.parser)
        self.assertIsNotNone(pipeline.chunker)
        self.assertTrue(pipeline.store_embeddings)
        self.assertIsNotNone(pipeline.embedding_service)
        self.assertIsNotNone(pipeline.vector_db)

    def test_initialization_with_custom_components(self):
        """Test pipeline initialization with custom embedding components."""
        mock_embedding_service, mock_vector_db = self._build_mock_components()

        pipeline = IngestionPipeline(
            store_embeddings=True,
            embedding_service=mock_embedding_service,
            vector_db=mock_vector_db,
        )

        # The pipeline must keep the exact objects that were injected.
        self.assertIs(pipeline.embedding_service, mock_embedding_service)
        self.assertIs(pipeline.vector_db, mock_vector_db)

    def test_process_directory_without_embeddings(self):
        """Test directory processing without embeddings."""
        pipeline = IngestionPipeline(store_embeddings=False)
        result = pipeline.process_directory_with_embeddings(str(self.test_dir))

        # Check response structure
        self.assertIsInstance(result, dict)
        self.assertEqual(result["status"], "success")
        self.assertGreater(result["chunks_processed"], 0)
        self.assertEqual(result["files_processed"], 2)  # Only .md and .txt files
        self.assertEqual(result["embeddings_stored"], 0)
        self.assertFalse(result["store_embeddings"])
        self.assertIn("chunks", result)

    def test_process_directory_with_embeddings(
        self, mock_embedding_service_class=None, mock_vector_db_class=None
    ):
        """Test directory processing with embeddings.

        The two parameters are optional so the unittest runner, which calls
        test methods with ``self`` only, can run this test; they are honoured
        when a patch decorator supplies class-level mocks.
        """
        mock_embedding_service, mock_vector_db = self._build_mock_components(
            mock_embedding_service_class, mock_vector_db_class
        )

        # Configure mock embedding service
        mock_embedding_service.embed_texts.return_value = [
            [0.1, 0.2, 0.3],
            [0.4, 0.5, 0.6],
        ]
        # Configure mock vector database
        mock_vector_db.add_embeddings.return_value = True

        pipeline = IngestionPipeline(
            store_embeddings=True,
            embedding_service=mock_embedding_service,
            vector_db=mock_vector_db,
        )
        result = pipeline.process_directory_with_embeddings(str(self.test_dir))

        # Check response structure
        self.assertIsInstance(result, dict)
        self.assertEqual(result["status"], "success")
        self.assertGreater(result["chunks_processed"], 0)
        self.assertEqual(result["files_processed"], 2)
        self.assertGreater(result["embeddings_stored"], 0)
        self.assertTrue(result["store_embeddings"])

        # Verify embedding service and vector database were called
        mock_embedding_service.embed_texts.assert_called()
        mock_vector_db.add_embeddings.assert_called()

    def test_process_directory_nonexistent(self):
        """Test processing non-existent directory."""
        pipeline = IngestionPipeline(store_embeddings=False)

        with self.assertRaises(FileNotFoundError):
            pipeline.process_directory("/nonexistent/directory")

    def test_store_embeddings_batch_without_components(self):
        """Test batch embedding storage without embedding components."""
        pipeline = IngestionPipeline(store_embeddings=False)
        chunks = [
            {
                "content": "Test content 1",
                "metadata": {"chunk_id": "test1", "source": "test1.txt"},
            }
        ]

        result = pipeline._store_embeddings_batch(chunks)

        self.assertEqual(result, 0)

    def test_store_embeddings_batch_success(
        self, mock_embedding_service_class=None, mock_vector_db_class=None
    ):
        """Test successful batch embedding storage.

        The two parameters are optional so the test runs without a patch
        decorator; the mocks reach the pipeline through its constructor.
        """
        mock_embedding_service, mock_vector_db = self._build_mock_components(
            mock_embedding_service_class, mock_vector_db_class
        )

        # Configure mocks
        mock_embedding_service.embed_texts.return_value = [
            [0.1, 0.2, 0.3],
            [0.4, 0.5, 0.6],
        ]
        mock_vector_db.add_embeddings.return_value = True

        pipeline = IngestionPipeline(
            store_embeddings=True,
            embedding_service=mock_embedding_service,
            vector_db=mock_vector_db,
        )
        chunks = [
            {
                "content": "Test content 1",
                "metadata": {"chunk_id": "test1", "source": "test1.txt"},
            },
            {
                "content": "Test content 2",
                "metadata": {"chunk_id": "test2", "source": "test2.txt"},
            },
        ]

        result = pipeline._store_embeddings_batch(chunks)

        self.assertEqual(result, 2)

        # Verify method calls
        mock_embedding_service.embed_texts.assert_called_once_with(
            ["Test content 1", "Test content 2"]
        )
        mock_vector_db.add_embeddings.assert_called_once()

    def test_store_embeddings_batch_error_handling(
        self, mock_embedding_service_class=None, mock_vector_db_class=None
    ):
        """Test error handling in batch embedding storage.

        The two parameters are optional so the test runs without a patch
        decorator; the mocks reach the pipeline through its constructor.
        """
        mock_embedding_service, mock_vector_db = self._build_mock_components(
            mock_embedding_service_class, mock_vector_db_class
        )

        # Configure embedding service to raise an error
        mock_embedding_service.embed_texts.side_effect = Exception("Embedding error")

        pipeline = IngestionPipeline(
            store_embeddings=True,
            embedding_service=mock_embedding_service,
            vector_db=mock_vector_db,
        )
        chunks = [
            {
                "content": "Test content 1",
                "metadata": {"chunk_id": "test1", "source": "test1.txt"},
            }
        ]

        # Should handle error gracefully and return 0
        result = pipeline._store_embeddings_batch(chunks)

        self.assertEqual(result, 0)

    def test_backward_compatibility(self):
        """Test that enhanced pipeline maintains backward compatibility."""
        pipeline = IngestionPipeline(store_embeddings=False)
        result = pipeline.process_directory(str(self.test_dir))

        # Should return list for backward compatibility
        self.assertIsInstance(result, list)
        self.assertGreater(len(result), 0)

        # First chunk should have expected structure
        chunk = result[0]
        self.assertIn("content", chunk)
        self.assertIn("metadata", chunk)
        self.assertIn("chunk_id", chunk["metadata"])

    def tearDown(self):
        """Clean up test fixtures by removing the temporary directory."""
        # ignore_errors keeps cleanup best-effort so it never masks a test failure.
        shutil.rmtree(self.temp_dir, ignore_errors=True)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()