Skip to content

Commit fe18c9d

Browse files
author
Вадим Козыревский
committed
Added SagaMediator supporting
1 parent ec5ed0b commit fe18c9d

32 files changed

Lines changed: 2904 additions & 691 deletions

README.md

Lines changed: 28 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -249,13 +249,13 @@ Sagas enable eventual consistency by executing a series of steps where each step
249249
### Example
250250

251251
```python
252-
from cqrs.saga.saga import Saga
253-
from cqrs.saga.step import SagaStepHandler, SagaStepResult
254-
from cqrs.saga.storage.memory import MemorySagaStorage
255-
from cqrs.saga.models import SagaContext
256-
from cqrs.response import Response
252+
import dataclasses
257253
import uuid
254+
from cqrs.saga.models import SagaContext
255+
from cqrs.saga.saga import Saga
256+
from cqrs.saga.step import SagaStepHandler
258257

258+
@dataclasses.dataclass
259259
class OrderContext(SagaContext):
260260
order_id: str
261261
user_id: str
@@ -264,58 +264,20 @@ class OrderContext(SagaContext):
264264
inventory_reservation_id: str | None = None
265265
payment_id: str | None = None
266266

267-
class ReserveInventoryResponse(Response):
268-
reservation_id: str
269-
270-
class ProcessPaymentResponse(Response):
271-
payment_id: str
272-
273-
class ReserveInventoryStep(SagaStepHandler[OrderContext, ReserveInventoryResponse]):
274-
def __init__(self, inventory_service):
275-
self._inventory_service = inventory_service
276-
277-
async def act(self, context: OrderContext) -> SagaStepResult[OrderContext, ReserveInventoryResponse]:
278-
# Reserve inventory
279-
reservation_id = await self._inventory_service.reserve_items(context.order_id, context.items)
280-
context.inventory_reservation_id = reservation_id
281-
return self._generate_step_result(ReserveInventoryResponse(reservation_id=reservation_id))
282-
283-
async def compensate(self, context: OrderContext) -> None:
284-
# Release inventory if saga fails
285-
if context.inventory_reservation_id:
286-
await self._inventory_service.release_items(context.inventory_reservation_id)
287-
288-
class ProcessPaymentStep(SagaStepHandler[OrderContext, ProcessPaymentResponse]):
289-
def __init__(self, payment_service):
290-
self._payment_service = payment_service
267+
# Define saga class with steps
268+
class OrderSaga(Saga[OrderContext]):
269+
steps = [
270+
ReserveInventoryStep,
271+
ProcessPaymentStep,
272+
]
291273

292-
async def act(self, context: OrderContext) -> SagaStepResult[OrderContext, ProcessPaymentResponse]:
293-
# Process payment
294-
payment_id = await self._payment_service.charge(context.order_id, context.total_amount)
295-
context.payment_id = payment_id
296-
return self._generate_step_result(ProcessPaymentResponse(payment_id=payment_id))
297-
298-
async def compensate(self, context: OrderContext) -> None:
299-
# Refund payment if saga fails
300-
if context.payment_id:
301-
await self._payment_service.refund(context.payment_id)
302-
303-
# Create saga with storage
304-
storage = MemorySagaStorage()
305-
saga = Saga(
306-
steps=[ReserveInventoryStep, ProcessPaymentStep],
307-
container=container, # DI container
308-
storage=storage,
309-
)
310-
311-
# Execute saga
312-
saga_id = uuid.uuid4()
274+
# Execute saga via mediator
313275
context = OrderContext(order_id="123", user_id="user_1", items=["item_1"], total_amount=100.0)
276+
saga_id = uuid.uuid4()
314277

315-
async with saga.transaction(context=context, saga_id=saga_id) as transaction:
316-
async for step_result in transaction:
317-
print(f"Step completed: {step_result.step_type.__name__}")
318-
# If any step fails, compensation happens automatically
278+
async for step_result in mediator.stream(context, saga_id=saga_id):
279+
print(f"Step completed: {step_result.step_type.__name__}")
280+
# If any step fails, compensation happens automatically
319281
```
320282

321283
The saga state and step history are persisted to `SagaStorage`. The `SagaLog` maintains a complete audit trail
@@ -327,9 +289,18 @@ If a saga is interrupted (e.g., due to a crash), you can recover it using the re
327289
```python
328290
from cqrs.saga.recovery import recover_saga
329291

292+
# Get saga instance from mediator's saga map (or keep reference to saga class)
293+
saga = OrderSaga()
294+
330295
# Recover interrupted saga - will resume from last completed step
331296
# or continue compensation if saga was in compensating state
332-
await recover_saga(saga, saga_id, OrderContext)
297+
await recover_saga(
298+
saga=saga,
299+
saga_id=saga_id,
300+
context_builder=OrderContext,
301+
container=di_container, # Same container used in bootstrap
302+
storage=storage,
303+
)
333304

334305
# Access execution history (SagaLog) for monitoring and debugging
335306
history = await storage.get_step_history(saga_id)
@@ -350,7 +321,8 @@ The package includes built-in support for generating Mermaid diagrams from Saga
350321
```python
351322
from cqrs.saga.mermaid import SagaMermaid
352323

353-
# Create Mermaid generator from saga
324+
# Create Mermaid generator from saga class
325+
saga = OrderSaga()
354326
generator = SagaMermaid(saga)
355327

356328
# Generate Sequence diagram showing execution flow

0 commit comments

Comments
 (0)