-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresilience.py
More file actions
283 lines (222 loc) · 9.34 KB
/
resilience.py
File metadata and controls
283 lines (222 loc) · 9.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""Resilience patterns for reliable span export.
Provides retry with exponential backoff and circuit breaker patterns.
"""
from __future__ import annotations
import asyncio
import logging
import random
import threading
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from enum import Enum
from typing import TypeVar
T = TypeVar("T")
logger = logging.getLogger(__name__)
@dataclass
class RetryConfig:
    """Tunable parameters for retrying with exponential backoff.

    Attributes:
        max_attempts: Total number of attempts, including the first one.
        initial_delay_seconds: Delay applied after the first failed attempt.
        max_delay_seconds: Cap applied to the exponentially growing delay.
        exponential_base: Factor the delay is multiplied by per attempt.
        jitter: Whether delays are randomised.
    """

    max_attempts: int = 3
    initial_delay_seconds: float = 0.1
    max_delay_seconds: float = 10.0
    exponential_base: float = 2.0
    # Randomised delays keep many clients from retrying in lockstep
    # (the "thundering herd" problem).
    jitter: bool = True
def calculate_backoff_delay(
    attempt: int,
    config: RetryConfig,
) -> float:
    """Calculate delay for a given retry attempt using exponential backoff.

    The raw delay is ``initial_delay_seconds * exponential_base ** (attempt - 1)``,
    capped at ``max_delay_seconds``. When jitter is enabled the capped value is
    then scaled by a random factor in ``[0.75, 1.25)``, so the returned delay
    may exceed ``max_delay_seconds`` by up to 25%.

    Args:
        attempt: Current attempt number (1-based)
        config: Retry configuration

    Returns:
        Delay in seconds before next retry
    """
    initial = config.initial_delay_seconds
    try:
        delay = initial * (config.exponential_base ** (attempt - 1))
    except OverflowError:
        # For large attempt numbers the growth factor leaves the float range
        # (e.g. 2.0 ** 5000). Such a delay is far beyond any cap, so saturate
        # instead of crashing; a non-positive initial delay is passed through.
        delay = config.max_delay_seconds if initial > 0 else initial
    delay = min(delay, config.max_delay_seconds)

    if config.jitter:
        # Spread retries over +/-25% of the nominal delay.
        jitter_factor = 0.75 + random.random() * 0.5
        delay *= jitter_factor

    return delay
async def retry_async(
    operation: Callable[[], Awaitable[T]],
    config: RetryConfig | None = None,
    retryable_exceptions: tuple[type[Exception], ...] = (Exception,),
    non_retryable_exceptions: tuple[type[Exception], ...] = (),
    operation_name: str = "operation",
) -> T:
    """Execute an async operation with retry and exponential backoff.

    The operation is always attempted at least once, even if
    ``config.max_attempts`` is zero or negative.

    Args:
        operation: Async callable to execute
        config: Retry configuration (uses defaults if None)
        retryable_exceptions: Tuple of exception types that trigger retry
        non_retryable_exceptions: Tuple of exception types that should fail
            immediately; checked before ``retryable_exceptions``
        operation_name: Name for logging purposes

    Returns:
        Result of the operation

    Raises:
        The last exception if all retries are exhausted, or immediately for
        non-retryable exceptions and for exceptions that match neither tuple.
    """
    config = config or RetryConfig()
    # A non-positive max_attempts would skip the loop entirely, so the
    # operation would never run; clamp to a single attempt instead.
    max_attempts = max(1, config.max_attempts)

    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except non_retryable_exceptions:
            # Don't retry these - re-raise immediately
            raise
        except retryable_exceptions as exc:
            if attempt >= max_attempts:
                logger.warning(
                    "%s failed after %s attempts: %s",
                    operation_name,
                    max_attempts,
                    exc,
                )
                raise

            delay = calculate_backoff_delay(attempt, config)
            logger.debug(
                "%s attempt %s failed: %s. Retrying in %.2fs...",
                operation_name,
                attempt,
                exc,
                delay,
            )
            await asyncio.sleep(delay)

    # Unreachable: every iteration returns, raises, or continues, and the
    # final iteration always returns or raises. Kept for the type checker.
    raise RuntimeError("Unexpected retry loop exit")
class CircuitState(Enum):
    """The three states a circuit breaker can be in."""

    # Normal operation: requests pass through.
    CLOSED = "closed"
    # Circuit tripped: requests fail fast.
    OPEN = "open"
    # Probing whether the service has recovered.
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
    """Thresholds and timings that control a circuit breaker.

    Attributes:
        failure_threshold: Failures before the circuit opens.
        success_threshold: Successes in half-open needed to close the circuit.
        timeout_seconds: Time spent open before moving to half-open.
        failure_window_seconds: Time window in which failures are counted;
            0 disables the window so that all failures count.
    """

    failure_threshold: int = 5
    success_threshold: int = 2
    timeout_seconds: float = 30.0
    failure_window_seconds: float = 60.0
@dataclass
class CircuitBreakerStats:
    """Running counters kept by a circuit breaker.

    Attributes:
        total_requests: Requests checked against the breaker.
        successful_requests: Requests recorded as successful.
        failed_requests: Requests recorded as failed.
        rejected_requests: Requests rejected due to an open circuit.
        state_transitions: Number of state changes.
    """

    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    rejected_requests: int = 0
    state_transitions: int = 0
class CircuitBreaker:
    """Circuit breaker for protecting against cascading failures.

    States:
        - CLOSED: Normal operation. Failures are counted within the
          configured time window; reaching ``failure_threshold`` opens the
          circuit.
        - OPEN: Circuit tripped after too many failures. Requests fail fast.
        - HALF_OPEN: Entered once ``timeout_seconds`` have passed in OPEN.
          Requests are let through to test recovery; ``success_threshold``
          consecutive successes close the circuit, any failure reopens it.

    Mutable state is guarded by a non-reentrant ``threading.Lock``. The
    private ``_prune_old_failures``, ``_check_state_transition`` and
    ``_transition_to`` helpers expect the caller to already hold it.
    """

    def __init__(
        self,
        name: str,
        config: CircuitBreakerConfig | None = None,
    ) -> None:
        """Create a breaker that starts in the CLOSED state.

        Args:
            name: Label used in log messages.
            config: Thresholds and timeouts (uses defaults if None).
        """
        self._name = name
        self._config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._lock = threading.Lock()

        # Failure tracking
        self._failure_timestamps: list[float] = []
        self._consecutive_successes = 0

        # State tracking
        self._last_failure_time: float = 0
        self._last_state_change_time: float = time.monotonic()

        # Statistics
        self._stats = CircuitBreakerStats()

    @property
    def state(self) -> CircuitState:
        """Get current circuit state (may trigger state transition)."""
        with self._lock:
            self._check_state_transition()
            return self._state

    @property
    def stats(self) -> CircuitBreakerStats:
        """Get circuit breaker statistics.

        Returns the live statistics object (not a copy); it is handed out
        without taking the lock.
        """
        return self._stats

    @property
    def is_closed(self) -> bool:
        """Check if circuit is closed (allowing requests)."""
        return self.state == CircuitState.CLOSED

    def allow_request(self) -> bool:
        """Check if a request should be allowed through.

        Every call is counted in ``total_requests``; refused calls are also
        counted in ``rejected_requests``.

        Returns:
            True if request should proceed, False if it should fail fast
        """
        with self._lock:
            self._check_state_transition()
            self._stats.total_requests += 1

            if self._state == CircuitState.OPEN:
                self._stats.rejected_requests += 1
                return False

            # CLOSED and HALF_OPEN both let the request through. No cap is
            # applied to the number of probe requests in HALF_OPEN.
            return True

    def record_success(self) -> None:
        """Record a successful request."""
        with self._lock:
            self._stats.successful_requests += 1

            if self._state == CircuitState.HALF_OPEN:
                self._consecutive_successes += 1
                if self._consecutive_successes >= self._config.success_threshold:
                    self._transition_to(CircuitState.CLOSED)
            elif self._state == CircuitState.CLOSED:
                self._prune_old_failures()

    def record_failure(self) -> None:
        """Record a failed request."""
        now = time.monotonic()

        with self._lock:
            self._stats.failed_requests += 1
            self._last_failure_time = now
            self._consecutive_successes = 0

            if self._state == CircuitState.HALF_OPEN:
                # Any failure in half-open trips the circuit again
                self._transition_to(CircuitState.OPEN)
            elif self._state == CircuitState.CLOSED:
                # Timestamps are only consulted while CLOSED and are cleared
                # on every transition back to CLOSED, so recording them in
                # other states would only let the list grow without bound.
                self._failure_timestamps.append(now)
                self._prune_old_failures()
                if len(self._failure_timestamps) >= self._config.failure_threshold:
                    self._transition_to(CircuitState.OPEN)

    def _prune_old_failures(self) -> None:
        """Remove failures outside the time window.

        A non-positive ``failure_window_seconds`` disables pruning.
        """
        if self._config.failure_window_seconds <= 0:
            return

        cutoff = time.monotonic() - self._config.failure_window_seconds
        self._failure_timestamps = [ts for ts in self._failure_timestamps if ts > cutoff]

    def _check_state_transition(self) -> None:
        """Move OPEN -> HALF_OPEN once the open timeout has elapsed."""
        if self._state == CircuitState.OPEN:
            time_since_open = time.monotonic() - self._last_state_change_time
            if time_since_open >= self._config.timeout_seconds:
                self._transition_to(CircuitState.HALF_OPEN)

    def _transition_to(self, new_state: CircuitState) -> None:
        """Transition to a new state.

        Updates the state, the change timestamp and the transition counter,
        notifies the metrics collector when the circuit closes or opens,
        and logs the transition.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from .metrics import get_metrics_collector

        old_state = self._state
        self._state = new_state
        self._last_state_change_time = time.monotonic()
        self._stats.state_transitions += 1

        if new_state == CircuitState.CLOSED:
            self._failure_timestamps.clear()
            self._consecutive_successes = 0
            # Notify metrics collector that circuit is healthy again
            get_metrics_collector().notify_circuit_breaker_closed()
        elif new_state == CircuitState.HALF_OPEN:
            self._consecutive_successes = 0
        elif new_state == CircuitState.OPEN:
            # Warn about circuit breaker opening
            get_metrics_collector().warn_circuit_breaker_open()

        logger.info(
            "Circuit breaker '%s' transitioned: %s -> %s",
            self._name,
            old_state.value,
            new_state.value,
        )

    def reset(self) -> None:
        """Reset circuit breaker to initial closed state.

        Statistics are left untouched and the metrics collector is not
        notified.
        """
        with self._lock:
            self._state = CircuitState.CLOSED
            self._failure_timestamps.clear()
            self._consecutive_successes = 0
            self._last_state_change_time = time.monotonic()
            logger.debug("Circuit breaker '%s' reset to CLOSED", self._name)
class CircuitOpenError(Exception):
    """Error for a request rejected because a circuit breaker is open.

    Attributes:
        circuit_name: Name of the circuit breaker that rejected the request.
    """

    def __init__(self, circuit_name: str) -> None:
        message = f"Circuit breaker '{circuit_name}' is open"
        super().__init__(message)
        self.circuit_name = circuit_name