File size: 14,588 Bytes
01d5d1b
135f0d6
 
 
 
 
7169a55
933e63c
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01d5d1b
135f0d6
 
01d5d1b
135f0d6
 
01d5d1b
135f0d6
01d5d1b
 
 
 
 
 
 
135f0d6
 
01d5d1b
135f0d6
 
01d5d1b
 
 
 
 
135f0d6
 
01d5d1b
135f0d6
01d5d1b
135f0d6
01d5d1b
 
 
 
 
 
 
 
135f0d6
01d5d1b
 
 
 
 
 
 
 
 
 
135f0d6
01d5d1b
 
135f0d6
01d5d1b
 
 
 
 
135f0d6
 
01d5d1b
 
135f0d6
01d5d1b
 
 
135f0d6
01d5d1b
 
 
 
 
 
 
 
135f0d6
01d5d1b
 
 
 
 
135f0d6
01d5d1b
 
 
 
 
 
 
 
 
 
 
 
135f0d6
01d5d1b
 
 
 
135f0d6
 
01d5d1b
 
 
135f0d6
01d5d1b
 
 
 
 
135f0d6
01d5d1b
 
 
 
135f0d6
01d5d1b
 
 
 
 
 
 
 
135f0d6
01d5d1b
 
 
135f0d6
01d5d1b
 
 
135f0d6
01d5d1b
 
135f0d6
01d5d1b
 
 
 
135f0d6
 
 
01d5d1b
 
135f0d6
01d5d1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
135f0d6
01d5d1b
 
 
 
135f0d6
01d5d1b
135f0d6
01d5d1b
 
 
135f0d6
01d5d1b
 
 
 
 
135f0d6
 
01d5d1b
 
135f0d6
01d5d1b
 
 
 
135f0d6
 
01d5d1b
 
 
135f0d6
 
 
 
 
 
 
 
 
 
01d5d1b
135f0d6
01d5d1b
 
135f0d6
01d5d1b
 
 
 
135f0d6
01d5d1b
 
 
 
 
 
135f0d6
01d5d1b
 
 
135f0d6
01d5d1b
 
 
 
135f0d6
 
01d5d1b
135f0d6
01d5d1b
135f0d6
 
 
 
 
 
 
 
 
 
 
 
01d5d1b
135f0d6
 
01d5d1b
135f0d6
 
 
01d5d1b
135f0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01d5d1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
# fmt: off
"""
Error Handlers - Comprehensive error handling and fallbacks

This module provides robust error handling, graceful degradation,
and fallback mechanisms for the guardrails system.

Updated: October 18, 2025 - CI/CD format fix attempt
"""

import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# Module-level logger named after this module; every log call below uses it.
logger = logging.getLogger(__name__)


class GuardrailsError(Exception):
    """Root exception type for failures raised by the guardrails system.

    Attributes:
        message: Human-readable description; also forwarded to ``Exception``
            so ``str(err)`` and ``err.args`` carry it.
        error_type: Short category label, ``"unknown"`` unless supplied.
        details: Extra structured data about the failure. A new empty dict
            is used when the argument is ``None`` or otherwise falsy.
    """

    def __init__(
        self,
        message: str,
        error_type: str = "unknown",
        details: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(message)
        self.message, self.error_type = message, error_type
        # A falsy argument (None or an empty mapping) yields a fresh dict.
        self.details = details if details else {}


@dataclass
class ErrorContext:
    """Record describing one handled error.

    Attributes:
        component: Name of the component in which the error occurred.
        operation: Operation that was being performed.
        input_data: Input supplied to the operation when it failed.
        error_message: Text of the error.
        error_type: Name of the error's type.
        timestamp: When the error was recorded, as a string.
        recovery_attempted: Whether a recovery was tried; defaults to False.
        recovery_successful: Whether that recovery worked; defaults to False.
    """

    # Where and what failed.
    component: str
    operation: str
    input_data: Dict[str, Any]

    # Description of the failure.
    error_message: str
    error_type: str
    timestamp: str

    # Recovery bookkeeping, updated after the record is created.
    recovery_attempted: bool = False
    recovery_successful: bool = False


class ErrorHandler:
    """
    Comprehensive error handling system for guardrails.

    Provides:
    - Graceful error recovery (retry, fallback, or degraded response)
    - Fallback mechanisms
    - Per-component circuit breakers with a cooldown-based auto-reset
    - Detailed error logging and metrics

    Attributes:
        error_history: Recorded ``ErrorContext`` objects, oldest first,
            capped at ``max_error_history`` entries.
        circuit_breakers: Per-component state dicts with the keys
            ``failure_count``, ``last_failure`` and ``is_open``.
        circuit_breaker_threshold: Failure count at which a breaker opens.
        circuit_breaker_reset_seconds: Time without failures after which an
            open breaker closes again.
        max_error_history: Maximum number of errors kept in the history.
    """

    def __init__(
        self,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_reset_seconds: float = 300.0,
        max_error_history: int = 1000,
    ):
        """
        Create a handler with empty history and no circuit breakers.

        Args:
            circuit_breaker_threshold: Failures after which a component's
                breaker opens.
            circuit_breaker_reset_seconds: Seconds that must pass since a
                component's last failure before its open breaker closes
                automatically (default: 5 minutes).
            max_error_history: Number of most recent errors to retain;
                negative values are treated as 0.
        """
        self.error_history: List[ErrorContext] = []
        self.circuit_breakers: Dict[str, Dict[str, Any]] = {}
        self.circuit_breaker_threshold = circuit_breaker_threshold
        self.circuit_breaker_reset_seconds = circuit_breaker_reset_seconds
        self.max_error_history = max(0, max_error_history)

    def handle_error(
        self,
        error: Exception,
        component: str,
        operation: str,
        input_data: Dict[str, Any],
        recovery_strategy: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Handle an error with appropriate strategy.

        The error is logged, counted against the component's circuit
        breaker, and - unless that breaker is open - passed to a recovery
        strategy. The resulting context is appended to ``error_history``.

        Args:
            error: The exception that occurred
            component: Component where error occurred
            operation: Operation being performed
            input_data: Input data when error occurred
            recovery_strategy: ``"retry"``, ``"fallback"`` or ``"degrade"``;
                any other value (including ``None``) auto-selects one

        Returns:
            Dictionary with the keys ``error_handled``, ``error_context``,
            ``recovery_attempted``, ``recovery_successful``,
            ``circuit_breaker_open`` and ``fallback_available``
        """
        from datetime import datetime

        error_context = ErrorContext(
            component=component,
            operation=operation,
            input_data=input_data,
            error_message=str(error),
            error_type=type(error).__name__,
            timestamp=datetime.now().isoformat(),
        )

        # Log the error
        logger.error(
            "Error in %s.%s: %s",
            component,
            operation,
            error_context.error_message,
            extra={
                "component": component,
                "operation": operation,
                "error_type": error_context.error_type,
                "details": error_context.input_data,
            },
        )

        # Count this failure; this may open the breaker for the component.
        self._update_circuit_breaker(component)

        # Recovery is skipped while the breaker is open.
        recovery_result = None
        if not self._is_circuit_breaker_open(component):
            recovery_result = self._attempt_recovery(
                error_context, recovery_strategy
            )

        # Store the error and drop the oldest entries beyond the cap.
        # Computing the overflow explicitly also handles a cap of 0, which
        # a negative-index slice would not.
        self.error_history.append(error_context)
        overflow = len(self.error_history) - self.max_error_history
        if overflow > 0:
            del self.error_history[:overflow]

        recovery_successful = (
            recovery_result.get("success", False) if recovery_result else False
        )
        return {
            "error_handled": True,
            "error_context": error_context,
            "recovery_attempted": recovery_result is not None,
            "recovery_successful": recovery_successful,
            "circuit_breaker_open": self._is_circuit_breaker_open(component),
            "fallback_available": self._has_fallback(component, operation),
        }

    def _attempt_recovery(
        self, error_context: ErrorContext, strategy: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Run the named recovery strategy and return its result dict.

        Marks ``error_context.recovery_attempted``. Unknown or missing
        strategy names are resolved by ``_auto_recovery``.
        """
        error_context.recovery_attempted = True

        strategies = {
            "retry": self._retry_operation,
            "fallback": self._use_fallback,
            "degrade": self._graceful_degradation,
        }
        recover = strategies.get(strategy, self._auto_recovery)
        return recover(error_context)

    def _retry_operation(self, error_context: ErrorContext) -> Dict[str, Any]:
        """
        Simulate a retry of the failed operation.

        NOTE: no real operation is re-invoked here. The outcome is drawn at
        random (about 70% success) as a placeholder for actual retry logic.

        Returns:
            ``{"success", "strategy": "retry", "attempts": 1}``, or a dict
            with ``"error"`` instead of ``"attempts"`` if the attempt raised.
        """
        import random

        try:
            logger.info(
                "Retrying operation %s in %s",
                error_context.operation,
                error_context.component,
            )

            # Simulated outcome: 70% success rate.
            success = random.random() > 0.3

            if success:
                error_context.recovery_successful = True
                logger.info(
                    "Retry successful for %s.%s",
                    error_context.component,
                    error_context.operation,
                )
            else:
                logger.warning(
                    "Retry failed for %s.%s",
                    error_context.component,
                    error_context.operation,
                )

            return {"success": success, "strategy": "retry", "attempts": 1}
        except Exception as e:
            # Recovery is best-effort: report the failure instead of raising.
            logger.error("Retry operation failed: %s", e)
            return {"success": False, "strategy": "retry", "error": str(e)}

    def _use_fallback(self, error_context: ErrorContext) -> Dict[str, Any]:
        """
        Recover by substituting a canned fallback response.

        Returns:
            ``{"success": True, "strategy": "fallback", "response": ...}``,
            or a ``success: False`` dict with ``"error"`` if building the
            response raised.
        """
        try:
            fallback_response = self._generate_fallback_response(error_context)
            error_context.recovery_successful = True
            logger.info(
                "Fallback used for %s.%s",
                error_context.component,
                error_context.operation,
            )
            return {
                "success": True,
                "strategy": "fallback",
                "response": fallback_response,
            }
        except Exception as e:
            # Recovery is best-effort: report the failure instead of raising.
            logger.error("Fallback failed: %s", e)
            return {"success": False, "strategy": "fallback", "error": str(e)}

    def _graceful_degradation(self, error_context: ErrorContext) -> Dict[str, Any]:
        """
        Recover by returning a limited-functionality response.

        Returns:
            ``{"success": True, "strategy": "degrade", "response": ...}``,
            or a ``success: False`` dict with ``"error"`` if building the
            response raised.
        """
        try:
            degraded_response = self._generate_degraded_response(error_context)
            error_context.recovery_successful = True
            logger.info(
                "Graceful degradation for %s.%s",
                error_context.component,
                error_context.operation,
            )
            return {
                "success": True,
                "strategy": "degrade",
                "response": degraded_response,
            }
        except Exception as e:
            # Recovery is best-effort: report the failure instead of raising.
            logger.error("Graceful degradation failed: %s", e)
            return {"success": False, "strategy": "degrade", "error": str(e)}

    def _auto_recovery(self, error_context: ErrorContext) -> Dict[str, Any]:
        """
        Auto-select recovery strategy based on error context.

        Connection/timeout errors are retried, errors in the ``llm`` or
        ``vector_store`` components use a fallback, and everything else is
        handled by graceful degradation.
        """
        if error_context.error_type in ("ConnectionError", "TimeoutError"):
            return self._retry_operation(error_context)
        if error_context.component in ("llm", "vector_store"):
            return self._use_fallback(error_context)
        return self._graceful_degradation(error_context)

    def _generate_fallback_response(self, error_context: ErrorContext) -> Dict[str, Any]:
        """Build the canned fallback payload for the failing component."""
        if error_context.component == "llm":
            return {
                "response": "I apologize, but I'm experiencing technical difficulties. Please try your question again or rephrase it.",
                "confidence": 0.1,
                "source": "fallback_handler",
                "citations": [],
            }
        if error_context.component == "vector_store":
            return {
                "documents": [],
                "scores": [],
                "message": "Search temporarily unavailable. Please try again.",
            }
        return {
            "result": None,
            "status": "error",
            "message": f"Service temporarily unavailable in {error_context.component}",
        }

    def _generate_degraded_response(self, error_context: ErrorContext) -> Dict[str, Any]:
        """Build the limited-functionality payload for the failing component."""
        return {
            "result": "limited_functionality",
            "message": f"Operating in degraded mode for {error_context.component}",
            "available_operations": ["basic_query", "status_check"],
            "degradation_reason": error_context.error_message,
        }

    def _close_if_cooled_down(self, component: str) -> None:
        """
        Close an open breaker whose cooldown period has elapsed.

        Does nothing when the component has no breaker, the breaker is
        closed, or no failure time is recorded. On reset the failure count
        starts again from zero.
        """
        from datetime import datetime, timedelta

        breaker = self.circuit_breakers.get(component)
        if not breaker or not breaker["is_open"] or breaker["last_failure"] is None:
            return

        cooldown = timedelta(seconds=self.circuit_breaker_reset_seconds)
        if datetime.now() - breaker["last_failure"] > cooldown:
            breaker["is_open"] = False
            breaker["failure_count"] = 0
            logger.info("Circuit breaker auto-reset for %s", component)

    def _update_circuit_breaker(self, component: str) -> None:
        """
        Record one failure for ``component`` and open its breaker if the
        failure count reaches ``circuit_breaker_threshold``.
        """
        from datetime import datetime

        # The cooldown must be evaluated against the PREVIOUS failure time:
        # once last_failure is refreshed below, the elapsed time is ~0 and
        # an auto-reset check could never succeed.
        self._close_if_cooled_down(component)

        breaker = self.circuit_breakers.setdefault(
            component,
            {"failure_count": 0, "last_failure": None, "is_open": False},
        )
        breaker["failure_count"] += 1
        breaker["last_failure"] = datetime.now()

        # Open circuit breaker if threshold reached; warn only on the
        # closed -> open transition so repeated failures do not re-announce it.
        if breaker["failure_count"] >= self.circuit_breaker_threshold:
            if not breaker["is_open"]:
                logger.warning(
                    "Circuit breaker opened for %s (failures: %s)",
                    component,
                    breaker["failure_count"],
                )
            breaker["is_open"] = True

    def _is_circuit_breaker_open(self, component: str) -> bool:
        """
        Check if circuit breaker is open for component.

        An open breaker whose cooldown has elapsed is closed first, so the
        answer reflects the current time rather than the last failure.
        """
        self._close_if_cooled_down(component)
        return self.circuit_breakers.get(component, {}).get("is_open", False)

    def _has_fallback(self, component: str, operation: str) -> bool:
        """
        Check if fallback is available for component/operation.

        Only the component is consulted; ``operation`` is currently unused.
        """
        return component in ("llm", "vector_store", "guardrails")

    def get_error_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive error statistics.

        Returns:
            Dictionary with ``total_errors``, ``component_errors`` (count
            per component), ``most_common_errors`` (up to five
            ``(error_type, count)`` pairs, most frequent first),
            ``component_health`` and ``circuit_breakers`` (open flag per
            component). All keys are present even when the history is empty,
            because breaker state can outlive a cleared history.
        """
        from collections import Counter

        component_errors = Counter(err.component for err in self.error_history)
        error_types = Counter(err.error_type for err in self.error_history)

        # Component health status, refreshed so expired breakers read closed.
        component_health = {}
        for component, breaker in self.circuit_breakers.items():
            self._close_if_cooled_down(component)
            component_health[component] = {
                "status": "degraded" if breaker["is_open"] else "healthy",
                "failure_count": breaker["failure_count"],
                "is_circuit_breaker_open": breaker["is_open"],
            }

        return {
            "total_errors": len(self.error_history),
            "component_errors": dict(component_errors),
            "most_common_errors": error_types.most_common(5),
            "component_health": component_health,
            "circuit_breakers": {
                k: v["is_open"] for k, v in self.circuit_breakers.items()
            },
        }

    def reset_circuit_breaker(self, component: str) -> bool:
        """
        Manually reset circuit breaker for component.

        Returns:
            True if a breaker existed and was reset, False otherwise.
        """
        if component not in self.circuit_breakers:
            return False

        self.circuit_breakers[component] = {
            "failure_count": 0,
            "last_failure": None,
            "is_open": False,
        }
        logger.info("Circuit breaker reset for %s", component)
        return True

    def clear_error_history(self) -> None:
        """Clear error history (circuit breaker state is left untouched)."""
        self.error_history.clear()
        logger.info("Error history cleared")


class FallbackResponseGenerator:
    """Builds canned payloads to return while a primary system is failing."""

    @staticmethod
    def generate_llm_fallback(query: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Return a low-confidence LLM-style answer with a randomly chosen apology.

        Neither ``query`` nor ``context`` influences the result.
        """
        import random

        apologies = (
            "I apologize, but I'm experiencing technical difficulties. Please try your question again.",
            "The service is temporarily unavailable. Please rephrase your question or try again later.",
            "I'm having trouble processing your request right now. Could you try a simpler question?",
        )

        payload: Dict[str, Any] = {"response": random.choice(apologies)}
        payload.update(
            confidence=0.1,
            source="fallback_generator",
            citations=[],
            fallback=True,
        )
        return payload

    @staticmethod
    def generate_search_fallback(query: str) -> Dict[str, Any]:
        """Return an empty search result flagged as a fallback.

        ``query`` does not influence the result.
        """
        return dict(
            documents=[],
            scores=[],
            message="Search service temporarily unavailable. Please try again later.",
            fallback=True,
        )

    @staticmethod
    def generate_generic_fallback(operation: str, error_message: str) -> Dict[str, Any]:
        """Return a "service unavailable" payload for an arbitrary operation.

        The operation name is embedded in the message and ``error_message``
        is passed through as ``error_summary``.
        """
        next_steps = [
            "Please try again in a few moments",
            "Check your internet connection",
            "Contact support if the problem persists",
        ]
        notice = f"The {operation} service is temporarily unavailable."
        return {
            "result": None,
            "status": "service_unavailable",
            "message": notice,
            "error_summary": error_message,
            "fallback": True,
            "suggested_actions": next_steps,
        }
# fmt: on