Skip to content

Commit 6716a42

Browse files
author
Вадим Козыревский
committed
Update documentation
1 parent 556f13e commit 6716a42

2 files changed

Lines changed: 24 additions & 9 deletions

File tree

docs/saga/recovery.md

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,23 +104,34 @@ Each saga in storage has a **recovery_attempts** counter. It is used to:
104104

105105
**Automatic increment:** When `recover_saga()` fails (exception during resume), the storage's `increment_recovery_attempts(saga_id, new_status=SagaStatus.FAILED)` is called automatically. Callers do **not** need to call `increment_recovery_attempts` themselves.
106106

107+
**Explicit set:** Use `storage.set_recovery_attempts(saga_id, attempts)` to set the counter to a specific value: e.g. `0` after successfully recovering one of the steps, or the maximum value so the saga is excluded from further recovery without changing its status.
108+
107109
**Getting sagas for recovery:** Use `storage.get_sagas_for_recovery()` instead of a custom query:
108110

109111
```python
112+
# All saga types (default)
110113
ids = await storage.get_sagas_for_recovery(
111114
limit=50,
112115
max_recovery_attempts=5, # Only sagas with recovery_attempts < 5
113116
stale_after_seconds=120, # Only sagas not updated in last 2 minutes (avoids picking active sagas)
114117
)
118+
119+
# Only sagas of a specific type (e.g. one recovery job per saga name)
120+
ids = await storage.get_sagas_for_recovery(
121+
limit=50,
122+
max_recovery_attempts=5,
123+
saga_name="OrderSaga",
124+
)
115125
```
116126

117127
| Parameter | Description |
118128
|-----------|-------------|
119129
| `limit` | Maximum number of saga IDs to return |
120130
| `max_recovery_attempts` | Only include sagas with `recovery_attempts` strictly less than this value (default: 5) |
121131
| `stale_after_seconds` | If set, only include sagas whose `updated_at` is older than (now − this value). Use to avoid picking sagas currently being executed. `None` = no filter |
132+
| `saga_name` | If set, only include sagas with this name (e.g. handler/type name). `None` (default) = return all saga types |
122133

123-
Returns saga IDs in status RUNNING, COMPENSATING, or FAILED, ordered by `updated_at` ascending (oldest first).
134+
Returns saga IDs in status RUNNING or COMPENSATING, ordered by `updated_at` ascending (oldest first).
124135

125136
## Strict Backward Recovery
126137

@@ -132,18 +143,19 @@ This prevents "zombie states" where compensation actions conflict with new execu
132143

133144
### Background Recovery Job
134145

135-
Use `storage.get_sagas_for_recovery()` to get saga IDs that need recovery. On recovery failure, `recover_saga()` calls `increment_recovery_attempts` internally — no extra code needed.
146+
Use `storage.get_sagas_for_recovery()` to get saga IDs that need recovery. On recovery failure, `recover_saga()` calls `increment_recovery_attempts` internally — no extra code needed. You can pass `saga_name` to run separate recovery jobs per saga type.
136147

137148
```python
138149
import asyncio
139150
from cqrs.saga.recovery import recover_saga
140151

141-
async def recovery_job(storage, saga, context_builder, container):
152+
async def recovery_job(storage, saga, context_builder, container, saga_name=None):
142153
while True:
143154
ids = await storage.get_sagas_for_recovery(
144155
limit=50,
145156
max_recovery_attempts=5,
146157
stale_after_seconds=120, # Avoid sagas currently being executed
158+
saga_name=saga_name, # None = all types; or e.g. "OrderSaga" for one type
147159
)
148160
for saga_id in ids:
149161
try:
@@ -182,6 +194,7 @@ scheduler.start()
182194
1. **Run recovery periodically** — Background job using `get_sagas_for_recovery()` to scan for incomplete sagas
183195
2. **Use `max_recovery_attempts`** — Exclude sagas that fail recovery too many times (e.g. 5) to avoid infinite retries
184196
3. **Use `stale_after_seconds`** — Avoid picking sagas that are currently being executed by another worker
185-
4. **Handle failures** — Log errors and send alerts; `increment_recovery_attempts` is called automatically by `recover_saga`
186-
5. **Monitor metrics** — Track recovery rate, duration, failures, and sagas exceeding max attempts
187-
6. **Use persistent storage** — Memory storage loses data on restart
197+
4. **Use `saga_name` for per-type recovery** — When running separate recovery jobs per saga type, pass `saga_name` so each job only processes its own sagas
198+
5. **Handle failures** — Log errors and send alerts; `increment_recovery_attempts` is called automatically by `recover_saga`
199+
6. **Monitor metrics** — Track recovery rate, duration, failures, and sagas exceeding max attempts
200+
7. **Use persistent storage** — Memory storage loses data on restart

docs/saga/storage.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ class ISagaStorage(abc.ABC):
3131
async def log_step(saga_id, step_name, action, status, details=None) -> None
3232
async def load_saga_state(saga_id, *, read_for_update: bool = False) -> tuple[SagaStatus, dict, int]
3333
async def get_step_history(saga_id) -> list[SagaLogEntry]
34-
async def get_sagas_for_recovery(limit, max_recovery_attempts=5, stale_after_seconds=None) -> list[uuid.UUID]
34+
async def get_sagas_for_recovery(limit, max_recovery_attempts=5, stale_after_seconds=None, saga_name=None) -> list[uuid.UUID]
3535
async def increment_recovery_attempts(saga_id, new_status: SagaStatus | None = None) -> None
36+
async def set_recovery_attempts(saga_id, attempts: int) -> None
3637
```
3738

38-
- **get_sagas_for_recovery** — Returns saga IDs that need recovery (RUNNING, COMPENSATING, FAILED) with `recovery_attempts` &lt; `max_recovery_attempts`, optionally filtered by staleness. Used by recovery jobs.
39+
- **get_sagas_for_recovery** — Returns saga IDs that need recovery (RUNNING, COMPENSATING) with `recovery_attempts` &lt; `max_recovery_attempts`, optionally filtered by staleness and by saga name. When `saga_name` is `None` (default), returns all saga types; when set, only sagas with that name. Used by recovery jobs.
3940
- **increment_recovery_attempts** — Called automatically by `recover_saga()` on recovery failure; increments `recovery_attempts` and optionally updates status (e.g. to FAILED).
41+
- **set_recovery_attempts** — Sets the recovery attempt counter to an explicit value. Use to reset after successfully recovering a step (e.g. set to `0`) or to set to the maximum so the saga is excluded from further recovery (e.g. mark as permanently failed without changing status).
4042

4143
## Memory Storage
4244

@@ -83,7 +85,7 @@ Database-backed implementation for production. It uses a session factory to mana
8385
- `status` (VARCHAR) - PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
8486
- `context` (JSON)
8587
- `version` (INTEGER) - Optimistic locking version (default: 1)
86-
- `recovery_attempts` (INTEGER) - Number of failed recovery attempts (default: 0); used by `get_sagas_for_recovery` and `increment_recovery_attempts`
88+
- `recovery_attempts` (INTEGER) - Number of failed recovery attempts (default: 0); used by `get_sagas_for_recovery`, `increment_recovery_attempts`, and `set_recovery_attempts`
8789
- `created_at`, `updated_at` (TIMESTAMP)
8890

8991
**saga_logs:**

0 commit comments

Comments
 (0)