# msse-ai-engineering / src/rag/response_formatter.py
# Author: sethmcknight
# Last commit: 159faf0 — "Refactor test cases for improved readability and consistency"
"""
Response Formatter for RAG Pipeline
This module handles formatting of RAG responses with proper citation
formatting, metadata inclusion, and consistent response structure.
"""
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class FormattedResponse:
    """Standardized formatted response for API endpoints."""

    status: str  # response status, e.g. "success", "error", or "no_answer"
    answer: str  # generated answer text
    sources: List[Dict[str, Any]]  # formatted source citations backing the answer
    metadata: Dict[str, Any]  # confidence, timing, source/context counts
    processing_info: Dict[str, Any]  # pipeline processing details — populated by callers; schema not visible here
    error: Optional[str] = None  # error detail when the response is not successful
class ResponseFormatter:
    """
    Formats RAG pipeline responses for various output formats.

    Handles:
    - API response formatting
    - Citation formatting
    - Metadata inclusion
    - Error response formatting
    """

    # Shared class-level logger (same instance logging.getLogger(__name__)
    # would return at module level).
    _log = logging.getLogger(__name__)

    def __init__(self):
        """Initialize ResponseFormatter."""
        self._log.info("ResponseFormatter initialized")

    def format_api_response(self, rag_response: Any, include_debug: bool = False) -> Dict[str, Any]:  # RAGResponse type
        """
        Format RAG response for API consumption.

        Args:
            rag_response: RAGResponse from RAG pipeline
            include_debug: Whether to include debug information

        Returns:
            Formatted dictionary for JSON API response
        """
        if not rag_response.success:
            return self._format_error_response(rag_response)

        # Base response structure
        formatted_response = {
            "status": "success",
            "answer": rag_response.answer,
            "sources": self._format_source_list(rag_response.sources),
            "metadata": {
                "confidence": round(rag_response.confidence, 3),
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
                "source_count": len(rag_response.sources),
                "context_length": rag_response.context_length,
            },
        }

        # Debug block is opt-in so normal API payloads stay small.
        if include_debug:
            formatted_response["debug"] = {
                "llm_provider": rag_response.llm_provider,
                "llm_model": rag_response.llm_model,
                "search_results_count": rag_response.search_results_count,
                "processing_time_seconds": round(rag_response.processing_time, 3),
            }

        return formatted_response

    def format_chat_response(
        self,
        rag_response: Any,  # RAGResponse type
        conversation_id: Optional[str] = None,
        include_sources: bool = True,
    ) -> Dict[str, Any]:
        """
        Format RAG response for chat interface.

        Args:
            rag_response: RAGResponse from RAG pipeline
            conversation_id: Optional conversation ID
            include_sources: Whether to include source information

        Returns:
            Formatted dictionary for chat interface
        """
        if not rag_response.success:
            return self._format_chat_error(rag_response, conversation_id)

        response = {
            "message": rag_response.answer,
            "confidence": round(rag_response.confidence, 2),
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }

        if conversation_id:
            response["conversation_id"] = conversation_id

        if include_sources and rag_response.sources:
            response["sources"] = self._format_sources_for_chat(rag_response.sources)

        return response

    def _format_source_list(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format source list for API response."""
        formatted_sources = []
        for source in sources:
            formatted_source = {
                "document": source.get("document", "unknown"),
                "relevance_score": round(source.get("relevance_score", 0.0), 3),
                "excerpt": source.get("excerpt", ""),
            }
            # Add chunk ID only when present and non-empty.
            chunk_id = source.get("chunk_id", "")
            if chunk_id:
                formatted_source["chunk_id"] = chunk_id
            formatted_sources.append(formatted_source)
        return formatted_sources

    def _format_sources_for_chat(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format sources for chat interface (more concise)."""
        formatted_sources = []
        for i, source in enumerate(sources[:3], 1):  # Limit to top 3 for chat
            excerpt = source.get("excerpt", "")
            formatted_source = {
                "id": i,
                "document": source.get("document", "unknown"),
                "relevance": f"{source.get('relevance_score', 0.0):.1%}",
                # Truncate long excerpts to a 100-char preview with ellipsis.
                "preview": excerpt[:100] + "..." if len(excerpt) > 100 else excerpt,
            }
            formatted_sources.append(formatted_source)
        return formatted_sources

    def _format_error_response(self, rag_response: Any) -> Dict[str, Any]:
        """Format error response for API."""
        return {
            "status": "error",
            "error": {
                "message": rag_response.answer,
                "details": rag_response.error_message,
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
            },
            "sources": [],
            "metadata": {"confidence": 0.0, "source_count": 0, "context_length": 0},
        }

    def _format_chat_error(self, rag_response: Any, conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """Format error response for chat interface."""
        response = {
            "message": rag_response.answer,
            "error": True,
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }
        if conversation_id:
            response["conversation_id"] = conversation_id
        return response

    def validate_response_format(self, response: Dict[str, Any]) -> bool:
        """
        Validate that response follows expected format.

        Args:
            response: Formatted response dictionary

        Returns:
            True if format is valid, False otherwise
        """
        required_fields = ["status"]

        # Check required fields
        for field in required_fields:
            if field not in response:
                self._log.error(f"Missing required field: {field}")
                return False

        # Check status-specific requirements
        if response["status"] == "success":
            success_fields = ["answer", "sources", "metadata"]
            for field in success_fields:
                if field not in response:
                    self._log.error(f"Missing success field: {field}")
                    return False
        elif response["status"] == "error":
            if "error" not in response:
                self._log.error("Missing error field in error response")
                return False

        return True

    def create_health_response(self, health_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Format health check response.

        Args:
            health_data: Health status from RAG pipeline

        Returns:
            Formatted health response
        """
        return {
            "status": "success",
            "health": {
                "pipeline_status": health_data.get("pipeline", "unknown"),
                "components": health_data.get("components", {}),
                "timestamp": self._get_timestamp(),
            },
        }

    def create_no_answer_response(self, question: str, reason: str = "no_context") -> Dict[str, Any]:
        """
        Create standardized response when no answer can be provided.

        Args:
            question: Original user question
            reason: Reason for no answer (no_context, insufficient_context, etc.)

        Returns:
            Formatted no-answer response
        """
        messages = {
            "no_context": "I couldn't find any relevant information in our corporate policies to answer your question.",
            "insufficient_context": "I found some potentially relevant information, but not enough to provide a complete answer.",
            "off_topic": "This question appears to be outside the scope of our corporate policies.",
            "error": "I encountered an error while processing your question.",
        }

        # Unknown reasons fall back to the generic error message.
        message = messages.get(reason, messages["error"])

        return {
            "status": "no_answer",
            "message": message,
            "reason": reason,
            "suggestion": "Please contact HR or rephrase your question for better results.",
            "sources": [],
        }

    def _get_timestamp(self) -> str:
        """Get current UTC timestamp in ISO format with a 'Z' suffix."""
        from datetime import datetime, timezone

        # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
        # datetime; use an aware UTC time and normalize the "+00:00" offset
        # back to the original "Z" suffix.
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    def format_for_logging(self, rag_response: Any, question: str) -> Dict[str, Any]:
        """
        Format response data for logging purposes.

        Args:
            rag_response: RAGResponse from pipeline
            question: Original question

        Returns:
            Formatted data for logging
        """
        import hashlib

        # Stable digest-based bucket for tracking. The builtin hash() of a
        # str is randomized per process (PYTHONHASHSEED), which made the
        # previous tracking id useless across runs. MD5 is used for a
        # non-security fingerprint only.
        question_hash = int(hashlib.md5(question.encode("utf-8")).hexdigest(), 16) % 10000

        return {
            "timestamp": self._get_timestamp(),
            "question_length": len(question),
            "question_hash": question_hash,  # Simple stable hash for tracking
            "success": rag_response.success,
            "confidence": rag_response.confidence,
            "processing_time": rag_response.processing_time,
            "llm_provider": rag_response.llm_provider,
            "source_count": len(rag_response.sources),
            "context_length": rag_response.context_length,
            "answer_length": len(rag_response.answer),
            "error": rag_response.error_message,
        }