""" Guardrails System - Main orchestrator for comprehensive response validation This module provides the main GuardrailsSystem class that coordinates all guardrails components for comprehensive response validation. """ import logging from dataclasses import dataclass from typing import Any, Dict, List, Optional from .content_filters import ContentFilter, SafetyResult from .error_handlers import ErrorHandler, GuardrailsError from .quality_metrics import QualityMetrics, QualityScore from .response_validator import ResponseValidator, ValidationResult from .source_attribution import Citation, SourceAttributor logger = logging.getLogger(__name__) @dataclass class GuardrailsResult: """Comprehensive result from guardrails validation.""" is_approved: bool confidence_score: float # Component results validation_result: ValidationResult safety_result: SafetyResult quality_score: QualityScore citations: List[Citation] # Processing metadata processing_time: float components_used: List[str] fallbacks_applied: List[str] warnings: List[str] recommendations: List[str] # Final response data filtered_response: str enhanced_response: str # Response with citations metadata: Dict[str, Any] class GuardrailsSystem: """ Main guardrails system orchestrating all validation components. Provides comprehensive response validation including: - Response quality and safety validation - Content filtering and PII protection - Source attribution and citation generation - Quality scoring and recommendations - Error handling and graceful fallbacks """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize GuardrailsSystem with configuration. Args: config: Configuration dictionary for all guardrails components """ self.config = config or self._get_default_config() # Initialize components self.response_validator = ResponseValidator(self.config.get("response_validator", {})) self.content_filter = ContentFilter(self.config.get("content_filter", {})) self.quality_metrics = QualityMetrics(self.config.get("quality_metrics", {})) self.source_attributor = SourceAttributor(self.config.get("source_attribution", {})) self.error_handler = ErrorHandler(self.config.get("error_handler", {})) logger.info("GuardrailsSystem initialized with all components") def _get_default_config(self) -> Dict[str, Any]: """Get default configuration for guardrails system.""" return { "enable_all_checks": True, "strict_mode": False, "require_approval": True, "min_confidence_threshold": 0.7, "enable_response_enhancement": True, "log_all_results": True, "response_validator": { "min_overall_quality": 0.7, "require_citations": True, "min_response_length": 10, "max_response_length": 2000, "enable_safety_checks": True, "enable_coherence_check": True, "enable_completeness_check": True, "enable_relevance_check": True, }, "content_filter": { "enable_pii_filtering": True, "enable_bias_detection": True, "enable_inappropriate_filter": True, "enable_topic_validation": True, "strict_mode": False, "mask_pii": True, "allowed_topics": [ "corporate policy", "employee handbook", "workplace guidelines", "company procedures", "benefits", "hr policies", ], "pii_mask_char": "*", "max_bias_score": 0.3, "min_professionalism_score": 0.7, "safety_threshold": 0.8, }, "quality_metrics": { "quality_threshold": 0.7, "relevance_weight": 0.3, "completeness_weight": 0.25, "coherence_weight": 0.2, "source_fidelity_weight": 0.25, "min_response_length": 50, "target_response_length": 300, "max_response_length": 1000, "min_citation_count": 1, "preferred_source_count": 3, "enable_detailed_analysis": True, "enable_relevance_scoring": True, "enable_completeness_scoring": True, "enable_coherence_scoring": True, "enable_source_fidelity_scoring": True, "enable_professionalism_scoring": True, }, "source_attribution": { "max_citations": 5, "citation_format": "numbered", "max_excerpt_length": 200, "require_document_names": True, "min_source_confidence": 0.5, "min_confidence_for_citation": 0.3, "enable_quote_extraction": True, }, "error_handler": { "enable_fallbacks": True, "graceful_degradation": True, "max_retries": 3, "enable_circuit_breaker": True, "failure_threshold": 5, "recovery_timeout": 60, }, } def validate_response( self, response: str, query: str, sources: List[Dict[str, Any]], context: Optional[str] = None, ) -> GuardrailsResult: """ Perform comprehensive validation of RAG response. Args: response: Generated response text query: Original user query sources: Source documents used for generation context: Optional additional context Returns: GuardrailsResult with comprehensive validation results """ import time start_time = time.time() components_used = [] fallbacks_applied = [] warnings = [] try: # 1. Content Safety Filtering try: safety_result = self.content_filter.filter_content(response, context) components_used.append("content_filter") if not safety_result.is_safe and self.config["strict_mode"]: return self._create_rejection_result( "Content safety validation failed", safety_result, components_used, time.time() - start_time, ) except Exception as e: logger.warning(f"Content filtering failed: {e}") safety_recovery = self.error_handler.handle_content_filter_error(e, response, context) # Create SafetyResult from recovery data safety_result = SafetyResult( is_safe=safety_recovery.get("is_safe", True), risk_level=safety_recovery.get("risk_level", "medium"), issues_found=safety_recovery.get("issues_found", ["Recovery applied"]), filtered_content=safety_recovery.get("filtered_content", response), confidence=safety_recovery.get("confidence", 0.5), ) fallbacks_applied.append("content_filter_fallback") warnings.append("Content filtering used fallback") # Use filtered content for subsequent checks filtered_response = safety_result.filtered_content # 2. Response Validation try: validation_result = self.response_validator.validate_response(filtered_response, sources, query) components_used.append("response_validator") except Exception as e: logger.warning(f"Response validation failed: {e}") validation_recovery = self.error_handler.handle_validation_error( e, filtered_response, {"query": query, "sources": sources} ) if validation_recovery["success"]: validation_result = validation_recovery["result"] fallbacks_applied.append("validation_fallback") else: # Critical failure raise GuardrailsError( "Response validation failed critically", "validation_failure", {"original_error": str(e)}, ) # 3. Quality Assessment try: quality_score = self.quality_metrics.calculate_quality_score(filtered_response, query, sources, context) components_used.append("quality_metrics") except Exception as e: logger.warning(f"Quality assessment failed: {e}") quality_recovery = self.error_handler.handle_quality_metrics_error(e, filtered_response, query, sources) if quality_recovery["success"]: quality_score = quality_recovery["quality_score"] fallbacks_applied.append("quality_metrics_fallback") else: # Use minimal fallback score quality_score = QualityScore( overall_score=0.5, relevance_score=0.5, completeness_score=0.5, coherence_score=0.5, source_fidelity_score=0.5, professionalism_score=0.5, response_length=len(filtered_response), citation_count=0, source_count=len(sources), confidence_level="low", meets_threshold=False, strengths=[], weaknesses=["Quality assessment failed"], recommendations=["Manual review required"], ) fallbacks_applied.append("quality_score_minimal_fallback") # 4. Source Attribution try: citations = self.source_attributor.generate_citations(filtered_response, sources) components_used.append("source_attribution") except Exception as e: logger.warning(f"Source attribution failed: {e}") citation_recovery = self.error_handler.handle_source_attribution_error(e, filtered_response, sources) citations = citation_recovery.get("citations", []) fallbacks_applied.append("citation_fallback") # 5. Calculate Overall Approval approval_decision = self._calculate_approval(validation_result, safety_result, quality_score, citations) # 6. Enhance Response (if approved and enabled) enhanced_response = filtered_response if approval_decision["approved"] and self.config["enable_response_enhancement"]: enhanced_response = self._enhance_response_with_citations(filtered_response, citations) # 7. Generate Recommendations recommendations = self._generate_recommendations(validation_result, safety_result, quality_score, citations) processing_time = time.time() - start_time # Create final result result = GuardrailsResult( is_approved=approval_decision["approved"], confidence_score=approval_decision["confidence"], validation_result=validation_result, safety_result=safety_result, quality_score=quality_score, citations=citations, processing_time=processing_time, components_used=components_used, fallbacks_applied=fallbacks_applied, warnings=warnings, recommendations=recommendations, filtered_response=filtered_response, enhanced_response=enhanced_response, metadata={ "query": query, "source_count": len(sources), "approval_reason": approval_decision["reason"], }, ) if self.config["log_all_results"]: self._log_result(result) return result except Exception as e: logger.error(f"Guardrails system error: {e}") processing_time = time.time() - start_time return self._create_error_result(str(e), response, components_used, processing_time) def _calculate_approval( self, validation_result: ValidationResult, safety_result: SafetyResult, quality_score: QualityScore, citations: List[Citation], ) -> Dict[str, Any]: """Calculate overall approval decision.""" # Safety is mandatory if not safety_result.is_safe: return { "approved": False, "confidence": 0.0, "reason": f"Safety violation: {safety_result.risk_level} risk", } # Validation check if not validation_result.is_valid and self.config["strict_mode"]: return { "approved": False, "confidence": validation_result.confidence_score, "reason": "Validation failed in strict mode", } # Quality threshold min_threshold = self.config["min_confidence_threshold"] if quality_score.overall_score < min_threshold: return { "approved": False, "confidence": quality_score.overall_score, "reason": f"Quality below threshold ({min_threshold})", } # Citation requirement if self.config["response_validator"]["require_citations"] and not citations: return { "approved": False, "confidence": 0.5, "reason": "No citations provided", } # Calculate combined confidence confidence_factors = [ validation_result.confidence_score, safety_result.confidence, quality_score.overall_score, ] combined_confidence = sum(confidence_factors) / len(confidence_factors) return { "approved": True, "confidence": combined_confidence, "reason": "All validation checks passed", } def _enhance_response_with_citations(self, response: str, citations: List[Citation]) -> str: """Enhance response by adding formatted citations.""" if not citations: return response try: citation_text = self.source_attributor.format_citation_text(citations) return response + citation_text except Exception as e: logger.warning(f"Citation formatting failed: {e}") return response def _generate_recommendations( self, validation_result: ValidationResult, safety_result: SafetyResult, quality_score: QualityScore, citations: List[Citation], ) -> List[str]: """Generate actionable recommendations.""" recommendations = [] # From validation recommendations.extend(validation_result.suggestions) # From quality assessment recommendations.extend(quality_score.recommendations) # Safety recommendations if safety_result.risk_level != "low": recommendations.append("Review content for safety concerns") # Citation recommendations if not citations: recommendations.append("Add proper source citations") elif len(citations) < 2: recommendations.append("Consider adding more source citations") return list(set(recommendations)) # Remove duplicates def _create_rejection_result( self, reason: str, safety_result: SafetyResult, components_used: List[str], processing_time: float, ) -> GuardrailsResult: """Create result for rejected response.""" # Create minimal components for rejection validation_result = ValidationResult( is_valid=False, confidence_score=0.0, safety_passed=False, quality_score=0.0, issues=[reason], suggestions=["Address safety concerns before resubmitting"], ) quality_score = QualityScore( overall_score=0.0, relevance_score=0.0, completeness_score=0.0, coherence_score=0.0, source_fidelity_score=0.0, professionalism_score=0.0, response_length=0, citation_count=0, source_count=0, confidence_level="low", meets_threshold=False, strengths=[], weaknesses=[reason], recommendations=["Address safety violations"], ) return GuardrailsResult( is_approved=False, confidence_score=0.0, validation_result=validation_result, safety_result=safety_result, quality_score=quality_score, citations=[], processing_time=processing_time, components_used=components_used, fallbacks_applied=[], warnings=[reason], recommendations=["Address safety concerns"], filtered_response="", enhanced_response="", metadata={"rejection_reason": reason}, ) def _create_error_result( self, error_message: str, original_response: str, components_used: List[str], processing_time: float, ) -> GuardrailsResult: """Create result for system error.""" # Create error components validation_result = ValidationResult( is_valid=False, confidence_score=0.0, safety_passed=False, quality_score=0.0, issues=[f"System error: {error_message}"], suggestions=["Retry request or contact support"], ) safety_result = SafetyResult( is_safe=False, risk_level="high", issues_found=[f"System error: {error_message}"], filtered_content=original_response, confidence=0.0, ) quality_score = QualityScore( overall_score=0.0, relevance_score=0.0, completeness_score=0.0, coherence_score=0.0, source_fidelity_score=0.0, professionalism_score=0.0, response_length=len(original_response), citation_count=0, source_count=0, confidence_level="low", meets_threshold=False, strengths=[], weaknesses=["System error occurred"], recommendations=["Retry or contact support"], ) return GuardrailsResult( is_approved=False, confidence_score=0.0, validation_result=validation_result, safety_result=safety_result, quality_score=quality_score, citations=[], processing_time=processing_time, components_used=components_used, fallbacks_applied=[], warnings=[f"System error: {error_message}"], recommendations=["Retry request"], filtered_response=original_response, enhanced_response=original_response, metadata={"error": error_message}, ) def _log_result(self, result: GuardrailsResult) -> None: """Log guardrails result for monitoring.""" logger.info( f"Guardrails validation: approved={result.is_approved}, " f"confidence={result.confidence_score:.3f}, " f"components={len(result.components_used)}, " f"processing_time={result.processing_time:.3f}s" ) if not result.is_approved: rejection_reason = result.metadata.get("rejection_reason", "unknown") logger.warning(f"Response rejected: {rejection_reason}") if result.fallbacks_applied: logger.warning(f"Fallbacks applied: {result.fallbacks_applied}") def get_system_health(self) -> Dict[str, Any]: """Get health status of guardrails system.""" error_stats = self.error_handler.get_error_statistics() # Check if any circuit breakers are open circuit_breakers_open = any(error_stats.get("circuit_breakers", {}).values()) return { "status": "healthy" if not circuit_breakers_open else "degraded", "components": { "response_validator": "healthy", "content_filter": "healthy", "quality_metrics": "healthy", "source_attribution": "healthy", "error_handler": "healthy", }, "error_statistics": error_stats, "configuration": { "strict_mode": self.config["strict_mode"], "min_confidence_threshold": self.config["min_confidence_threshold"], "enable_response_enhancement": self.config["enable_response_enhancement"], }, }