|
| 1 | +# Event Handler Fallback |
| 2 | + |
| 3 | +<div class="grid cards" markdown> |
| 4 | + |
| 5 | +- :material-home: **Back to Event Handling Overview** |
| 6 | + |
| 7 | + Return to the Event Handling overview page with all topics. |
| 8 | + |
| 9 | + [:octicons-arrow-left-24: Back to Overview](index.md) |
| 10 | + |
| 11 | +</div> |
| 12 | + |
| 13 | +--- |
| 14 | + |
| 15 | +## Overview |
| 16 | + |
| 17 | +The **Event Handler Fallback** pattern allows you to register an alternative event handler that runs when the primary event handler fails (or when the circuit breaker is open). This provides resilience for side effects such as sending notifications or updating read models when the primary path (e.g. external API) is unavailable. |
| 18 | + |
| 19 | +| Concept | Description | |
| 20 | +|---------|-------------| |
| 21 | +| **Primary handler** | Main event handler that executes first | |
| 22 | +| **Fallback handler** | Alternative handler invoked on primary failure or circuit open | |
| 23 | +| **failure_exceptions** | Optional tuple of exception types that trigger fallback; if empty, any exception | |
| 24 | +| **Circuit Breaker** | Optional; after threshold failures, primary is not called, fallback runs immediately | |
| 25 | + |
| 26 | +!!! tip "When to Use" |
| 27 | + Use Event Handler Fallback when domain event side effects (notifications, read model updates, integrations) must degrade gracefully: e.g. enqueue for later or log when the primary handler (e.g. external notification API) fails. |
| 28 | + |
| 29 | +## Registration |
| 30 | + |
| 31 | +Bind the event type to `EventHandlerFallback(primary, fallback, ...)` in your domain events mapper: |
| 32 | + |
| 33 | +```python |
| 34 | +import cqrs |
| 35 | + |
| 36 | +def events_mapper(mapper: cqrs.EventMap) -> None: |
| 37 | + mapper.bind( |
| 38 | + NotificationSent, |
| 39 | + cqrs.EventHandlerFallback( |
| 40 | + primary=PrimaryNotificationSentHandler, |
| 41 | + fallback=FallbackNotificationSentHandler, |
| 42 | + failure_exceptions=(ConnectionError, TimeoutError), # optional |
| 43 | + circuit_breaker=event_cb, # optional |
| 44 | + ), |
| 45 | + ) |
| 46 | +``` |
| 47 | + |
| 48 | +- **primary** — The primary event handler class (`EventHandler[EventType]`). |
| 49 | +- **fallback** — The fallback event handler class; must handle the same event type. |
| 50 | +- **failure_exceptions** — If non-empty, only these exception types trigger fallback; otherwise any exception triggers fallback. |
| 51 | +- **circuit_breaker** — Optional `ICircuitBreaker` instance (e.g. `AioBreakerAdapter`). Use one instance per domain (e.g. one for events). When the circuit is open, the primary handler is not called and the fallback runs immediately. |
| 52 | + |
| 53 | +## Basic Example |
| 54 | + |
| 55 | +```python |
| 56 | +class NotificationSent(cqrs.DomainEvent, frozen=True): |
| 57 | + user_id: str |
| 58 | + message: str |
| 59 | + |
| 60 | +class PrimaryNotificationSentHandler(cqrs.EventHandler[NotificationSent]): |
| 61 | + async def handle(self, event: NotificationSent) -> None: |
| 62 | + # e.g. call external notification API |
| 63 | + raise RuntimeError("External notification service unavailable") |
| 64 | + |
| 65 | +class FallbackNotificationSentHandler(cqrs.EventHandler[NotificationSent]): |
| 66 | + async def handle(self, event: NotificationSent) -> None: |
| 67 | + # e.g. enqueue for later or log |
| 68 | + logger.info("Enqueue notification for user %s: %s", event.user_id, event.message) |
| 69 | + |
| 70 | +# In events mapper: |
| 71 | +mapper.bind( |
| 72 | + NotificationSent, |
| 73 | + cqrs.EventHandlerFallback( |
| 74 | + primary=PrimaryNotificationSentHandler, |
| 75 | + fallback=FallbackNotificationSentHandler, |
| 76 | + ), |
| 77 | +) |
| 78 | +``` |
| 79 | + |
| 80 | +When a command handler emits `NotificationSent`, the event emitter runs the primary handler first. On exception (or when the circuit is open), the fallback handler is invoked. Events from the handler that actually ran are collected and processed. |
| 81 | + |
| 82 | +## Circuit Breaker (optional) |
| 83 | + |
| 84 | +To use a circuit breaker with event handlers: |
| 85 | + |
| 86 | +```bash |
| 87 | +pip install aiobreaker |
| 88 | +# or: pip install python-cqrs[aiobreaker] |
| 89 | +``` |
| 90 | + |
| 91 | +```python |
| 92 | +from cqrs.adapters.circuit_breaker import AioBreakerAdapter |
| 93 | + |
| 94 | +event_cb = AioBreakerAdapter(fail_max=5, timeout_duration=60) |
| 95 | +mapper.bind( |
| 96 | + NotificationSent, |
| 97 | + cqrs.EventHandlerFallback( |
| 98 | + primary=PrimaryNotificationSentHandler, |
| 99 | + fallback=FallbackNotificationSentHandler, |
| 100 | + circuit_breaker=event_cb, |
| 101 | + ), |
| 102 | +) |
| 103 | +``` |
| 104 | + |
| 105 | +After `fail_max` failures, the circuit opens and the fallback runs without calling the primary handler. See [Saga Fallback — Circuit Breaker](../saga/fallback/circuit_breaker.md) for the three-state pattern (CLOSED / OPEN / HALF_OPEN). |
| 106 | + |
| 107 | +### Circuit Breaker configuration |
| 108 | + |
| 109 | +Use the same `AioBreakerAdapter` as for request handlers. Parameters: |
| 110 | + |
| 111 | +| Parameter | Description | Default | |
| 112 | +|-----------|-------------|---------| |
| 113 | +| `fail_max` | Number of failures before opening the circuit | `5` | |
| 114 | +| `timeout_duration` | Seconds to wait before attempting HALF_OPEN (retry) | `60` | |
| 115 | +| `exclude` | Exception types that **do not** count as failures (e.g. business/validation errors) | `[]` | |
| 116 | +| `storage_factory` | Factory `(name: str) -> storage` for circuit state; default is in-memory | in-memory | |
| 117 | + |
| 118 | +**Example with `exclude`** — e.g. invalid payload should not open the circuit: |
| 119 | + |
| 120 | +```python |
| 121 | +event_cb = AioBreakerAdapter( |
| 122 | + fail_max=5, |
| 123 | + timeout_duration=60, |
| 124 | + exclude=[ValidationError], # invalid events don't open the circuit |
| 125 | +) |
| 126 | +mapper.bind( |
| 127 | + NotificationSent, |
| 128 | + cqrs.EventHandlerFallback( |
| 129 | + primary=PrimaryNotificationSentHandler, |
| 130 | + fallback=FallbackNotificationSentHandler, |
| 131 | + circuit_breaker=event_cb, |
| 132 | + ), |
| 133 | +) |
| 134 | +``` |
| 135 | + |
| 136 | +**One instance per domain** — use one `AioBreakerAdapter` for all event handler fallbacks that share the same policy. The adapter creates an isolated circuit per handler type. |
| 137 | + |
| 138 | +**Storage:** Default is in-memory. For multiple instances (e.g. several workers), pass a `storage_factory` that returns Redis storage so the circuit state is shared. See [Saga Fallback — Circuit Breaker: Storage Configuration](../saga/fallback/circuit_breaker.md#storage-configuration-memory-vs-redis). |
| 139 | + |
| 140 | +**Failure filtering:** Use `failure_exceptions` on `EventHandlerFallback` to restrict which exceptions trigger fallback; use `exclude` on `AioBreakerAdapter` so certain exceptions do not open the circuit. See [Saga Fallback — Circuit Breaker](../saga/fallback/circuit_breaker.md#failure-exception-filtering). |
| 141 | + |
| 142 | +## Related |
| 143 | + |
| 144 | +- [Request Handler Fallback](../request_handler/fallback.md) — Fallback for command/query handlers |
| 145 | +- [Stream Handling Fallback](../stream_handling/fallback.md) — Fallback for streaming handlers |
| 146 | +- [Saga Fallback Pattern](../saga/fallback/index.md) — Fallback for saga steps |
0 commit comments