Spaces:
Sleeping
Sleeping
| # fmt: off | |
| """ | |
| Error Handlers - Comprehensive error handling and fallbacks | |
| This module provides robust error handling, graceful degradation, | |
| and fallback mechanisms for the guardrails system. | |
| Updated: October 18, 2025 - CI/CD format fix attempt | |
| """ | |
| import logging | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional | |
| logger = logging.getLogger(__name__) | |
class GuardrailsError(Exception):
    """Root exception type for failures raised by the guardrails system.

    Attributes:
        message: Human-readable description; also passed to ``Exception``.
        error_type: Short category label for the failure.
        details: Extra structured information; an empty dict when the
            caller supplies nothing (or supplies an empty mapping).
    """

    def __init__(
        self,
        message: str,
        error_type: str = "unknown",
        details: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(message)
        self.message, self.error_type = message, error_type
        # A falsy ``details`` (None or empty) is replaced by a fresh dict.
        self.details = details if details else {}
@dataclass
class ErrorContext:
    """Context information for error handling.

    A plain record describing one handled error. It must be a dataclass:
    ``ErrorHandler.handle_error`` constructs it with keyword arguments, which
    requires the generated ``__init__``.
    """

    # Name of the component in which the error occurred.
    component: str
    # Name of the operation that was being performed.
    operation: str
    # Input data that was being processed when the error occurred.
    input_data: Dict[str, Any]
    # String form of the exception.
    error_message: str
    # Class name of the exception.
    error_type: str
    # Time the error was recorded, as a string.
    timestamp: str
    # Set to True once a recovery strategy has been tried.
    recovery_attempted: bool = False
    # Set to True when the tried recovery strategy reported success.
    recovery_successful: bool = False
class ErrorHandler:
    """
    Comprehensive error handling system for guardrails.

    Provides:
    - Graceful error recovery
    - Fallback mechanisms
    - Circuit breaker patterns
    - Detailed error logging and metrics
    """

    # Maximum number of entries kept in ``error_history``.
    MAX_HISTORY = 1000

    def __init__(
        self,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_reset_seconds: float = 300.0,
    ):
        """
        Create a handler with empty history and no circuit breakers.

        Args:
            circuit_breaker_threshold: Number of recorded failures for a
                component at which its circuit breaker opens.
            circuit_breaker_reset_seconds: Seconds without a new failure after
                which an open breaker is automatically closed (default: 5
                minutes).
        """
        self.error_history: List["ErrorContext"] = []
        self.circuit_breakers: Dict[str, Dict[str, Any]] = {}
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.circuit_breaker_reset_seconds = circuit_breaker_reset_seconds

    def handle_error(
        self,
        error: Exception,
        component: str,
        operation: str,
        input_data: Dict[str, Any],
        recovery_strategy: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Handle an error with appropriate strategy.

        The error is logged, counted against the component's circuit breaker,
        and (unless that breaker is open) a recovery is attempted. The error
        context is then appended to the bounded history.

        Args:
            error: The exception that occurred
            component: Component where error occurred
            operation: Operation being performed
            input_data: Input data when error occurred
            recovery_strategy: Strategy to use for recovery ("retry",
                "fallback" or "degrade"); any other value selects one
                automatically

        Returns:
            Dictionary with error handling results
        """
        from datetime import datetime

        error_context = ErrorContext(
            component=component,
            operation=operation,
            input_data=input_data,
            error_message=str(error),
            error_type=type(error).__name__,
            timestamp=datetime.now().isoformat(),
        )

        # Log the error
        logger.error(
            f"Error in {component}.{operation}: {error_context.error_message}",
            extra={
                "component": component,
                "operation": operation,
                "error_type": error_context.error_type,
                "details": error_context.input_data,
            },
        )

        # Update circuit breaker
        self._update_circuit_breaker(component)

        # Try recovery if not in circuit breaker state
        recovery_result = None
        if not self._is_circuit_breaker_open(component):
            recovery_result = self._attempt_recovery(error_context, recovery_strategy)

        # Store error in history, keeping only the most recent entries
        self.error_history.append(error_context)
        if len(self.error_history) > self.MAX_HISTORY:
            self.error_history = self.error_history[-self.MAX_HISTORY:]

        return {
            "error_handled": True,
            "error_context": error_context,
            "recovery_attempted": recovery_result is not None,
            "recovery_successful": (
                recovery_result.get("success", False) if recovery_result else False
            ),
            "circuit_breaker_open": self._is_circuit_breaker_open(component),
            "fallback_available": self._has_fallback(component, operation),
        }

    def _attempt_recovery(
        self, error_context: "ErrorContext", strategy: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Attempt to recover from error using specified strategy.

        Marks the context as ``recovery_attempted`` and dispatches on
        ``strategy``; unrecognised or missing strategies fall through to
        ``_auto_recovery``.
        """
        error_context.recovery_attempted = True
        if strategy == "retry":
            return self._retry_operation(error_context)
        if strategy == "fallback":
            return self._use_fallback(error_context)
        if strategy == "degrade":
            return self._graceful_degradation(error_context)
        # Auto-select strategy based on error type
        return self._auto_recovery(error_context)

    def _retry_operation(self, error_context: "ErrorContext") -> Dict[str, Any]:
        """Attempt to retry the failed operation.

        NOTE: no real retry happens here. The outcome is simulated with a
        random draw, so results are non-deterministic.
        """
        try:
            # This would implement actual retry logic
            # For now, we simulate a recovery attempt
            logger.info(
                f"Retrying operation {error_context.operation} in {error_context.component}"
            )
            # Simulate retry success/failure
            import random

            success = random.random() > 0.3  # 70% success rate for simulation
            if success:
                error_context.recovery_successful = True
                logger.info(
                    f"Retry successful for {error_context.component}.{error_context.operation}"
                )
            else:
                logger.warning(
                    f"Retry failed for {error_context.component}.{error_context.operation}"
                )
            return {"success": success, "strategy": "retry", "attempts": 1}
        except Exception as e:
            logger.error(f"Retry operation failed: {e}")
            return {"success": False, "strategy": "retry", "error": str(e)}

    def _use_fallback(self, error_context: "ErrorContext") -> Dict[str, Any]:
        """Use fallback mechanism for the failed operation.

        Returns a result dict whose ``response`` is the canned payload from
        ``_generate_fallback_response``; on any exception a failure dict with
        the error text is returned instead.
        """
        try:
            fallback_response = self._generate_fallback_response(error_context)
            error_context.recovery_successful = True
            logger.info(
                f"Fallback used for {error_context.component}.{error_context.operation}"
            )
            return {
                "success": True,
                "strategy": "fallback",
                "response": fallback_response,
            }
        except Exception as e:
            logger.error(f"Fallback failed: {e}")
            return {"success": False, "strategy": "fallback", "error": str(e)}

    def _graceful_degradation(self, error_context: "ErrorContext") -> Dict[str, Any]:
        """Implement graceful degradation.

        Returns a result dict whose ``response`` is the payload from
        ``_generate_degraded_response``; on any exception a failure dict with
        the error text is returned instead.
        """
        try:
            degraded_response = self._generate_degraded_response(error_context)
            error_context.recovery_successful = True
            logger.info(
                f"Graceful degradation for {error_context.component}.{error_context.operation}"
            )
            return {
                "success": True,
                "strategy": "degrade",
                "response": degraded_response,
            }
        except Exception as e:
            logger.error(f"Graceful degradation failed: {e}")
            return {"success": False, "strategy": "degrade", "error": str(e)}

    def _auto_recovery(self, error_context: "ErrorContext") -> Dict[str, Any]:
        """Auto-select recovery strategy based on error context.

        Connection/timeout errors are retried, errors in the "llm" and
        "vector_store" components use the fallback, everything else degrades.
        """
        if error_context.error_type in ("ConnectionError", "TimeoutError"):
            return self._retry_operation(error_context)
        if error_context.component in ("llm", "vector_store"):
            return self._use_fallback(error_context)
        return self._graceful_degradation(error_context)

    def _generate_fallback_response(self, error_context: "ErrorContext") -> Dict[str, Any]:
        """Generate a fallback response for the failed operation.

        The payload shape depends on ``error_context.component``.
        """
        if error_context.component == "llm":
            return {
                "response": "I apologize, but I'm experiencing technical difficulties. Please try your question again or rephrase it.",
                "confidence": 0.1,
                "source": "fallback_handler",
                "citations": [],
            }
        if error_context.component == "vector_store":
            return {
                "documents": [],
                "scores": [],
                "message": "Search temporarily unavailable. Please try again.",
            }
        return {
            "result": None,
            "status": "error",
            "message": f"Service temporarily unavailable in {error_context.component}",
        }

    def _generate_degraded_response(self, error_context: "ErrorContext") -> Dict[str, Any]:
        """Generate a degraded response with limited functionality."""
        return {
            "result": "limited_functionality",
            "message": f"Operating in degraded mode for {error_context.component}",
            "available_operations": ["basic_query", "status_check"],
            "degradation_reason": error_context.error_message,
        }

    def _auto_reset_if_expired(self, component: str) -> bool:
        """Close an open breaker whose last failure is older than the reset window.

        Returns:
            True if the breaker was reset by this call, False otherwise
            (unknown component, breaker closed, or window not yet elapsed).
        """
        from datetime import datetime, timedelta

        breaker = self.circuit_breakers.get(component)
        if not breaker or not breaker.get("is_open") or breaker.get("last_failure") is None:
            return False
        window = timedelta(seconds=self.circuit_breaker_reset_seconds)
        if datetime.now() - breaker["last_failure"] <= window:
            return False
        breaker["is_open"] = False
        breaker["failure_count"] = 0
        logger.info(f"Circuit breaker auto-reset for {component}")
        return True

    def _update_circuit_breaker(self, component: str) -> None:
        """Record one failure for ``component`` and open its breaker if needed."""
        from datetime import datetime

        # Expire a stale open breaker BEFORE stamping the new failure time.
        # Checking after the stamp compares "now" with "now", so the reset
        # could never trigger.
        self._auto_reset_if_expired(component)

        breaker = self.circuit_breakers.setdefault(
            component,
            {"failure_count": 0, "last_failure": None, "is_open": False},
        )
        breaker["failure_count"] += 1
        breaker["last_failure"] = datetime.now()

        # Open circuit breaker if threshold reached
        if breaker["failure_count"] >= self.circuit_breaker_threshold:
            breaker["is_open"] = True
            logger.warning(
                f"Circuit breaker opened for {component} "
                f"(failures: {breaker['failure_count']})"
            )

    def _is_circuit_breaker_open(self, component: str) -> bool:
        """Check if circuit breaker is open for component.

        An open breaker whose reset window has elapsed is closed first, so
        the answer reflects the auto-reset even when no new failure arrived.
        """
        self._auto_reset_if_expired(component)
        return self.circuit_breakers.get(component, {}).get("is_open", False)

    def _has_fallback(self, component: str, operation: str) -> bool:
        """Check if fallback is available for component/operation.

        Only ``component`` is consulted; ``operation`` is currently unused.
        """
        return component in ("llm", "vector_store", "guardrails")

    def get_error_statistics(self) -> Dict[str, Any]:
        """Get comprehensive error statistics.

        Returns:
            Dictionary with ``total_errors``, per-component counts,
            the five most common error types as ``(type, count)`` pairs,
            per-component health, and the open/closed state of every
            breaker. The same keys are returned when the history is empty.
        """
        from collections import Counter

        # Bring breaker states up to date so reported health is not stale.
        for component in self.circuit_breakers:
            self._auto_reset_if_expired(component)

        component_errors = Counter(entry.component for entry in self.error_history)
        error_types = Counter(entry.error_type for entry in self.error_history)

        # Component health status
        component_health = {
            component: {
                "status": "degraded" if breaker["is_open"] else "healthy",
                "failure_count": breaker["failure_count"],
                "is_circuit_breaker_open": breaker["is_open"],
            }
            for component, breaker in self.circuit_breakers.items()
        }

        return {
            "total_errors": len(self.error_history),
            "component_errors": dict(component_errors),
            "most_common_errors": error_types.most_common(5),
            "component_health": component_health,
            "circuit_breakers": {
                name: breaker["is_open"]
                for name, breaker in self.circuit_breakers.items()
            },
        }

    def reset_circuit_breaker(self, component: str) -> bool:
        """Manually reset circuit breaker for component.

        Returns:
            True if the component had a breaker (now reset), False if the
            component is unknown.
        """
        if component not in self.circuit_breakers:
            return False
        self.circuit_breakers[component] = {
            "failure_count": 0,
            "last_failure": None,
            "is_open": False,
        }
        logger.info(f"Circuit breaker reset for {component}")
        return True

    def clear_error_history(self) -> None:
        """Clear error history."""
        self.error_history.clear()
        logger.info("Error history cleared")
class FallbackResponseGenerator:
    """Generates fallback responses when primary systems fail.

    All generators are static methods: they take no instance state and can be
    called either on the class or on an instance.
    """

    # Candidate apology messages for LLM fallbacks; one is picked at random.
    _LLM_FALLBACK_MESSAGES = (
        "I apologize, but I'm experiencing technical difficulties. Please try your question again.",
        "The service is temporarily unavailable. Please rephrase your question or try again later.",
        "I'm having trouble processing your request right now. Could you try a simpler question?",
    )

    @staticmethod
    def generate_llm_fallback(query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Generate a fallback LLM response.

        Args:
            query: The user query that could not be answered (not used when
                building the response).
            context: Optional extra context (not used when building the
                response).

        Returns:
            Dict with a randomly chosen apology in ``response``, a fixed
            ``confidence`` of 0.1, empty ``citations`` and ``fallback=True``.
        """
        import random

        response = random.choice(FallbackResponseGenerator._LLM_FALLBACK_MESSAGES)
        return {
            "response": response,
            "confidence": 0.1,
            "source": "fallback_generator",
            "citations": [],
            "fallback": True,
        }

    @staticmethod
    def generate_search_fallback(query: str) -> Dict[str, Any]:
        """Generate a fallback search response.

        Args:
            query: The search query that failed (not used when building the
                response).

        Returns:
            Dict with empty ``documents`` and ``scores``, an explanatory
            ``message`` and ``fallback=True``.
        """
        return {
            "documents": [],
            "scores": [],
            "message": "Search service temporarily unavailable. Please try again later.",
            "fallback": True,
        }

    @staticmethod
    def generate_generic_fallback(operation: str, error_message: str) -> Dict[str, Any]:
        """Generate a generic fallback response.

        Args:
            operation: Name of the unavailable operation; interpolated into
                ``message``.
            error_message: Error text, returned as ``error_summary``.

        Returns:
            Dict describing the outage, including a list of suggested actions.
        """
        return {
            "result": None,
            "status": "service_unavailable",
            "message": f"The {operation} service is temporarily unavailable.",
            "error_summary": error_message,
            "fallback": True,
            "suggested_actions": [
                "Please try again in a few moments",
                "Check your internet connection",
                "Contact support if the problem persists",
            ],
        }
| # fmt: on | |