Spaces:
Sleeping
Sleeping
| """ | |
| Response Formatter for RAG Pipeline | |
| This module handles formatting of RAG responses with proper citation | |
| formatting, metadata inclusion, and consistent response structure. | |
| """ | |
| import logging | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional | |
| logger = logging.getLogger(__name__) | |
@dataclass
class FormattedResponse:
    """Standardized formatted response for API endpoints.

    Declared as a dataclass so that instances can be constructed with the
    fields below as keyword (or positional) arguments; ``error`` is optional
    and defaults to ``None``.
    """

    # Outcome marker for the response.
    status: str
    # Answer text returned to the caller.
    answer: str
    # One dictionary per cited source.
    sources: List[Dict[str, Any]]
    # Additional response metadata.
    metadata: Dict[str, Any]
    # Information about how the response was produced.
    processing_info: Dict[str, Any]
    # Error description, or None when no error is attached.
    error: Optional[str] = None
class ResponseFormatter:
    """
    Formats RAG pipeline responses for various output formats.

    Handles:
    - API response formatting
    - Citation formatting
    - Metadata inclusion
    - Error response formatting

    The ``rag_response`` objects accepted by the public methods are only
    read through attribute access (``success``, ``answer``, ``sources``,
    ``confidence``, ``processing_time``, ``context_length``,
    ``llm_provider``, ``llm_model``, ``search_results_count`` and
    ``error_message``).
    """

    # Maximum number of sources included in a chat response.
    CHAT_SOURCE_LIMIT = 3
    # Excerpts longer than this many characters are truncated in chat previews.
    CHAT_PREVIEW_LENGTH = 100

    def __init__(self):
        """Initialize ResponseFormatter."""
        logger.info("ResponseFormatter initialized")

    def format_api_response(self, rag_response: Any, include_debug: bool = False) -> Dict[str, Any]:  # RAGResponse type
        """
        Format RAG response for API consumption.

        Args:
            rag_response: RAGResponse from RAG pipeline
            include_debug: Whether to include debug information

        Returns:
            Formatted dictionary for JSON API response. Unsuccessful
            responses are delegated to the error formatter.
        """
        if not rag_response.success:
            return self._format_error_response(rag_response)

        # A missing source list is treated as "no sources" rather than crashing.
        sources = rag_response.sources or []

        # Base response structure
        formatted_response: Dict[str, Any] = {
            "status": "success",
            "answer": rag_response.answer,
            "sources": self._format_source_list(sources),
            "metadata": {
                "confidence": round(rag_response.confidence, 3),
                # processing_time is in seconds; expose milliseconds here.
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
                "source_count": len(sources),
                "context_length": rag_response.context_length,
            },
        }

        # Add debug information if requested
        if include_debug:
            formatted_response["debug"] = {
                "llm_provider": rag_response.llm_provider,
                "llm_model": rag_response.llm_model,
                "search_results_count": rag_response.search_results_count,
                "processing_time_seconds": round(rag_response.processing_time, 3),
            }

        return formatted_response

    def format_chat_response(
        self,
        rag_response: Any,  # RAGResponse type
        conversation_id: Optional[str] = None,
        include_sources: bool = True,
    ) -> Dict[str, Any]:
        """
        Format RAG response for chat interface.

        Args:
            rag_response: RAGResponse from RAG pipeline
            conversation_id: Optional conversation ID
            include_sources: Whether to include source information

        Returns:
            Formatted dictionary for chat interface. The ``sources`` key is
            only present when requested and at least one source exists.
        """
        if not rag_response.success:
            return self._format_chat_error(rag_response, conversation_id)

        response: Dict[str, Any] = {
            "message": rag_response.answer,
            "confidence": round(rag_response.confidence, 2),
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }

        if conversation_id:
            response["conversation_id"] = conversation_id

        if include_sources and rag_response.sources:
            response["sources"] = self._format_sources_for_chat(rag_response.sources)

        return response

    def _format_source_list(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format source list for API response.

        Missing or ``None`` relevance scores become ``0.0`` and missing or
        ``None`` excerpts become ``""``. ``chunk_id`` is only emitted when
        the source carries a truthy value for it.
        """
        formatted_sources = []
        for source in sources:
            formatted_source = {
                "document": source.get("document", "unknown"),
                # "or" also covers an explicit None, which round() would reject.
                "relevance_score": round(source.get("relevance_score") or 0.0, 3),
                "excerpt": source.get("excerpt") or "",
            }

            # Add chunk ID if available
            chunk_id = source.get("chunk_id", "")
            if chunk_id:
                formatted_source["chunk_id"] = chunk_id

            formatted_sources.append(formatted_source)

        return formatted_sources

    def _format_sources_for_chat(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format sources for chat interface (more concise).

        Only the first ``CHAT_SOURCE_LIMIT`` sources are kept, numbered
        from 1, and excerpts longer than ``CHAT_PREVIEW_LENGTH`` characters
        are cut and suffixed with ``"..."``.
        """
        limit = self.CHAT_SOURCE_LIMIT
        preview_length = self.CHAT_PREVIEW_LENGTH

        formatted_sources = []
        for i, source in enumerate(sources[:limit], 1):
            # Read the excerpt once; None is treated like a missing excerpt.
            excerpt = source.get("excerpt") or ""
            if len(excerpt) > preview_length:
                preview = excerpt[:preview_length] + "..."
            else:
                preview = excerpt

            score = source.get("relevance_score") or 0.0
            formatted_sources.append(
                {
                    "id": i,
                    "document": source.get("document", "unknown"),
                    "relevance": f"{score:.1%}",
                    "preview": preview,
                }
            )

        return formatted_sources

    def _format_error_response(self, rag_response: Any) -> Dict[str, Any]:
        """Format error response for API."""
        return {
            "status": "error",
            "error": {
                "message": rag_response.answer,
                "details": rag_response.error_message,
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
            },
            "sources": [],
            "metadata": {"confidence": 0.0, "source_count": 0, "context_length": 0},
        }

    def _format_chat_error(self, rag_response: Any, conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """Format error response for chat interface."""
        response: Dict[str, Any] = {
            "message": rag_response.answer,
            "error": True,
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }

        if conversation_id:
            response["conversation_id"] = conversation_id

        return response

    def validate_response_format(self, response: Dict[str, Any]) -> bool:
        """
        Validate that response follows expected format.

        Args:
            response: Formatted response dictionary

        Returns:
            True if format is valid, False otherwise. Statuses other than
            "success" and "error" only need the "status" key itself.
        """
        required_fields = ["status"]

        # Check required fields
        for field in required_fields:
            if field not in response:
                logger.error("Missing required field: %s", field)
                return False

        # Check status-specific requirements
        status = response["status"]
        if status == "success":
            for field in ("answer", "sources", "metadata"):
                if field not in response:
                    logger.error("Missing success field: %s", field)
                    return False
        elif status == "error":
            if "error" not in response:
                logger.error("Missing error field in error response")
                return False

        return True

    def create_health_response(self, health_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Format health check response.

        Args:
            health_data: Health status from RAG pipeline

        Returns:
            Formatted health response with a UTC timestamp
        """
        return {
            "status": "success",
            "health": {
                "pipeline_status": health_data.get("pipeline", "unknown"),
                "components": health_data.get("components", {}),
                "timestamp": self._get_timestamp(),
            },
        }

    def create_no_answer_response(self, question: str, reason: str = "no_context") -> Dict[str, Any]:
        """
        Create standardized response when no answer can be provided.

        Args:
            question: Original user question (currently not included in the
                response)
            reason: Reason for no answer (no_context, insufficient_context, etc.)

        Returns:
            Formatted no-answer response. An unrecognized ``reason`` falls
            back to the generic error message but is still echoed back in
            the ``reason`` field.
        """
        messages = {
            "no_context": (
                "I couldn't find any relevant information in our corporate policies to answer your question."
            ),
            "insufficient_context": (
                "I found some potentially relevant information, but not enough to provide a complete answer."
            ),
            "off_topic": "This question appears to be outside the scope of our corporate policies.",
            "error": "I encountered an error while processing your question.",
        }

        message = messages.get(reason, messages["error"])

        return {
            "status": "no_answer",
            "message": message,
            "reason": reason,
            "suggestion": "Please contact HR or rephrase your question for better results.",
            "sources": [],
        }

    def _get_timestamp(self) -> str:
        """Get current UTC timestamp in ISO format with a trailing ``Z``."""
        from datetime import datetime, timezone

        # datetime.utcnow() is deprecated and returns a naive value; take an
        # aware UTC time and swap the "+00:00" offset for the "Z" suffix.
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    def format_for_logging(self, rag_response: Any, question: str) -> Dict[str, Any]:
        """
        Format response data for logging purposes.

        Args:
            rag_response: RAGResponse from pipeline
            question: Original question

        Returns:
            Formatted data for logging. The question itself is not included,
            only its length and a small tracking hash in the range 0-9999.
        """
        import zlib

        # The built-in hash() is salted per process for str, so it cannot be
        # used to correlate log entries across runs; CRC32 is deterministic.
        question_hash = zlib.crc32(question.encode("utf-8")) % 10000

        return {
            "timestamp": self._get_timestamp(),
            "question_length": len(question),
            "question_hash": question_hash,  # Simple hash for tracking
            "success": rag_response.success,
            "confidence": rag_response.confidence,
            "processing_time": rag_response.processing_time,
            "llm_provider": rag_response.llm_provider,
            # Failed responses may carry no sources/answer; count those as empty.
            "source_count": len(rag_response.sources or []),
            "context_length": rag_response.context_length,
            "answer_length": len(rag_response.answer or ""),
            "error": rag_response.error_message,
        }