# msse-ai-engineering/tests/test_ingestion/test_enhanced_ingestion_pipeline.py
# sethmcknight
# Refactor test cases for improved readability and consistency
# 159faf0
"""
Tests for enhanced ingestion pipeline with embeddings
"""
import shutil
import tempfile
import unittest
from pathlib import Path
from unittest.mock import Mock, patch

from src.ingestion.ingestion_pipeline import IngestionPipeline
class TestEnhancedIngestionPipeline(unittest.TestCase):
    """Test cases for the enhanced IngestionPipeline with embeddings.

    Each test runs against a fresh temporary directory containing two
    supported documents (``.md`` and ``.txt``) and one ``.pdf`` file that
    the tests expect the pipeline to skip.
    """

    def setUp(self):
        """Create the temporary directory and the three fixture files."""
        self.temp_dir = tempfile.mkdtemp()
        # Register removal immediately: unittest does not call tearDown()
        # when setUp() itself raises, but registered cleanups still run,
        # so the directory cannot leak if a write below fails.
        self.addCleanup(shutil.rmtree, self.temp_dir, ignore_errors=True)
        self.test_dir = Path(self.temp_dir)

        # Create test files
        self.test_file1 = self.test_dir / "test1.md"
        self.test_file1.write_text(
            "# Test Document 1\n\nThis is test content for document 1.",
            encoding="utf-8",
        )
        self.test_file2 = self.test_dir / "test2.txt"
        self.test_file2.write_text("This is test content for document 2.", encoding="utf-8")

        # Create an unsupported file (should be skipped)
        self.test_file3 = self.test_dir / "test3.pdf"
        self.test_file3.write_text("PDF content", encoding="utf-8")

    @staticmethod
    def _make_chunk(index):
        """Return a fresh chunk dict numbered *index* (content plus metadata)."""
        return {
            "content": f"Test content {index}",
            "metadata": {"chunk_id": f"test{index}", "source": f"test{index}.txt"},
        }

    @staticmethod
    def _install_mocks(embedding_service_class, vector_db_class):
        """Make the patched classes return fresh mocks; return (service, db)."""
        embedding_service = Mock()
        vector_db = Mock()
        embedding_service_class.return_value = embedding_service
        vector_db_class.return_value = vector_db
        return embedding_service, vector_db

    @staticmethod
    def _embed_one_vector_per_text(texts):
        """Fake ``embed_texts``: return one fixed 3-d vector per input text."""
        return [[0.1, 0.2, 0.3] for _ in texts]

    def test_initialization_without_embeddings(self):
        """With store_embeddings=False no embedding components are set."""
        pipeline = IngestionPipeline(store_embeddings=False)

        self.assertIsNotNone(pipeline.parser)
        self.assertIsNotNone(pipeline.chunker)
        self.assertFalse(pipeline.store_embeddings)
        self.assertIsNone(pipeline.embedding_service)
        self.assertIsNone(pipeline.vector_db)

    def test_initialization_with_embeddings(self):
        """With store_embeddings=True embedding components are populated."""
        # NOTE(review): nothing is patched here, so this presumably builds the
        # real default embedding service and vector DB - confirm that is
        # intended for a unit test.
        pipeline = IngestionPipeline(store_embeddings=True)

        self.assertIsNotNone(pipeline.parser)
        self.assertIsNotNone(pipeline.chunker)
        self.assertTrue(pipeline.store_embeddings)
        self.assertIsNotNone(pipeline.embedding_service)
        self.assertIsNotNone(pipeline.vector_db)

    def test_initialization_with_custom_components(self):
        """Injected embedding components are stored on the pipeline as given."""
        mock_embedding_service = Mock()
        mock_vector_db = Mock()

        pipeline = IngestionPipeline(
            store_embeddings=True,
            embedding_service=mock_embedding_service,
            vector_db=mock_vector_db,
        )

        # Identity, not equality: the very same objects must be kept.
        self.assertIs(pipeline.embedding_service, mock_embedding_service)
        self.assertIs(pipeline.vector_db, mock_vector_db)

    def test_process_directory_without_embeddings(self):
        """Directory processing without embeddings reports zero stored."""
        pipeline = IngestionPipeline(store_embeddings=False)

        result = pipeline.process_directory_with_embeddings(str(self.test_dir))

        # Check response structure
        self.assertIsInstance(result, dict)
        self.assertEqual(result["status"], "success")
        self.assertGreater(result["chunks_processed"], 0)
        self.assertEqual(result["files_processed"], 2)  # Only .md and .txt files
        self.assertEqual(result["embeddings_stored"], 0)
        self.assertFalse(result["store_embeddings"])
        self.assertIn("chunks", result)

    @patch("src.ingestion.ingestion_pipeline.VectorDatabase")
    @patch("src.ingestion.ingestion_pipeline.EmbeddingService")
    def test_process_directory_with_embeddings(self, mock_embedding_service_class, mock_vector_db_class):
        """Directory processing with embeddings calls both mocked components."""
        mock_embedding_service, mock_vector_db = self._install_mocks(
            mock_embedding_service_class, mock_vector_db_class
        )
        # One vector per requested text, so the stub stays consistent with
        # however many chunks the fixture files are split into.
        mock_embedding_service.embed_texts.side_effect = self._embed_one_vector_per_text
        mock_vector_db.add_embeddings.return_value = True

        pipeline = IngestionPipeline(store_embeddings=True)
        result = pipeline.process_directory_with_embeddings(str(self.test_dir))

        # Check response structure
        self.assertIsInstance(result, dict)
        self.assertEqual(result["status"], "success")
        self.assertGreater(result["chunks_processed"], 0)
        self.assertEqual(result["files_processed"], 2)
        self.assertGreater(result["embeddings_stored"], 0)
        self.assertTrue(result["store_embeddings"])

        # Verify the embedding service and vector database were called
        mock_embedding_service.embed_texts.assert_called()
        mock_vector_db.add_embeddings.assert_called()

    def test_process_directory_nonexistent(self):
        """process_directory raises FileNotFoundError for a missing path."""
        pipeline = IngestionPipeline(store_embeddings=False)

        with self.assertRaises(FileNotFoundError):
            pipeline.process_directory("/nonexistent/directory")

    def test_store_embeddings_batch_without_components(self):
        """Batch storage returns 0 when the pipeline has no embedding components."""
        pipeline = IngestionPipeline(store_embeddings=False)

        result = pipeline._store_embeddings_batch([self._make_chunk(1)])

        self.assertEqual(result, 0)

    @patch("src.ingestion.ingestion_pipeline.VectorDatabase")
    @patch("src.ingestion.ingestion_pipeline.EmbeddingService")
    def test_store_embeddings_batch_success(self, mock_embedding_service_class, mock_vector_db_class):
        """Batch storage embeds every chunk's content and returns the count."""
        mock_embedding_service, mock_vector_db = self._install_mocks(
            mock_embedding_service_class, mock_vector_db_class
        )
        mock_embedding_service.embed_texts.return_value = [
            [0.1, 0.2, 0.3],
            [0.4, 0.5, 0.6],
        ]
        mock_vector_db.add_embeddings.return_value = True

        pipeline = IngestionPipeline(store_embeddings=True)
        chunks = [self._make_chunk(1), self._make_chunk(2)]

        result = pipeline._store_embeddings_batch(chunks)

        self.assertEqual(result, 2)
        # Verify method calls
        mock_embedding_service.embed_texts.assert_called_once_with(["Test content 1", "Test content 2"])
        mock_vector_db.add_embeddings.assert_called_once()

    @patch("src.ingestion.ingestion_pipeline.VectorDatabase")
    @patch("src.ingestion.ingestion_pipeline.EmbeddingService")
    def test_store_embeddings_batch_error_handling(self, mock_embedding_service_class, mock_vector_db_class):
        """Batch storage returns 0 when the embedding service raises."""
        mock_embedding_service, _ = self._install_mocks(mock_embedding_service_class, mock_vector_db_class)
        # Configure embedding service to raise an error
        mock_embedding_service.embed_texts.side_effect = Exception("Embedding error")

        pipeline = IngestionPipeline(store_embeddings=True)

        # Should handle error gracefully and return 0
        result = pipeline._store_embeddings_batch([self._make_chunk(1)])

        self.assertEqual(result, 0)

    def test_backward_compatibility(self):
        """process_directory still returns a list of chunk dicts."""
        pipeline = IngestionPipeline(store_embeddings=False)

        result = pipeline.process_directory(str(self.test_dir))

        # Should return list for backward compatibility
        self.assertIsInstance(result, list)
        self.assertGreater(len(result), 0)

        # First chunk should have expected structure
        chunk = result[0]
        self.assertIn("content", chunk)
        self.assertIn("metadata", chunk)
        self.assertIn("chunk_id", chunk["metadata"])
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()