"""
Response Formatter for RAG Pipeline

This module handles formatting of RAG responses with proper citation
formatting, metadata inclusion, and consistent response structure.
"""

import hashlib
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class FormattedResponse:
    """Standardized formatted response for API endpoints."""

    status: str
    answer: str
    sources: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    processing_info: Dict[str, Any]
    error: Optional[str] = None


class ResponseFormatter:
    """
    Formats RAG pipeline responses for various output formats.

    Handles:
    - API response formatting
    - Citation formatting
    - Metadata inclusion
    - Error response formatting

    The ``rag_response`` arguments are duck-typed: the methods only read
    attributes (``success``, ``answer``, ``sources``, ``confidence``,
    ``processing_time``, ``context_length``, ``llm_provider``, ``llm_model``,
    ``search_results_count``, ``error_message``) from the object they are given.
    """

    # Maximum number of sources included in a chat response.
    CHAT_SOURCE_LIMIT = 3
    # Maximum number of excerpt characters shown in a chat source preview.
    CHAT_PREVIEW_CHARS = 100
    # Number of buckets used for the question tracking hash (values 0..9999).
    QUESTION_HASH_BUCKETS = 10000

    # User-facing messages keyed by the ``reason`` given to
    # ``create_no_answer_response``; unknown reasons fall back to "error".
    _NO_ANSWER_MESSAGES = {
        "no_context": (
            "I couldn't find any relevant information in our corporate "
            "policies to answer your question."
        ),
        "insufficient_context": (
            "I found some potentially relevant information, but not "
            "enough to provide a complete answer."
        ),
        "off_topic": ("This question appears to be outside the scope of our " "corporate policies."),
        "error": "I encountered an error while processing your question.",
    }

    def __init__(self):
        """Initialize ResponseFormatter."""
        logger.info("ResponseFormatter initialized")

    def format_api_response(self, rag_response: Any, include_debug: bool = False) -> Dict[str, Any]:
        """
        Format RAG response for API consumption.

        Args:
            rag_response: RAGResponse from RAG pipeline
            include_debug: Whether to include debug information

        Returns:
            Formatted dictionary for JSON API response. When
            ``rag_response.success`` is falsy the error layout produced by
            ``_format_error_response`` is returned instead.
        """
        if not rag_response.success:
            return self._format_error_response(rag_response)

        # Base response structure
        formatted_response = {
            "status": "success",
            "answer": rag_response.answer,
            "sources": self._format_source_list(rag_response.sources),
            "metadata": {
                "confidence": round(rag_response.confidence, 3),
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
                "source_count": len(rag_response.sources),
                "context_length": rag_response.context_length,
            },
        }

        # Add debug information if requested
        if include_debug:
            formatted_response["debug"] = {
                "llm_provider": rag_response.llm_provider,
                "llm_model": rag_response.llm_model,
                "search_results_count": rag_response.search_results_count,
                "processing_time_seconds": round(rag_response.processing_time, 3),
            }

        return formatted_response

    def format_chat_response(
        self,
        rag_response: Any,  # RAGResponse type
        conversation_id: Optional[str] = None,
        include_sources: bool = True,
    ) -> Dict[str, Any]:
        """
        Format RAG response for chat interface.

        Args:
            rag_response: RAGResponse from RAG pipeline
            conversation_id: Optional conversation ID
            include_sources: Whether to include source information

        Returns:
            Formatted dictionary for chat interface. The ``sources`` key is
            only present when ``include_sources`` is true and the response
            has at least one source.
        """
        if not rag_response.success:
            return self._format_chat_error(rag_response, conversation_id)

        response = {
            "message": rag_response.answer,
            "confidence": round(rag_response.confidence, 2),
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }

        if conversation_id:
            response["conversation_id"] = conversation_id

        if include_sources and rag_response.sources:
            response["sources"] = self._format_sources_for_chat(rag_response.sources)

        return response

    @staticmethod
    def _relevance_score(source: Dict[str, Any]) -> Any:
        """Return the source's relevance score, or 0.0 when missing or None."""
        score = source.get("relevance_score")
        # dict.get's default only covers a missing key; an explicit None would
        # otherwise crash round() / the percent format spec downstream.
        return 0.0 if score is None else score

    def _format_source_list(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Format source list for API response.

        Args:
            sources: Source dictionaries; the keys read are ``document``,
                ``relevance_score``, ``excerpt`` and ``chunk_id``.

        Returns:
            One dictionary per source with ``document``, ``relevance_score``
            (rounded to 3 decimals) and ``excerpt``; ``chunk_id`` is added
            only when the source carries a truthy one.
        """
        formatted_sources = []

        for source in sources:
            excerpt = source.get("excerpt")
            formatted_source = {
                "document": source.get("document", "unknown"),
                "relevance_score": round(self._relevance_score(source), 3),
                "excerpt": "" if excerpt is None else excerpt,
            }

            # Add chunk ID if available
            chunk_id = source.get("chunk_id", "")
            if chunk_id:
                formatted_source["chunk_id"] = chunk_id

            formatted_sources.append(formatted_source)

        return formatted_sources

    def _format_sources_for_chat(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Format sources for chat interface (more concise).

        Args:
            sources: Source dictionaries; the keys read are ``document``,
                ``relevance_score`` and ``excerpt``.

        Returns:
            At most ``CHAT_SOURCE_LIMIT`` entries, numbered from 1 in input
            order, each with the relevance rendered as a percentage string
            and a preview truncated to ``CHAT_PREVIEW_CHARS`` characters
            (followed by "..." when truncated).
        """
        formatted_sources = []
        limit = self.CHAT_PREVIEW_CHARS

        for i, source in enumerate(sources[: self.CHAT_SOURCE_LIMIT], 1):
            # Read the excerpt once; treat a missing or None excerpt as empty.
            excerpt = source.get("excerpt") or ""
            preview = excerpt[:limit] + "..." if len(excerpt) > limit else excerpt

            formatted_sources.append(
                {
                    "id": i,
                    "document": source.get("document", "unknown"),
                    "relevance": f"{self._relevance_score(source):.1%}",
                    "preview": preview,
                }
            )

        return formatted_sources

    def _format_error_response(self, rag_response: Any) -> Dict[str, Any]:
        """
        Format error response for API.

        Args:
            rag_response: RAGResponse from RAG pipeline; ``answer``,
                ``error_message`` and ``processing_time`` are read.

        Returns:
            Dictionary with ``status`` "error", the error details, an empty
            source list and zeroed metadata.
        """
        return {
            "status": "error",
            "error": {
                "message": rag_response.answer,
                "details": rag_response.error_message,
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
            },
            "sources": [],
            "metadata": {"confidence": 0.0, "source_count": 0, "context_length": 0},
        }

    def _format_chat_error(self, rag_response: Any, conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Format error response for chat interface.

        Args:
            rag_response: RAGResponse from RAG pipeline; ``answer`` and
                ``processing_time`` are read.
            conversation_id: Optional conversation ID, echoed back when truthy.

        Returns:
            Dictionary with the message, ``error`` set to True and the
            processing time in milliseconds.
        """
        response = {
            "message": rag_response.answer,
            "error": True,
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }

        if conversation_id:
            response["conversation_id"] = conversation_id

        return response

    def validate_response_format(self, response: Dict[str, Any]) -> bool:
        """
        Validate that response follows expected format.

        A response must contain ``status``. A "success" status additionally
        requires ``answer``, ``sources`` and ``metadata``; an "error" status
        requires ``error``. Any other status value is accepted as-is.

        Args:
            response: Formatted response dictionary

        Returns:
            True if format is valid, False otherwise
        """
        required_fields = ["status"]

        # Check required fields
        for field in required_fields:
            if field not in response:
                logger.error("Missing required field: %s", field)
                return False

        # Check status-specific requirements
        if response["status"] == "success":
            success_fields = ["answer", "sources", "metadata"]
            for field in success_fields:
                if field not in response:
                    logger.error("Missing success field: %s", field)
                    return False

        elif response["status"] == "error":
            if "error" not in response:
                logger.error("Missing error field in error response")
                return False

        return True

    def create_health_response(self, health_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Format health check response.

        Args:
            health_data: Health status from RAG pipeline

        Returns:
            Formatted health response
        """
        return {
            "status": "success",
            "health": {
                "pipeline_status": health_data.get("pipeline", "unknown"),
                "components": health_data.get("components", {}),
                "timestamp": self._get_timestamp(),
            },
        }

    def create_no_answer_response(self, question: str, reason: str = "no_context") -> Dict[str, Any]:
        """
        Create standardized response when no answer can be provided.

        Args:
            question: Original user question (not included in the response)
            reason: Reason for no answer (no_context, insufficient_context, etc.)

        Returns:
            Formatted no-answer response
        """
        messages = self._NO_ANSWER_MESSAGES
        message = messages.get(reason, messages["error"])

        return {
            "status": "no_answer",
            "message": message,
            "reason": reason,
            "suggestion": ("Please contact HR or rephrase your question for better results."),
            "sources": [],
        }

    def _get_timestamp(self) -> str:
        """Get current UTC timestamp in ISO format with a trailing "Z"."""
        # Use an aware datetime (utcnow() is naive and deprecated since 3.12),
        # then swap the "+00:00" offset for "Z" to keep the original format.
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    @classmethod
    def _stable_question_hash(cls, question: str) -> int:
        """Map a question to a bucket in 0..QUESTION_HASH_BUCKETS-1, stably."""
        # The builtin hash() of a str is salted per process, so it cannot be
        # used to correlate log entries across restarts or workers.
        digest = hashlib.sha256(question.encode("utf-8", errors="replace")).digest()
        return int.from_bytes(digest[:8], "big") % cls.QUESTION_HASH_BUCKETS

    def format_for_logging(self, rag_response: Any, question: str) -> Dict[str, Any]:
        """
        Format response data for logging purposes.

        The question text itself is not logged; only its length and a small
        tracking hash that is identical for the same question in every
        process.

        Args:
            rag_response: RAGResponse from pipeline
            question: Original question

        Returns:
            Formatted data for logging
        """
        return {
            "timestamp": self._get_timestamp(),
            "question_length": len(question),
            "question_hash": self._stable_question_hash(question),  # Simple hash for tracking
            "success": rag_response.success,
            "confidence": rag_response.confidence,
            "processing_time": rag_response.processing_time,
            "llm_provider": rag_response.llm_provider,
            "source_count": len(rag_response.sources),
            "context_length": rag_response.context_length,
            "answer_length": len(rag_response.answer),
            "error": rag_response.error_message,
        }