Commit b6f15bb

Author: Вадим Козыревский
Update Saga documentation
1 parent 07d299e commit b6f15bb

2 files changed

Lines changed: 75 additions & 21 deletions

docs/saga/recovery.md

Lines changed: 15 additions & 2 deletions
@@ -39,6 +39,19 @@ Recovery steps:
3939
2. Reconstruct context object from persisted data
4040
3. Resume execution from last completed step or complete compensation
4141

42+
### Concurrency Safety (Row Locking)
43+
44+
In a distributed environment with multiple replicas, multiple recovery workers might attempt to recover the same incomplete saga simultaneously.
45+
46+
To prevent race conditions, `recover_saga` uses **Row Locking**:
47+
48+
1. It calls `storage.load_saga_state(saga_id, read_for_update=True)`.
49+
2. For SQL databases, this executes `SELECT ... FOR UPDATE`.
50+
3. This acquires a database-level lock on the saga row.
51+
4. If another worker tries to recover the same saga, it will block until the first worker completes or releases the lock.
52+
53+
This ensures that **only one worker** can actively recover and execute a specific saga at any given time.
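
The blocking behavior described above can be imitated without a database. The following is only a minimal sketch: an `asyncio.Lock` per saga stands in for the row lock taken by `SELECT ... FOR UPDATE`, and the names `recover`, `run_two_workers`, `status`, and `row_locks` are hypothetical, not part of the library. Re-checking the status after the lock is acquired is an assumption of this sketch about how a second worker would notice that nothing is left to do.

```python
import asyncio


async def recover(worker, saga_id, status, row_locks, events):
    # Taking the lock plays the role of
    # load_saga_state(saga_id, read_for_update=True) in this sketch.
    async with row_locks[saga_id]:
        if status[saga_id] == "COMPLETED":
            # Assumption: a worker that gets the lock late re-checks the state.
            events.append(f"{worker}: already completed, skipping")
            return
        events.append(f"{worker}: recovering")
        await asyncio.sleep(0.01)  # simulate running the remaining steps
        status[saga_id] = "COMPLETED"
        events.append(f"{worker}: done")


async def run_two_workers():
    # In-memory stand-ins for the saga row and its database-level lock.
    status = {"saga-1": "RUNNING"}
    row_locks = {"saga-1": asyncio.Lock()}
    events = []
    await asyncio.gather(
        recover("worker-a", "saga-1", status, row_locks, events),
        recover("worker-b", "saga-1", status, row_locks, events),
    )
    return events


events = asyncio.run(run_two_workers())
for line in events:
    print(line)
```

The second worker makes no progress until the first one leaves the `async with` block, which mirrors a transaction waiting on a locked row.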
54+
4255
## Recovery Scenarios
4356

4457
### Interrupted Forward Execution
@@ -47,7 +60,7 @@ Recovery steps:
4760
**Recovery:** Skips completed steps and resumes after the last completed step
4861

4962
```python
50-
status, _ = await storage.load_saga_state(saga_id) # RUNNING
63+
status, _, _ = await storage.load_saga_state(saga_id) # RUNNING
5164
await recover_saga(saga, saga_id, OrderContext)
5265
# Skips completed steps, continues execution
5366
```
@@ -58,7 +71,7 @@ await recover_saga(saga, saga_id, OrderContext)
5871
**Recovery:** Completes compensation in reverse order
5972

6073
```python
61-
status, _ = await storage.load_saga_state(saga_id) # COMPENSATING
74+
status, _, _ = await storage.load_saga_state(saga_id) # COMPENSATING
6275
try:
6376
await recover_saga(saga, saga_id, OrderContext)
6477
except RuntimeError:

docs/saga/storage.md

Lines changed: 60 additions & 19 deletions
@@ -16,10 +16,10 @@ Two storage implementations:
1616
```python
1717
class ISagaStorage(abc.ABC):
1818
async def create_saga(saga_id, name, context) -> None
19-
async def update_context(saga_id, context) -> None
19+
async def update_context(saga_id, context, current_version: int | None = None) -> None
2020
async def update_status(saga_id, status) -> None
2121
async def log_step(saga_id, step_name, action, status, details=None) -> None
22-
async def load_saga_state(saga_id) -> tuple[SagaStatus, dict]
22+
async def load_saga_state(saga_id, *, read_for_update: bool = False) -> tuple[SagaStatus, dict, int]
2323
async def get_step_history(saga_id) -> list[SagaLogEntry]
2424
```
2525

@@ -46,7 +46,7 @@ mediator = bootstrap.bootstrap(
4646
)
4747

4848
# Access storage data
49-
status, context_data = await storage.load_saga_state(saga_id)
49+
status, context_data, version = await storage.load_saga_state(saga_id)
5050
history = await storage.get_step_history(saga_id)
5151
```
5252

@@ -58,7 +58,7 @@ history = await storage.get_step_history(saga_id)
5858

5959
## SQLAlchemy Storage
6060

61-
Database-backed implementation for production.
61+
Database-backed implementation for production. It uses a session factory to manage transactions internally, ensuring that every step is committed immediately ("checkpointing").
6262

6363
### Database Schema
6464

@@ -67,6 +67,7 @@ Database-backed implementation for production.
6767
- `id` (UUID) - Primary key
6868
- `status` (VARCHAR) - PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
6969
- `context` (JSON)
70+
- `version` (INTEGER) - Optimistic locking version (default: 1)
7071
- `created_at`, `updated_at` (TIMESTAMP)
7172

7273
**saga_logs:**
@@ -83,27 +84,31 @@ Database-backed implementation for production.
8384

8485
### Usage
8586

87+
The storage requires an `async_sessionmaker` to create short-lived sessions for each operation.
88+
8689
```python
8790
import uuid
8891
import cqrs
8992
from cqrs.saga import bootstrap
9093
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
9194
from cqrs.saga.storage.sqlalchemy import SqlAlchemySagaStorage, Base
9295

93-
# Setup
94-
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
95-
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
96+
# Setup Engine with connection pool
97+
engine = create_async_engine(
98+
"postgresql+asyncpg://user:pass@localhost/db",
99+
pool_size=20,
100+
max_overflow=10,
101+
)
102+
103+
# Create session factory (factory, NOT session instance)
104+
session_factory = async_sessionmaker(engine, expire_on_commit=False)
96105

97106
# Initialize tables (run once)
98107
async with engine.begin() as conn:
99108
await conn.run_sync(Base.metadata.create_all)
100109

101-
# Create storage
102-
async def create_storage() -> SqlAlchemySagaStorage:
103-
return SqlAlchemySagaStorage(SessionLocal())
104-
105-
# Usage
106-
storage = await create_storage()
110+
# Initialize storage with session FACTORY
111+
storage = SqlAlchemySagaStorage(session_factory)
107112

108113
# Register saga in SagaMap
109114
def saga_mapper(mapper: cqrs.SagaMap) -> None:
@@ -120,12 +125,47 @@ mediator = bootstrap.bootstrap(
120125
context = OrderContext(...)
121126
saga_id = uuid.uuid4()
122127

128+
# The storage will automatically commit each step to the database
123129
async for step_result in mediator.stream(context, saga_id=saga_id):
124130
print(f"Step: {step_result.step_type.__name__}")
125-
126-
await storage.session.commit()
127131
```
128132

133+
### Transaction Management
134+
135+
**SqlAlchemySagaStorage** handles transactions automatically:
136+
137+
1. Each method (`create_saga`, `log_step`, etc.) opens a new session.
138+
2. The operation is performed.
139+
3. `session.commit()` is called immediately.
140+
4. The session is closed.
141+
142+
This design ensures:
143+
144+
- **Crash Safety:** Even if the application crashes mid-saga, all completed steps are safely persisted.
145+
- **Connection Efficiency:** Connections are returned to the pool immediately after each operation.
146+
- **Isolation:** Saga storage operations don't interfere with your business logic transactions.
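
The open, operate, commit, close cycle above can be illustrated with the standard-library `sqlite3` module. This is a sketch of the pattern only, not the `SqlAlchemySagaStorage` implementation: the helper `run_in_own_transaction`, the reduced `saga_logs` table, and the step names are hypothetical, and a `sqlite3` connection stands in for a SQLAlchemy session.

```python
import os
import sqlite3
import tempfile


def run_in_own_transaction(db_path, sql, params=()):
    conn = sqlite3.connect(db_path)  # 1. open a new connection ("session")
    try:
        conn.execute(sql, params)    # 2. perform the operation
        conn.commit()                # 3. commit immediately
    finally:
        conn.close()                 # 4. close, releasing the connection


with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "saga.db")
    run_in_own_transaction(
        db_path, "CREATE TABLE saga_logs (saga_id TEXT, step_name TEXT, status TEXT)"
    )
    # A completed step is checkpointed as soon as it is logged.
    run_in_own_transaction(
        db_path,
        "INSERT INTO saga_logs VALUES (?, ?, ?)",
        ("saga-1", "reserve_inventory", "completed"),
    )

    # Simulated crash: work that was never committed is discarded.
    crashed = sqlite3.connect(db_path)
    crashed.execute(
        "INSERT INTO saga_logs VALUES (?, ?, ?)",
        ("saga-1", "charge_payment", "started"),
    )
    crashed.close()  # closed without commit

    check = sqlite3.connect(db_path)
    persisted_steps = [row[0] for row in check.execute("SELECT step_name FROM saga_logs")]
    check.close()

print(persisted_steps)
```

Only the step that went through the commit-per-operation helper survives the simulated crash, which is the property the "Crash Safety" bullet relies on.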
147+
148+
### Concurrency Control
149+
150+
The storage implementation provides two mechanisms to handle concurrency in distributed environments:
151+
152+
#### 1. Optimistic Locking (Versioning)
153+
154+
To prevent "lost updates" when multiple steps might update the context simultaneously (though sagas typically execute sequentially), the `version` column is used.
155+
156+
- `update_context` accepts an optional `current_version`.
157+
- If provided, the storage checks if `version == current_version`.
158+
- If matched, it updates the context and increments the version (`version + 1`).
159+
- If not matched, it raises `SagaConcurrencyError`, indicating the state was modified by another process.
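
A version check of this kind is commonly written as a conditional `UPDATE`. The sketch below uses the standard-library `sqlite3` module and is an assumption about the general technique, not the library's code: the table is reduced to three columns, `SagaConcurrencyError` is a local stand-in for the library's exception, and this simplified `update_context` always requires `current_version`.

```python
import json
import sqlite3


class SagaConcurrencyError(Exception):
    """Local stand-in for the library's exception of the same name."""


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE saga_executions ("
    "id TEXT PRIMARY KEY, context TEXT, version INTEGER NOT NULL DEFAULT 1)"
)
conn.execute("INSERT INTO saga_executions (id, context) VALUES (?, ?)", ("saga-1", "{}"))
conn.commit()


def update_context(saga_id, context, current_version):
    # The WHERE clause matches only if nobody bumped the version in between.
    cur = conn.execute(
        "UPDATE saga_executions SET context = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (json.dumps(context), saga_id, current_version),
    )
    conn.commit()
    if cur.rowcount == 0:
        raise SagaConcurrencyError(f"saga {saga_id} is no longer at version {current_version}")


update_context("saga-1", {"order_id": 42}, current_version=1)  # version 1 -> 2

try:
    # A second writer still holding version 1 is rejected.
    update_context("saga-1", {"order_id": 99}, current_version=1)
    stale_write_rejected = False
except SagaConcurrencyError:
    stale_write_rejected = True

context_json, version = conn.execute(
    "SELECT context, version FROM saga_executions WHERE id = ?", ("saga-1",)
).fetchone()
print(json.loads(context_json), version, stale_write_rejected)
```

Because the check and the write happen in a single statement, there is no window between reading the version and updating the row.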
160+
161+
#### 2. Row Locking (Recovery Safety)
162+
163+
When recovering a saga (e.g., after a crash), it is critical that only one worker picks up the saga to avoid duplicate execution.
164+
165+
- `load_saga_state(..., read_for_update=True)` uses `SELECT ... FOR UPDATE` (in SQL databases).
166+
- This acquires a row-level lock on the saga execution record.
167+
- Other workers attempting to lock the same saga will wait for the lock (or fail, for example if the database enforces a lock timeout), ensuring exclusive access during the recovery process.
168+
129169
## Choosing Storage
130170

131171
**Memory Storage:**
@@ -138,10 +178,11 @@ await storage.session.commit()
138178
- ✅ Production, multi-process
139179
- ✅ Recovery after restarts
140180
- ✅ Audit trail
181+
- ✅ Robust transaction management
141182

142183
## Best Practices
143184

144-
1. **Use persistent storage in production** — Memory storage loses data on restart
145-
2. **Create indexes**: Index `saga_id` and `created_at` for better performance
146-
3. **Handle session lifecycle**: Commit and close SQLAlchemy sessions properly
147-
4. **Monitor storage size** — Archive old saga logs periodically
185+
1. **Use persistent storage in production** — Memory storage loses data on restart
186+
2. **Configure Connection Pool**: Set appropriate `pool_size` and `max_overflow` for your load.
187+
3. **Create indexes**: Index `saga_id` and `created_at` for better performance
188+
4. **Monitor storage size** — Archive old saga logs periodically
