|
| 1 | +""" |
| 2 | +Orchestrator for the Circuit Breaker utility. |
| 3 | +
|
| 4 | +:class:`CircuitBreakerHandler` owns the state machine and the per-environment failure |
| 5 | +counter; the persistence layer owns the shared truth. This split keeps the healthy |
| 6 | +path write-free: failures are counted locally and only persisted on a state transition. |
| 7 | +""" |
| 8 | + |
| 9 | +from __future__ import annotations |
| 10 | + |
| 11 | +import datetime |
| 12 | +import logging |
| 13 | +import uuid |
| 14 | +from typing import TYPE_CHECKING, Any |
| 15 | + |
| 16 | +from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import CircuitBreakerOpenError |
| 17 | +from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState, CircuitTransition |
| 18 | + |
| 19 | +if TYPE_CHECKING: |
| 20 | + from collections.abc import Callable |
| 21 | + |
| 22 | + from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig |
| 23 | + from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import ( |
| 24 | + CircuitBreakerPersistenceLayer, |
| 25 | + ) |
| 26 | + from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitInfo |
| 27 | + |
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# Random hex token generated once at import time. It is the owner id this execution
# environment presents when claiming the half-open probe lock.
_ENVIRONMENT_ID = uuid.uuid4().hex

# Consecutive-outcome counters keyed by circuit name. They live at module level so the
# counts carry over between invocations served by the same execution environment, the
# same way idempotency caches do.
_LOCAL_SUCCESSES: dict[str, int] = {}
_LOCAL_FAILURES: dict[str, int] = {}
| 37 | + |
| 38 | + |
class CircuitBreakerHandler:
    """
    Drive a single protected call through the circuit breaker state machine.

    A new handler is created per invocation by the decorator. It reads the shared state,
    routes the call (run, short-circuit, or probe), and records the outcome.

    Parameters
    ----------
    function : Callable
        The protected function.
    name : str
        Circuit name.
    config : CircuitBreakerConfig
        Circuit configuration.
    persistence_store : CircuitBreakerPersistenceLayer
        Shared state store. It is configured with ``config`` and ``name`` during
        construction.
    on_circuit_open : Callable | None
        Callback invoked with the protected call's own ``*args``/``**kwargs`` plus a
        trailing ``circuit`` keyword argument when the circuit is open. If ``None``, an
        open circuit raises :class:`CircuitBreakerOpenError`.
    on_transition : Callable | None
        Hook invoked with a :class:`CircuitTransition` each time this handler performs a
        state change. Exceptions raised by the hook are logged and ignored. If ``None``,
        no notification is sent.
    function_args : tuple
        Positional arguments the protected function was called with.
    function_kwargs : dict
        Keyword arguments the protected function was called with.
    """

    def __init__(
        self,
        function: Callable,
        name: str,
        config: CircuitBreakerConfig,
        persistence_store: CircuitBreakerPersistenceLayer,
        on_circuit_open: Callable | None = None,
        on_transition: Callable | None = None,
        function_args: tuple | None = None,
        function_kwargs: dict | None = None,
    ):
        self.function = function
        self.name = name
        self.config = config
        self.on_circuit_open = on_circuit_open
        self.on_transition = on_transition
        # Normalise missing arguments so they can always be splatted into a call.
        self.fn_args = function_args or ()
        self.fn_kwargs = function_kwargs or {}

        # Bind the store to this circuit before keeping a reference to it.
        persistence_store.configure(config=config, circuit_name=name)
        self.persistence_store = persistence_store

    def handle(self) -> Any:
        """
        Evaluate the circuit and route the call.

        Returns
        -------
        Any
            The protected function's result when the call runs, or the
            ``on_circuit_open`` callback's return value when the circuit is open.

        Raises
        ------
        CircuitBreakerOpenError
            If the circuit is open and no callback is registered.
        """
        record = self.persistence_store.get_state(self.name)

        if record.state == CircuitState.CLOSED:
            return self._call_closed()

        if record.state == CircuitState.OPEN:
            # ``opened_at`` may legitimately be 0 (epoch); treat only None as missing.
            opened_at = record.opened_at if record.opened_at is not None else self._now()
            if self._now() >= opened_at + self.config.recovery_timeout:
                # Recovery window elapsed: try to become the single prober.
                if self.persistence_store.try_acquire_half_open(self.name, _ENVIRONMENT_ID, opened_at):
                    # A newly acquired lock starts a fresh probe round. Drop any success
                    # count left over from an earlier round that was interrupted (for
                    # example the circuit was re-opened elsewhere), so the circuit cannot
                    # close with fewer than ``success_threshold`` probes in this round.
                    _LOCAL_SUCCESSES[self.name] = 0
                    self._notify(CircuitState.OPEN, CircuitState.HALF_OPEN, opened_at=opened_at)
                    return self._call_probe()
            # Still inside the recovery window, or another environment won the lock.
            return self._open_response(record.to_circuit_info())

        # HALF_OPEN: only the environment that owns the probe lock runs.
        # NOTE(review): nothing here hands the lock over if the owning environment goes
        # away; presumably the persistence layer expires it - confirm.
        if record.half_open_owner == _ENVIRONMENT_ID:
            return self._call_probe()
        return self._open_response(record.to_circuit_info())

    def _call_closed(self) -> Any:
        """
        Run the protected call while the circuit is closed, tracking failures.

        Failures are counted in the module-level per-environment counter. The shared
        store is written only when the counter reaches ``failure_threshold``, at which
        point the circuit is saved as open. The original exception is always re-raised.
        """
        try:
            result = self.function(*self.fn_args, **self.fn_kwargs)
        except Exception as exc:
            # Exceptions the config does not count leave the counter untouched.
            if not self.config.counts_as_failure(exc):
                raise
            failures = _LOCAL_FAILURES.get(self.name, 0) + 1
            _LOCAL_FAILURES[self.name] = failures
            if failures >= self.config.failure_threshold:
                logger.debug("Circuit '%s' tripping CLOSED to OPEN after %d failures.", self.name, failures)
                opened_at = self._now()
                self.persistence_store.save_open(self.name, failure_count=failures, opened_at=opened_at)
                _LOCAL_FAILURES[self.name] = 0
                self._notify(CircuitState.CLOSED, CircuitState.OPEN, opened_at=opened_at)
            raise
        else:
            # Any success breaks the run of consecutive failures.
            _LOCAL_FAILURES[self.name] = 0
            return result

    def _call_probe(self) -> Any:
        """
        Run a probe during half-open, closing or reopening based on the outcome.

        A counted failure reopens the circuit with a fresh ``opened_at`` and re-raises.
        A success increments the local success counter; once it reaches
        ``success_threshold`` the circuit is saved as closed and both local counters
        are reset.
        """
        try:
            result = self.function(*self.fn_args, **self.fn_kwargs)
        except Exception as exc:
            # Exceptions the config does not count do not affect the probe outcome.
            if not self.config.counts_as_failure(exc):
                raise
            logger.debug("Circuit '%s' probe failed; reopening.", self.name)
            opened_at = self._now()
            self.persistence_store.save_reopen(self.name, opened_at=opened_at)
            _LOCAL_SUCCESSES[self.name] = 0
            self._notify(CircuitState.HALF_OPEN, CircuitState.OPEN, opened_at=opened_at)
            raise
        else:
            successes = _LOCAL_SUCCESSES.get(self.name, 0) + 1
            _LOCAL_SUCCESSES[self.name] = successes
            if successes >= self.config.success_threshold:
                logger.debug("Circuit '%s' closing after %d probe successes.", self.name, successes)
                self.persistence_store.save_closed(self.name)
                _LOCAL_SUCCESSES[self.name] = 0
                _LOCAL_FAILURES[self.name] = 0
                self._notify(CircuitState.HALF_OPEN, CircuitState.CLOSED)
            return result

    def _open_response(self, circuit: CircuitInfo) -> Any:
        """
        Produce the response for an open circuit: callback result or raise.

        Parameters
        ----------
        circuit : CircuitInfo
            Snapshot of the circuit handed to the callback or attached to the error.

        Raises
        ------
        CircuitBreakerOpenError
            If no ``on_circuit_open`` callback is registered.
        """
        if self.on_circuit_open is not None:
            # Forward the protected call's arguments unchanged: positional stay positional,
            # keyword stay keyword. The circuit snapshot is passed as a keyword argument so
            # it never collides with positionalized kwargs nor depends on dict ordering.
            # NOTE(review): a protected call that itself receives a ``circuit`` keyword
            # would make this call raise TypeError (duplicate keyword) - confirm whether
            # that name should be documented as reserved.
            return self.on_circuit_open(*self.fn_args, **self.fn_kwargs, circuit=circuit)
        raise CircuitBreakerOpenError(
            f"Circuit '{self.name}' is open.",
            circuit=circuit,
        )

    def _notify(self, from_state: CircuitState, to_state: CircuitState, opened_at: int | None = None) -> None:
        """
        Fire the ``on_transition`` hook for a state change.

        Called only on real transitions, never on the hot path. Any exception the hook
        raises is swallowed and logged: observability must never break the protected call.

        Parameters
        ----------
        from_state : CircuitState
            State the circuit is leaving.
        to_state : CircuitState
            State the circuit is entering.
        opened_at : int | None
            Unix timestamp associated with the transition, when one applies.
        """
        if self.on_transition is None:
            return
        try:
            self.on_transition(
                CircuitTransition(
                    circuit_name=self.name,
                    from_state=from_state,
                    to_state=to_state,
                    opened_at=opened_at,
                ),
            )
        except Exception:
            logger.warning("on_transition hook for circuit '%s' raised; ignoring.", self.name, exc_info=True)

    @staticmethod
    def _now() -> int:
        """Current unix timestamp in whole seconds."""
        # Use an aware UTC datetime so the epoch value does not depend on how a naive
        # local time would be interpreted.
        return int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
0 commit comments