33## Basic Saga Example
44
55``` python
6+ import dataclasses
7+ import uuid
8+ import di
9+
10+ import cqrs
11+ from cqrs.saga import bootstrap
612from cqrs.saga.saga import Saga
713from cqrs.saga.step import SagaStepHandler, SagaStepResult
814from cqrs.saga.storage.memory import MemorySagaStorage
915from cqrs.saga.models import SagaContext
10- import dataclasses
11- import uuid
16+ from cqrs.response import Response
1217
1318@dataclasses.dataclass
1419class OrderContext (SagaContext ):
@@ -35,17 +40,34 @@ class ReserveInventoryStep(SagaStepHandler[OrderContext, Response]):
3540 context.inventory_reservation_id
3641 )
3742
38- # Create and execute saga
43+ # Define saga class with steps
44+ class OrderSaga (Saga[OrderContext]):
45+ steps = [ReserveInventoryStep, ProcessPaymentStep]
46+
47+ # Setup DI container
48+ di_container = di.Container()
49+ # ... register services ...
50+
51+ # Create saga storage
3952storage = MemorySagaStorage()
40- saga = Saga(
41- steps = [ReserveInventoryStep, ProcessPaymentStep],
42- container = container,
43- storage = storage,
53+
54+ # Register saga in SagaMap
55+ def saga_mapper (mapper : cqrs.SagaMap) -> None :
56+ mapper.bind(OrderContext, OrderSaga)
57+
58+ # Create saga mediator using bootstrap
59+ mediator = bootstrap.bootstrap(
60+ di_container = di_container,
61+ sagas_mapper = saga_mapper,
62+ saga_storage = storage,
4463)
4564
46- async with saga.transaction(context = context, saga_id = uuid.uuid4()) as transaction:
47- async for step_result in transaction:
48- print (f " Step completed: { step_result.step_type.__name__ } " )
65+ # Execute saga
66+ context = OrderContext(order_id = " 123" , items = [" item_1" ], total_amount = 100.0 )
67+ saga_id = uuid.uuid4()
68+
69+ async for step_result in mediator.stream(context, saga_id = saga_id):
70+ print (f " Step completed: { step_result.step_type.__name__ } " )
4971```
5072
5173** Complete example:** [ ` examples/saga.py ` ] ( https://github.com/vadikko2/cqrs/blob/master/examples/saga.py )
@@ -57,11 +79,20 @@ async with saga.transaction(context=context, saga_id=uuid.uuid4()) as transactio
5779``` python
5880from cqrs.saga.recovery import recover_saga
5981
82+ # Get saga instance (or keep reference to saga class)
83+ saga = OrderSaga()
84+
6085# Recover interrupted saga
6186saga_id = uuid.UUID(" 550e8400-e29b-41d4-a716-446655440000" )
6287
6388try :
64- await recover_saga(saga, saga_id, OrderContext)
89+ await recover_saga(
90+ saga = saga,
91+ saga_id = saga_id,
92+ context_builder = OrderContext,
93+ container = di_container, # Same container used in bootstrap
94+ storage = storage,
95+ )
6596 print (" Saga recovered successfully!" )
6697except RuntimeError :
6798 # Expected if saga was in COMPENSATING/FAILED state
@@ -77,19 +108,30 @@ except RuntimeError:
77108``` python
78109import fastapi
79110import json
80- from cqrs.saga.saga import Saga
111+ import uuid
112+ from cqrs.saga import bootstrap
113+
114+ def mediator_factory () -> cqrs.SagaMediator:
115+ """ Create saga mediator using bootstrap."""
116+ return bootstrap.bootstrap(
117+ di_container = di_container,
118+ sagas_mapper = saga_mapper,
119+ saga_storage = storage,
120+ )
81121
82122@app.post (" /process-order" )
83- async def process_order (request : ProcessOrderRequest):
123+ async def process_order (
124+ request : ProcessOrderRequest,
125+ mediator : cqrs.SagaMediator = fastapi.Depends(mediator_factory),
126+ ):
84127 async def generate_sse ():
85- saga = Saga(steps = [... ], container = container, storage = storage)
86128 saga_id = uuid.uuid4()
129+ context = OrderContext(... )
87130
88131 yield f " data: { json.dumps({' type' : ' start' , ' saga_id' : str (saga_id)})} \n\n "
89132
90- async with saga.transaction(context = context, saga_id = saga_id) as transaction:
91- async for step_result in transaction:
92- yield f " data: { json.dumps({' type' : ' step_progress' , ' step' : step_result.step_type.__name__ })} \n\n "
133+ async for step_result in mediator.stream(context, saga_id = saga_id):
134+ yield f " data: { json.dumps({' type' : ' step_progress' , ' step' : step_result.step_type.__name__ })} \n\n "
93135
94136 yield f " data: { json.dumps({' type' : ' complete' })} \n\n "
95137
@@ -107,6 +149,7 @@ async def process_order(request: ProcessOrderRequest):
107149``` python
108150from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
109151from cqrs.saga.storage.sqlalchemy import SqlAlchemySagaStorage, Base
152+ from cqrs.saga import bootstrap
110153
111154# Setup
112155engine = create_async_engine(" postgresql+asyncpg://user:pass@localhost/db" )
@@ -117,11 +160,24 @@ async def create_storage() -> SqlAlchemySagaStorage:
117160
118161# Usage
119162storage = await create_storage()
120- saga = Saga(steps = [... ], container = container, storage = storage)
121163
122- async with saga.transaction(context = context, saga_id = uuid.uuid4()) as transaction:
123- async for step_result in transaction:
124- print (f " Step: { step_result.step_type.__name__ } " )
164+ # Register saga in SagaMap
165+ def saga_mapper (mapper : cqrs.SagaMap) -> None :
166+ mapper.bind(OrderContext, OrderSaga)
167+
168+ # Create saga mediator using bootstrap
169+ mediator = bootstrap.bootstrap(
170+ di_container = di_container,
171+ sagas_mapper = saga_mapper,
172+ saga_storage = storage,
173+ )
174+
175+ # Execute saga
176+ context = OrderContext(... )
177+ saga_id = uuid.uuid4()
178+
179+ async for step_result in mediator.stream(context, saga_id = saga_id):
180+ print (f " Step: { step_result.step_type.__name__ } " )
125181
126182await storage.session.commit()
127183```
@@ -130,15 +186,14 @@ await storage.session.commit()
130186
131187## Compensation Retry Configuration
132188
189+ Compensation retry configuration is handled at the transaction level. When using ` SagaMediator ` , retry settings can be configured when creating the saga transaction. However, the recommended approach is to configure retry settings in your saga class or use the default settings.
190+
191+ For advanced retry configuration, you can access the transaction directly:
192+
133193``` python
134- saga = Saga(
135- steps = [... ],
136- container = container,
137- storage = storage,
138- compensation_retry_count = 5 , # Retry up to 5 times
139- compensation_retry_delay = 2.0 , # Start with 2 second delay
140- compensation_retry_backoff = 1.5 , # Multiply delay by 1.5 each time
141- )
194+ # Note: Compensation retry is configured at the SagaTransaction level
195+ # When using mediator.stream(), default retry settings are used
196+ # For custom retry configuration, you may need to access the transaction directly
142197```
143198
144199## Background Recovery Job
@@ -148,11 +203,18 @@ import asyncio
148203from cqrs.saga.recovery import recover_saga
149204
150205async def recovery_job ():
206+ saga = OrderSaga() # Get saga instance
151207 while True :
152208 incomplete_sagas = await find_incomplete_sagas()
153209 for saga_id in incomplete_sagas:
154210 try :
155- await recover_saga(saga, saga_id, OrderContext)
211+ await recover_saga(
212+ saga = saga,
213+ saga_id = saga_id,
214+ context_builder = OrderContext,
215+ container = di_container,
216+ storage = storage,
217+ )
156218 except RuntimeError :
157219 pass # Compensation completed
158220 await asyncio.sleep(60 ) # Scan every minute
0 commit comments