msse-ai-engineering / src /guardrails /guardrails_system.py
sethmcknight
Refactor test cases for improved readability and consistency
159faf0
"""
Guardrails System - Main orchestrator for comprehensive response validation
This module provides the main GuardrailsSystem class that coordinates
all guardrails components for comprehensive response validation.
"""
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from .content_filters import ContentFilter, SafetyResult
from .error_handlers import ErrorHandler, GuardrailsError
from .quality_metrics import QualityMetrics, QualityScore
from .response_validator import ResponseValidator, ValidationResult
from .source_attribution import Citation, SourceAttributor
logger = logging.getLogger(__name__)
@dataclass
class GuardrailsResult:
"""Comprehensive result from guardrails validation."""
is_approved: bool
confidence_score: float
# Component results
validation_result: ValidationResult
safety_result: SafetyResult
quality_score: QualityScore
citations: List[Citation]
# Processing metadata
processing_time: float
components_used: List[str]
fallbacks_applied: List[str]
warnings: List[str]
recommendations: List[str]
# Final response data
filtered_response: str
enhanced_response: str # Response with citations
metadata: Dict[str, Any]
class GuardrailsSystem:
"""
Main guardrails system orchestrating all validation components.
Provides comprehensive response validation including:
- Response quality and safety validation
- Content filtering and PII protection
- Source attribution and citation generation
- Quality scoring and recommendations
- Error handling and graceful fallbacks
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize GuardrailsSystem with configuration.
Args:
config: Configuration dictionary for all guardrails components
"""
self.config = config or self._get_default_config()
# Initialize components
self.response_validator = ResponseValidator(self.config.get("response_validator", {}))
self.content_filter = ContentFilter(self.config.get("content_filter", {}))
self.quality_metrics = QualityMetrics(self.config.get("quality_metrics", {}))
self.source_attributor = SourceAttributor(self.config.get("source_attribution", {}))
self.error_handler = ErrorHandler(self.config.get("error_handler", {}))
logger.info("GuardrailsSystem initialized with all components")
def _get_default_config(self) -> Dict[str, Any]:
"""Get default configuration for guardrails system."""
return {
"enable_all_checks": True,
"strict_mode": False,
"require_approval": True,
"min_confidence_threshold": 0.7,
"enable_response_enhancement": True,
"log_all_results": True,
"response_validator": {
"min_overall_quality": 0.7,
"require_citations": True,
"min_response_length": 10,
"max_response_length": 2000,
"enable_safety_checks": True,
"enable_coherence_check": True,
"enable_completeness_check": True,
"enable_relevance_check": True,
},
"content_filter": {
"enable_pii_filtering": True,
"enable_bias_detection": True,
"enable_inappropriate_filter": True,
"enable_topic_validation": True,
"strict_mode": False,
"mask_pii": True,
"allowed_topics": [
"corporate policy",
"employee handbook",
"workplace guidelines",
"company procedures",
"benefits",
"hr policies",
],
"pii_mask_char": "*",
"max_bias_score": 0.3,
"min_professionalism_score": 0.7,
"safety_threshold": 0.8,
},
"quality_metrics": {
"quality_threshold": 0.7,
"relevance_weight": 0.3,
"completeness_weight": 0.25,
"coherence_weight": 0.2,
"source_fidelity_weight": 0.25,
"min_response_length": 50,
"target_response_length": 300,
"max_response_length": 1000,
"min_citation_count": 1,
"preferred_source_count": 3,
"enable_detailed_analysis": True,
"enable_relevance_scoring": True,
"enable_completeness_scoring": True,
"enable_coherence_scoring": True,
"enable_source_fidelity_scoring": True,
"enable_professionalism_scoring": True,
},
"source_attribution": {
"max_citations": 5,
"citation_format": "numbered",
"max_excerpt_length": 200,
"require_document_names": True,
"min_source_confidence": 0.5,
"min_confidence_for_citation": 0.3,
"enable_quote_extraction": True,
},
"error_handler": {
"enable_fallbacks": True,
"graceful_degradation": True,
"max_retries": 3,
"enable_circuit_breaker": True,
"failure_threshold": 5,
"recovery_timeout": 60,
},
}
def validate_response(
self,
response: str,
query: str,
sources: List[Dict[str, Any]],
context: Optional[str] = None,
) -> GuardrailsResult:
"""
Perform comprehensive validation of RAG response.
Args:
response: Generated response text
query: Original user query
sources: Source documents used for generation
context: Optional additional context
Returns:
GuardrailsResult with comprehensive validation results
"""
import time
start_time = time.time()
components_used = []
fallbacks_applied = []
warnings = []
try:
# 1. Content Safety Filtering
try:
safety_result = self.content_filter.filter_content(response, context)
components_used.append("content_filter")
if not safety_result.is_safe and self.config["strict_mode"]:
return self._create_rejection_result(
"Content safety validation failed",
safety_result,
components_used,
time.time() - start_time,
)
except Exception as e:
logger.warning(f"Content filtering failed: {e}")
safety_recovery = self.error_handler.handle_content_filter_error(e, response, context)
# Create SafetyResult from recovery data
safety_result = SafetyResult(
is_safe=safety_recovery.get("is_safe", True),
risk_level=safety_recovery.get("risk_level", "medium"),
issues_found=safety_recovery.get("issues_found", ["Recovery applied"]),
filtered_content=safety_recovery.get("filtered_content", response),
confidence=safety_recovery.get("confidence", 0.5),
)
fallbacks_applied.append("content_filter_fallback")
warnings.append("Content filtering used fallback")
# Use filtered content for subsequent checks
filtered_response = safety_result.filtered_content
# 2. Response Validation
try:
validation_result = self.response_validator.validate_response(filtered_response, sources, query)
components_used.append("response_validator")
except Exception as e:
logger.warning(f"Response validation failed: {e}")
validation_recovery = self.error_handler.handle_validation_error(
e, filtered_response, {"query": query, "sources": sources}
)
if validation_recovery["success"]:
validation_result = validation_recovery["result"]
fallbacks_applied.append("validation_fallback")
else:
# Critical failure
raise GuardrailsError(
"Response validation failed critically",
"validation_failure",
{"original_error": str(e)},
)
# 3. Quality Assessment
try:
quality_score = self.quality_metrics.calculate_quality_score(filtered_response, query, sources, context)
components_used.append("quality_metrics")
except Exception as e:
logger.warning(f"Quality assessment failed: {e}")
quality_recovery = self.error_handler.handle_quality_metrics_error(e, filtered_response, query, sources)
if quality_recovery["success"]:
quality_score = quality_recovery["quality_score"]
fallbacks_applied.append("quality_metrics_fallback")
else:
# Use minimal fallback score
quality_score = QualityScore(
overall_score=0.5,
relevance_score=0.5,
completeness_score=0.5,
coherence_score=0.5,
source_fidelity_score=0.5,
professionalism_score=0.5,
response_length=len(filtered_response),
citation_count=0,
source_count=len(sources),
confidence_level="low",
meets_threshold=False,
strengths=[],
weaknesses=["Quality assessment failed"],
recommendations=["Manual review required"],
)
fallbacks_applied.append("quality_score_minimal_fallback")
# 4. Source Attribution
try:
citations = self.source_attributor.generate_citations(filtered_response, sources)
components_used.append("source_attribution")
except Exception as e:
logger.warning(f"Source attribution failed: {e}")
citation_recovery = self.error_handler.handle_source_attribution_error(e, filtered_response, sources)
citations = citation_recovery.get("citations", [])
fallbacks_applied.append("citation_fallback")
# 5. Calculate Overall Approval
approval_decision = self._calculate_approval(validation_result, safety_result, quality_score, citations)
# 6. Enhance Response (if approved and enabled)
enhanced_response = filtered_response
if approval_decision["approved"] and self.config["enable_response_enhancement"]:
enhanced_response = self._enhance_response_with_citations(filtered_response, citations)
# 7. Generate Recommendations
recommendations = self._generate_recommendations(validation_result, safety_result, quality_score, citations)
processing_time = time.time() - start_time
# Create final result
result = GuardrailsResult(
is_approved=approval_decision["approved"],
confidence_score=approval_decision["confidence"],
validation_result=validation_result,
safety_result=safety_result,
quality_score=quality_score,
citations=citations,
processing_time=processing_time,
components_used=components_used,
fallbacks_applied=fallbacks_applied,
warnings=warnings,
recommendations=recommendations,
filtered_response=filtered_response,
enhanced_response=enhanced_response,
metadata={
"query": query,
"source_count": len(sources),
"approval_reason": approval_decision["reason"],
},
)
if self.config["log_all_results"]:
self._log_result(result)
return result
except Exception as e:
logger.error(f"Guardrails system error: {e}")
processing_time = time.time() - start_time
return self._create_error_result(str(e), response, components_used, processing_time)
def _calculate_approval(
self,
validation_result: ValidationResult,
safety_result: SafetyResult,
quality_score: QualityScore,
citations: List[Citation],
) -> Dict[str, Any]:
"""Calculate overall approval decision."""
# Safety is mandatory
if not safety_result.is_safe:
return {
"approved": False,
"confidence": 0.0,
"reason": f"Safety violation: {safety_result.risk_level} risk",
}
# Validation check
if not validation_result.is_valid and self.config["strict_mode"]:
return {
"approved": False,
"confidence": validation_result.confidence_score,
"reason": "Validation failed in strict mode",
}
# Quality threshold
min_threshold = self.config["min_confidence_threshold"]
if quality_score.overall_score < min_threshold:
return {
"approved": False,
"confidence": quality_score.overall_score,
"reason": f"Quality below threshold ({min_threshold})",
}
# Citation requirement
if self.config["response_validator"]["require_citations"] and not citations:
return {
"approved": False,
"confidence": 0.5,
"reason": "No citations provided",
}
# Calculate combined confidence
confidence_factors = [
validation_result.confidence_score,
safety_result.confidence,
quality_score.overall_score,
]
combined_confidence = sum(confidence_factors) / len(confidence_factors)
return {
"approved": True,
"confidence": combined_confidence,
"reason": "All validation checks passed",
}
def _enhance_response_with_citations(self, response: str, citations: List[Citation]) -> str:
"""Enhance response by adding formatted citations."""
if not citations:
return response
try:
citation_text = self.source_attributor.format_citation_text(citations)
return response + citation_text
except Exception as e:
logger.warning(f"Citation formatting failed: {e}")
return response
def _generate_recommendations(
self,
validation_result: ValidationResult,
safety_result: SafetyResult,
quality_score: QualityScore,
citations: List[Citation],
) -> List[str]:
"""Generate actionable recommendations."""
recommendations = []
# From validation
recommendations.extend(validation_result.suggestions)
# From quality assessment
recommendations.extend(quality_score.recommendations)
# Safety recommendations
if safety_result.risk_level != "low":
recommendations.append("Review content for safety concerns")
# Citation recommendations
if not citations:
recommendations.append("Add proper source citations")
elif len(citations) < 2:
recommendations.append("Consider adding more source citations")
return list(set(recommendations)) # Remove duplicates
def _create_rejection_result(
self,
reason: str,
safety_result: SafetyResult,
components_used: List[str],
processing_time: float,
) -> GuardrailsResult:
"""Create result for rejected response."""
# Create minimal components for rejection
validation_result = ValidationResult(
is_valid=False,
confidence_score=0.0,
safety_passed=False,
quality_score=0.0,
issues=[reason],
suggestions=["Address safety concerns before resubmitting"],
)
quality_score = QualityScore(
overall_score=0.0,
relevance_score=0.0,
completeness_score=0.0,
coherence_score=0.0,
source_fidelity_score=0.0,
professionalism_score=0.0,
response_length=0,
citation_count=0,
source_count=0,
confidence_level="low",
meets_threshold=False,
strengths=[],
weaknesses=[reason],
recommendations=["Address safety violations"],
)
return GuardrailsResult(
is_approved=False,
confidence_score=0.0,
validation_result=validation_result,
safety_result=safety_result,
quality_score=quality_score,
citations=[],
processing_time=processing_time,
components_used=components_used,
fallbacks_applied=[],
warnings=[reason],
recommendations=["Address safety concerns"],
filtered_response="",
enhanced_response="",
metadata={"rejection_reason": reason},
)
def _create_error_result(
self,
error_message: str,
original_response: str,
components_used: List[str],
processing_time: float,
) -> GuardrailsResult:
"""Create result for system error."""
# Create error components
validation_result = ValidationResult(
is_valid=False,
confidence_score=0.0,
safety_passed=False,
quality_score=0.0,
issues=[f"System error: {error_message}"],
suggestions=["Retry request or contact support"],
)
safety_result = SafetyResult(
is_safe=False,
risk_level="high",
issues_found=[f"System error: {error_message}"],
filtered_content=original_response,
confidence=0.0,
)
quality_score = QualityScore(
overall_score=0.0,
relevance_score=0.0,
completeness_score=0.0,
coherence_score=0.0,
source_fidelity_score=0.0,
professionalism_score=0.0,
response_length=len(original_response),
citation_count=0,
source_count=0,
confidence_level="low",
meets_threshold=False,
strengths=[],
weaknesses=["System error occurred"],
recommendations=["Retry or contact support"],
)
return GuardrailsResult(
is_approved=False,
confidence_score=0.0,
validation_result=validation_result,
safety_result=safety_result,
quality_score=quality_score,
citations=[],
processing_time=processing_time,
components_used=components_used,
fallbacks_applied=[],
warnings=[f"System error: {error_message}"],
recommendations=["Retry request"],
filtered_response=original_response,
enhanced_response=original_response,
metadata={"error": error_message},
)
def _log_result(self, result: GuardrailsResult) -> None:
"""Log guardrails result for monitoring."""
logger.info(
f"Guardrails validation: approved={result.is_approved}, "
f"confidence={result.confidence_score:.3f}, "
f"components={len(result.components_used)}, "
f"processing_time={result.processing_time:.3f}s"
)
if not result.is_approved:
rejection_reason = result.metadata.get("rejection_reason", "unknown")
logger.warning(f"Response rejected: {rejection_reason}")
if result.fallbacks_applied:
logger.warning(f"Fallbacks applied: {result.fallbacks_applied}")
def get_system_health(self) -> Dict[str, Any]:
"""Get health status of guardrails system."""
error_stats = self.error_handler.get_error_statistics()
# Check if any circuit breakers are open
circuit_breakers_open = any(error_stats.get("circuit_breakers", {}).values())
return {
"status": "healthy" if not circuit_breakers_open else "degraded",
"components": {
"response_validator": "healthy",
"content_filter": "healthy",
"quality_metrics": "healthy",
"source_attribution": "healthy",
"error_handler": "healthy",
},
"error_statistics": error_stats,
"configuration": {
"strict_mode": self.config["strict_mode"],
"min_confidence_threshold": self.config["min_confidence_threshold"],
"enable_response_enhancement": self.config["enable_response_enhancement"],
},
}