Skip to content

Commit cd0b0f7

Browse files
author
Вадим Козыревский
committed
Choreographic Saga pattern supporting
1 parent 64ad422 commit cd0b0f7

6 files changed

Lines changed: 22 additions & 14 deletions

File tree

README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/cor_re
199199

200200
## Saga Pattern
201201

202-
The package implements the Choreographic Saga pattern for managing distributed transactions across multiple services or operations.
202+
The package implements the Choreographic Saga pattern for managing distributed transactions across multiple services or operations.
203203
Sagas enable eventual consistency by executing a series of steps where each step can be compensated if a subsequent step fails.
204204

205205
### Key Features
@@ -236,13 +236,13 @@ class ProcessPaymentResponse(Response):
236236
class ReserveInventoryStep(SagaStepHandler[OrderContext, ReserveInventoryResponse]):
237237
def __init__(self, inventory_service):
238238
self._inventory_service = inventory_service
239-
239+
240240
async def act(self, context: OrderContext) -> SagaStepResult[OrderContext, ReserveInventoryResponse]:
241241
# Reserve inventory
242242
reservation_id = await self._inventory_service.reserve_items(context.order_id, context.items)
243243
context.inventory_reservation_id = reservation_id
244244
return self._generate_step_result(ReserveInventoryResponse(reservation_id=reservation_id))
245-
245+
246246
async def compensate(self, context: OrderContext) -> None:
247247
# Release inventory if saga fails
248248
if context.inventory_reservation_id:
@@ -251,13 +251,13 @@ class ReserveInventoryStep(SagaStepHandler[OrderContext, ReserveInventoryRespons
251251
class ProcessPaymentStep(SagaStepHandler[OrderContext, ProcessPaymentResponse]):
252252
def __init__(self, payment_service):
253253
self._payment_service = payment_service
254-
254+
255255
async def act(self, context: OrderContext) -> SagaStepResult[OrderContext, ProcessPaymentResponse]:
256256
# Process payment
257257
payment_id = await self._payment_service.charge(context.order_id, context.total_amount)
258258
context.payment_id = payment_id
259259
return self._generate_step_result(ProcessPaymentResponse(payment_id=payment_id))
260-
260+
261261
async def compensate(self, context: OrderContext) -> None:
262262
# Refund payment if saga fails
263263
if context.payment_id:
@@ -281,8 +281,8 @@ async with saga.transaction(context=context, saga_id=saga_id) as transaction:
281281
# If any step fails, compensation happens automatically
282282
```
283283

284-
The saga state and step history are persisted to `SagaStorage`. The `SagaLog` maintains a complete audit trail
285-
of all step executions (both `act` and `compensate` operations) with timestamps and status information.
284+
The saga state and step history are persisted to `SagaStorage`. The `SagaLog` maintains a complete audit trail
285+
of all step executions (both `act` and `compensate` operations) with timestamps and status information.
286286
This enables the recovery mechanism to restore saga state and ensure eventual consistency even after system failures.
287287

288288
If a saga is interrupted (e.g., due to a crash), you can recover it using the recovery mechanism:

examples/saga.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292
from cqrs.saga.models import SagaContext
9393
from cqrs.saga.saga import Saga
9494
from cqrs.saga.step import SagaStepHandler, SagaStepResult
95-
from cqrs.saga.storage.enums import SagaStatus
9695
from cqrs.saga.storage.memory import MemorySagaStorage
9796

9897
logging.basicConfig(level=logging.INFO)

examples/saga_fastapi_sse.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,9 @@ async def generate_sse():
463463
yield f"data: {json.dumps(start_data)}\n\n"
464464

465465
# Execute saga with saga_id for persistence
466-
async with saga.transaction(context=context, saga_id=saga_id) as transaction:
466+
async with saga.transaction(
467+
context=context, saga_id=saga_id
468+
) as transaction:
467469
async for step_result in transaction:
468470
completed_steps += 1
469471
step_name = step_result.step_type.__name__

examples/saga_recovery.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,9 @@ async def cancel_shipment(self, shipment_id: str) -> None:
301301

302302
tracking_number = self._shipments[shipment_id]
303303
del self._shipments[shipment_id]
304-
logger.info(f" ↻ Cancelled shipment {shipment_id} (tracking: {tracking_number})")
304+
logger.info(
305+
f" ↻ Cancelled shipment {shipment_id} (tracking: {tracking_number})"
306+
)
305307

306308

307309
# ============================================================================
@@ -562,7 +564,9 @@ async def simulate_interrupted_saga() -> tuple[uuid.UUID, MemorySagaStorage]:
562564
# Simulate crash after first step
563565
if step_name == "ReserveInventoryStep":
564566
print("\n💥 SIMULATED SERVER CRASH!")
565-
print(" (In reality: server restart, network failure, timeout, etc.)")
567+
print(
568+
" (In reality: server restart, network failure, timeout, etc.)"
569+
)
566570
print(" Saga state has been persisted to storage.")
567571
print(" Current state: RUNNING, 1 step completed")
568572
# Break out of loop to simulate interruption

src/cqrs/middlewares/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ def wrap(self, handle: HandleType) -> HandleType:
2929
return handle
3030

3131

32-
# Contravariant TypeVar for SagaMiddleware protocol
33-
SagaContextT = typing.TypeVar("SagaContextT", bound=SagaContext, contravariant=True)
32+
# TypeVar for SagaMiddleware protocol
33+
SagaContextT = typing.TypeVar("SagaContextT", bound=SagaContext)
3434

3535
SagaHandlerType = typing.Callable[[SagaContextT], typing.Awaitable[None]]
3636

src/cqrs/saga/models.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import dataclasses
22
import typing
33

4+
# Type variable for from_dict classmethod return type
5+
_T = typing.TypeVar("_T", bound="SagaContext")
6+
47

58
@dataclasses.dataclass
69
class SagaContext:
@@ -29,7 +32,7 @@ def to_dict(self) -> dict[str, typing.Any]:
2932
return dataclasses.asdict(self)
3033

3134
@classmethod
32-
def from_dict(cls, data: dict[str, typing.Any]) -> "SagaContext":
35+
def from_dict(cls: type[_T], data: dict[str, typing.Any]) -> _T:
3336
"""
3437
Deserialize context from dictionary.
3538

0 commit comments

Comments
 (0)