Skip to content

Commit 7997533

Browse files
📝 Add docstrings to reduce-saga-storage-overhead (#62)
1 parent b92e39f commit 7997533

16 files changed

Lines changed: 734 additions & 114 deletions

examples/saga.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,20 @@ async def create_shipment(
275275
items: list[str],
276276
address: str,
277277
) -> tuple[str, str]:
278-
"""Create a shipment for the order."""
278+
"""
279+
Create a shipment for an order and record its tracking number.
280+
281+
Parameters:
282+
order_id (str): Identifier of the order to ship.
283+
items (list[str]): List of item identifiers included in the shipment.
284+
address (str): Shipping address; must not be empty.
285+
286+
Returns:
287+
tuple[str, str]: A tuple containing the created `shipment_id` and its `tracking_number`.
288+
289+
Raises:
290+
ValueError: If `address` is empty.
291+
"""
279292
if not address:
280293
raise ValueError("Shipping address is required")
281294

@@ -469,7 +482,11 @@ class OrderSaga(Saga[OrderContext]):
469482

470483

471484
async def run_successful_saga() -> None:
472-
"""Demonstrate a successful saga execution."""
485+
"""
486+
Run an example order-processing saga and print the per-step progress and final results.
487+
488+
Sets up mock services, dependency injection, and in-memory saga storage; executes the OrderSaga with a generated saga ID, prints each completed step, then prints the final saga status, context fields (inventory reservation, payment ID, shipment ID) and the persisted execution log. If saga execution fails, the failure is printed and the exception is re-raised.
489+
"""
473490
print("\n" + "=" * 70)
474491
print("SCENARIO 1: Successful Order Processing Saga")
475492
print("=" * 70)
@@ -736,4 +753,4 @@ async def main() -> None:
736753

737754

738755
if __name__ == "__main__":
739-
asyncio.run(main())
756+
asyncio.run(main())

examples/saga_fallback.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,17 @@ async def act(
144144
self,
145145
context: OrderContext,
146146
) -> SagaStepResult[OrderContext, ReserveInventoryResponse]:
147-
"""Primary step that always raises an error."""
147+
"""
148+
Simulate a failing primary reservation step for the saga.
149+
150+
This action always raises a RuntimeError to emulate an unavailable downstream service and trigger fallback or compensation behavior.
151+
152+
Parameters:
153+
context (OrderContext): Shared saga context containing order details (e.g., order_id, user_id, amount, reservation_id).
154+
155+
Raises:
156+
RuntimeError: Indicates the primary step failed (service unavailable).
157+
"""
148158
self._call_count += 1
149159
logger.info(
150160
f" [PrimaryStep] Executing act() for order {context.order_id} " f"(call #{self._call_count})...",
@@ -271,7 +281,11 @@ async def run_saga(
271281

272282

273283
async def main() -> None:
274-
"""Run saga fallback example."""
284+
"""
285+
Run an interactive demonstration of the saga fallback pattern with a circuit breaker.
286+
287+
Executes three scenarios that show a failing primary step with an automatic fallback, the circuit breaker opening after a configurable number of failures, and fail-fast behavior when the circuit is open. Also conditionally demonstrates configuring a Redis-backed circuit breaker storage, prints per-scenario results and a summary, and informs about missing optional dependencies.
288+
"""
275289
print("\n" + "=" * 80)
276290
print("SAGA FALLBACK PATTERN WITH CIRCUIT BREAKER EXAMPLE")
277291
print("=" * 80)
@@ -419,4 +433,4 @@ class OrderSagaWithRedisBreaker(Saga[OrderContext]):
419433

420434

421435
if __name__ == "__main__":
422-
asyncio.run(main())
436+
asyncio.run(main())

examples/saga_recovery.py

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,20 @@ async def create_shipment(
281281
items: list[str],
282282
address: str,
283283
) -> tuple[str, str]:
284-
"""Create a shipment for the order."""
284+
"""
285+
Create a shipment record for the given order and generate a tracking number.
286+
287+
Parameters:
288+
order_id (str): Identifier of the order.
289+
items (list[str]): Items included in the shipment.
290+
address (str): Destination shipping address.
291+
292+
Returns:
293+
tuple[str, str]: A tuple containing the shipment ID and the tracking number.
294+
295+
Raises:
296+
ValueError: If `address` is empty.
297+
"""
285298
if not address:
286299
raise ValueError("Shipping address is required")
287300

@@ -519,16 +532,12 @@ async def resolve(self, type_: type) -> typing.Any:
519532

520533
async def simulate_interrupted_saga() -> tuple[uuid.UUID, MemorySagaStorage]:
521534
"""
522-
Simulate a saga that gets interrupted after the first step.
523-
524-
This simulates what happens when:
525-
- Server crashes after completing ReserveInventoryStep
526-
- Network timeout occurs
527-
- Process is killed during execution
528-
- Database connection is lost
529-
535+
Simulate a saga that is interrupted after the inventory reservation step to produce a recoverable persisted state.
536+
530537
Returns:
531-
Tuple of (saga_id, storage) so we can recover it later.
538+
tuple:
539+
saga_id (uuid.UUID): Identifier of the created saga.
540+
storage (MemorySagaStorage): In-memory storage containing the persisted saga state and step history for recovery.
532541
"""
533542
print("\n" + "=" * 70)
534543
print("SCENARIO 1: Simulating Interrupted Saga")
@@ -622,13 +631,13 @@ async def recover_interrupted_saga(
622631
storage: MemorySagaStorage,
623632
) -> None:
624633
"""
625-
Recover and complete the interrupted saga.
626-
627-
This demonstrates how recovery ensures eventual consistency by:
628-
1. Loading saga state from storage
629-
2. Reconstructing context
630-
3. Resuming execution from last completed step
631-
4. Completing remaining steps
634+
Recover and complete an interrupted saga using persisted state.
635+
636+
Loads the saga state from storage, reconstructs the saga context, resumes execution from the last completed step, and completes any remaining steps to restore eventual consistency.
637+
638+
Parameters:
639+
saga_id (uuid.UUID): Identifier of the saga instance to recover.
640+
storage (MemorySagaStorage): Durable storage containing the saga's persisted state and step history.
632641
"""
633642
print("\n" + "=" * 70)
634643
print("SCENARIO 2: Recovering Interrupted Saga")
@@ -688,10 +697,12 @@ async def recover_interrupted_saga(
688697

689698
async def simulate_interrupted_compensation() -> tuple[uuid.UUID, MemorySagaStorage]:
690699
"""
691-
Simulate a saga that fails and gets interrupted during compensation.
692-
693-
This shows recovery of compensation logic, which is critical for
694-
maintaining consistency when rollback is interrupted.
700+
Simulate a saga that fails and is interrupted during compensation.
701+
702+
Sets up services, a saga, and a failing shipment step to trigger compensation that is then artificially interrupted; returns identifiers and storage state for performing recovery in a separate run.
703+
704+
Returns:
705+
tuple[uuid.UUID, MemorySagaStorage]: The saga ID and the in-memory storage containing the persisted saga state and step history after the simulated interruption.
695706
"""
696707
print("\n" + "=" * 70)
697708
print("SCENARIO 3: Simulating Interrupted Compensation")
@@ -801,10 +812,13 @@ async def recover_interrupted_compensation(
801812
storage: MemorySagaStorage,
802813
) -> None:
803814
"""
804-
Recover and complete the interrupted compensation.
805-
806-
This ensures that even if compensation is interrupted, it will
807-
eventually complete, releasing all resources.
815+
Recover and complete an interrupted compensation for a saga.
816+
817+
Loads the saga state from the provided storage using the given saga identifier and drives any incomplete compensation steps to completion, ensuring resources (inventory, payments, shipments) are released and the system reaches a consistent state. Progress and final status are printed to stdout.
818+
819+
Parameters:
820+
saga_id (uuid.UUID): Identifier of the saga to recover.
821+
storage (MemorySagaStorage): Persistent storage containing the saga state and step history.
808822
"""
809823
print("\n" + "=" * 70)
810824
print("SCENARIO 4: Recovering Interrupted Compensation")
@@ -910,4 +924,4 @@ async def main() -> None:
910924

911925

912926
if __name__ == "__main__":
913-
asyncio.run(main())
927+
asyncio.run(main())

examples/saga_recovery_scheduler.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,11 @@ async def create_interrupted_saga(storage: MemorySagaStorage) -> uuid.UUID:
620620

621621

622622
async def main() -> None:
623-
"""Run the recovery scheduler example."""
623+
"""
624+
Run the saga recovery scheduler demo and display its outcome.
625+
626+
Sets up an in-memory saga storage, creates a simulated interrupted saga and marks it stale, runs the recovery loop for three iterations (using the module's recovery_loop and recovery configuration constants), then loads and prints the final saga state.
627+
"""
624628
print("\n" + "=" * 70)
625629
print("SAGA RECOVERY SCHEDULER EXAMPLE")
626630
print("=" * 70)
@@ -660,4 +664,4 @@ async def main() -> None:
660664

661665

662666
if __name__ == "__main__":
663-
asyncio.run(main())
667+
asyncio.run(main())

examples/saga_sqlalchemy_storage.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ async def setup_database(engine: AsyncEngine) -> None:
142142
async def main() -> None:
143143
# 1. Create SQLAlchemy Engine with Connection Pool
144144
# SQLAlchemy creates a pool by default (QueuePool for most dialects, SingletonThreadPool for SQLite)
145+
"""
146+
Run a demonstration that executes an OrderSaga using an async SQLAlchemy engine and persistent SqlAlchemySagaStorage.
147+
148+
Initializes a pooled async SQLAlchemy engine and schema, creates a session factory and SqlAlchemySagaStorage, bootstraps a mediator with a DI container and saga mapper, runs an OrderSaga while streaming step results to stdout, and then reloads and prints the persisted saga state and step history before disposing the engine.
149+
"""
145150
engine = create_async_engine(
146151
DB_URL,
147152
echo=False, # Set to True to see SQL queries
@@ -202,4 +207,4 @@ def saga_mapper(mapper: cqrs.SagaMap) -> None:
202207

203208

204209
if __name__ == "__main__":
205-
asyncio.run(main())
210+
asyncio.run(main())

src/cqrs/saga/compensation.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,16 @@ def __init__(
2626
on_after_compensate_step: typing.Callable[[], typing.Awaitable[None]] | None = None,
2727
) -> None:
2828
"""
29-
Initialize compensator.
30-
31-
Args:
32-
saga_id: UUID of the saga
33-
context: Saga context
34-
storage: Saga storage implementation (or run object with same interface)
35-
retry_count: Number of retry attempts for compensation
36-
retry_delay: Initial delay between retries in seconds
37-
retry_backoff: Backoff multiplier for exponential delay
38-
on_after_compensate_step: Optional async callback after each successfully
39-
compensated step (e.g. run.commit() for checkpoint).
29+
Create a SagaCompensator configured to perform compensation of completed saga steps with retry and optional post-step callback.
30+
31+
Parameters:
32+
saga_id: Identifier of the saga.
33+
context: Saga execution context passed to step compensation handlers.
34+
storage: Storage or run object implementing saga persistence operations.
35+
retry_count: Maximum number of attempts per step before giving up.
36+
retry_delay: Initial delay in seconds before the first retry.
37+
retry_backoff: Multiplier applied to the delay for each successive retry (exponential backoff).
38+
on_after_compensate_step: Optional async callback invoked after each step is successfully compensated.
4039
"""
4140
self._saga_id = saga_id
4241
self._context = context
@@ -51,10 +50,15 @@ async def compensate_steps(
5150
completed_steps: list[SagaStepHandler[ContextT, typing.Any]],
5251
) -> None:
5352
"""
54-
Compensate all completed steps in reverse order with retry mechanism.
55-
56-
Args:
57-
completed_steps: List of completed step handlers to compensate
53+
Compensates completed saga steps in reverse order, applying retry logic and recording step statuses.
54+
55+
Compensates each handler from last to first, skipping steps already recorded as compensated in the saga history. Updates the saga status to COMPENSATING at the start and logs per-step statuses (STARTED, COMPLETED, FAILED) in storage. After a step completes, the optional on_after_compensate_step callback (if provided) is awaited. If any step fails after all retry attempts, the saga is marked as FAILED. If no completed steps are provided, no compensation is attempted and the saga is marked as FAILED.
56+
57+
Parameters:
58+
completed_steps (list[SagaStepHandler[ContextT, typing.Any]]): Handlers corresponding to steps that completed during the saga; these will be compensated in reverse order.
59+
60+
Returns:
61+
None
5862
"""
5963
await self._storage.update_status(self._saga_id, SagaStatus.COMPENSATING)
6064

@@ -170,4 +174,4 @@ async def _compensate_step_with_retry(
170174

171175
# If we get here, all retries failed
172176
if last_exception:
173-
raise last_exception
177+
raise last_exception

src/cqrs/saga/execution.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ def __init__(
2424
storage: ISagaStorage | SagaStorageRun,
2525
) -> None:
2626
"""
27-
Initialize state manager.
28-
29-
Args:
30-
saga_id: UUID of the saga
31-
storage: Saga storage implementation
27+
Create a SagaStateManager bound to a specific saga identifier and storage backend.
28+
29+
Parameters:
30+
saga_id: Identifier for the saga instance.
31+
storage: Storage backend implementing ISagaStorage or SagaStorageRun used to persist saga state and history.
3232
"""
3333
self._saga_id = saga_id
3434
self._storage = storage
@@ -84,13 +84,13 @@ def __init__(
8484
saga_steps: list[type[SagaStepHandler] | Fallback],
8585
) -> None:
8686
"""
87-
Initialize recovery manager.
88-
89-
Args:
90-
saga_id: UUID of the saga
91-
storage: Saga storage implementation
92-
container: DI container for resolving step handlers
93-
saga_steps: List of saga steps
87+
Construct a SagaRecoveryManager that holds the identifiers, storage, DI container, and configured saga steps required to reconstruct a saga's execution state.
88+
89+
Parameters:
90+
saga_id: Identifier for the saga instance (e.g., UUID or other unique value).
91+
storage: Persistence backend implementing saga history operations (ISagaStorage or SagaStorageRun).
92+
container: Dependency injection container used to resolve step handler instances.
93+
saga_steps: Ordered list of saga step types or Fallback wrappers that define the saga's execution sequence.
9494
"""
9595
self._saga_id = saga_id
9696
self._storage = storage
@@ -99,10 +99,10 @@ def __init__(
9999

100100
async def load_completed_step_names(self) -> set[str]:
101101
"""
102-
Load set of completed step names from history.
103-
102+
Return the names of saga steps that completed their primary ("act") action.
103+
104104
Returns:
105-
Set of step names that have been completed
105+
set[str]: Step names recorded with status `SagaStepStatus.COMPLETED` and action `"act"`.
106106
"""
107107
history = await self._storage.get_step_history(self._saga_id)
108108
return {e.step_name for e in history if e.status == SagaStepStatus.COMPLETED and e.action == "act"}
@@ -112,13 +112,13 @@ async def reconstruct_completed_steps(
112112
completed_step_names: set[str],
113113
) -> list[SagaStepHandler[SagaContext, typing.Any]]:
114114
"""
115-
Reconstruct list of completed step handlers from history.
116-
117-
Args:
118-
completed_step_names: Set of completed step names
119-
115+
Reconstructs and returns the resolved step handler instances corresponding to the completed steps, preserving saga execution order.
116+
117+
Parameters:
118+
completed_step_names (set[str]): Names of steps that completed the "act" action.
119+
120120
Returns:
121-
List of step handlers in execution order
121+
list[SagaStepHandler[SagaContext, typing.Any]]: Resolved step handler instances in execution order. For Fallback wrappers, the primary handler is chosen if its name appears in completed_step_names; otherwise the fallback handler is chosen when present.
122122
"""
123123
completed_steps: list[SagaStepHandler[SagaContext, typing.Any]] = []
124124

@@ -365,4 +365,4 @@ async def execute_fallback_step(
365365
raise fallback_error
366366
else:
367367
# Should not fallback, re-raise original error
368-
raise primary_error
368+
raise primary_error

0 commit comments

Comments
 (0)