Skip to content

Commit cc4a048

Browse files
author
Вадим Козыревский
committed
Fixes after review
1 parent ed1b2ac commit cc4a048

14 files changed

Lines changed: 167 additions & 191 deletions

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
8787

8888
def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
8989
self._meetings_api = meetings_api
90-
self.events: list[Event] = []
90+
self._events: list[Event] = []
9191

9292
@property
9393
def events(self) -> typing.List[events.Event]:
@@ -115,7 +115,7 @@ class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryR
115115

116116
def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
117117
self._meetings_api = meetings_api
118-
self.events: list[Event] = []
118+
self._events: list[Event] = []
119119

120120
@property
121121
def events(self) -> typing.List[events.Event]:
@@ -323,7 +323,7 @@ def init_queries(mapper: requests.RequestMap) -> None:
323323
mapper.bind(queries.ReadMeetingQuery, query_handlers.ReadMeetingQueryHandler)
324324

325325
def init_events(mapper: events.EventMap) -> None:
326-
mapper.bind(events.NotificationEvent[events_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
326+
mapper.bind(events.NotificationEvent[event_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
327327
mapper.bind(events.NotificationEvent[event_models.ECSTMeetingRoomClosed], event_handlers.UpdateMeetingRoomReadModelHandler)
328328
```
329329

examples/saga_recovery_scheduler.py

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import dataclasses
8080
import datetime
8181
import logging
82+
import traceback
8283
import typing
8384
import uuid
8485

@@ -263,12 +264,7 @@ async def create_shipment(
263264
shipment_id = f"shipment_{order_id}"
264265
tracking_number = f"TRACK{self._tracking_counter:08d}"
265266
self._shipments[shipment_id] = tracking_number
266-
logger.info(
267-
" ✓ Created shipment %s for order %s (tracking: %s)",
268-
shipment_id,
269-
order_id,
270-
tracking_number,
271-
)
267+
logger.info(f" ✓ Created shipment {shipment_id} for order {order_id} (tracking: {tracking_number})")
272268
return shipment_id, tracking_number
273269

274270
async def cancel_shipment(self, shipment_id: str) -> None:
@@ -515,19 +511,19 @@ async def run_recovery_iteration(
515511
processed = 0
516512
for saga_id in ids:
517513
try:
518-
logger.info("Recovering saga %s...", saga_id)
514+
logger.info(f"Recovering saga {saga_id}...")
519515
await recover_saga(saga, saga_id, context_builder, container, storage)
520-
logger.info("Saga %s recovered successfully.", saga_id)
516+
logger.info(f"Saga {saga_id} recovered successfully.")
521517
processed += 1
522518
except RuntimeError as e:
523519
if "recovered in" in str(e) and "state" in str(e):
524-
logger.info("Saga %s recovery completed compensation: %s", saga_id, e)
520+
logger.info(f"Saga {saga_id} recovery completed compensation: {traceback.format_exc()}")
525521
processed += 1
526522
else:
527-
logger.exception("Saga %s recovery failed: %s", saga_id, e)
523+
logger.exception(f"Saga {saga_id} recovery failed: {traceback.format_exc()}")
528524
processed += 1
529-
except Exception as e:
530-
logger.exception("Saga %s recovery failed: %s", saga_id, e)
525+
except Exception:
526+
logger.exception(f"Saga {saga_id} recovery failed: {traceback.format_exc()}")
531527
processed += 1
532528
return processed
533529

@@ -551,25 +547,25 @@ async def recovery_loop(
551547
iteration = 0
552548
while True:
553549
iteration += 1
554-
logger.info("Recovery iteration %s", iteration)
550+
logger.info(f"Recovery iteration {iteration}")
555551
try:
556552
processed = await run_recovery_iteration(
557553
storage,
558554
saga,
559555
OrderContext,
560556
)
561557
if processed > 0:
562-
logger.info("Processed %s saga(s) this iteration.", processed)
558+
logger.info(f"Processed {processed} saga(s) this iteration.")
563559
else:
564560
logger.debug("No sagas to recover.")
565561
except asyncio.CancelledError:
566562
logger.info("Recovery loop cancelled.")
567563
raise
568564
except Exception as e:
569-
logger.exception("Recovery iteration failed: %s", e)
565+
logger.exception(f"Recovery iteration failed: {traceback.format_exc()}")
570566

571567
if max_iterations is not None and iteration >= max_iterations:
572-
logger.info("Reached max_iterations=%s, stopping.", max_iterations)
568+
logger.info(f"Reached max_iterations={max_iterations}, stopping.")
573569
break
574570
await asyncio.sleep(interval_seconds)
575571

examples/saga_sqlalchemy_storage.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,13 @@ async def setup_database(engine: AsyncEngine) -> None:
140140

141141

142142
async def main() -> None:
143-
# 1. Create SQLAlchemy Engine with Connection Pool
144-
# SQLAlchemy creates a pool by default (QueuePool for most dialects, SingletonThreadPool for SQLite)
145143
"""
146144
Run a demonstration that executes an OrderSaga using an async SQLAlchemy engine and persistent SqlAlchemySagaStorage.
147-
145+
148146
Initializes a pooled async SQLAlchemy engine and schema, creates a session factory and SqlAlchemySagaStorage, bootstraps a mediator with a DI container and saga mapper, runs an OrderSaga while streaming step results to stdout, and then reloads and prints the persisted saga state and step history before disposing the engine.
149147
"""
148+
# 1. Create SQLAlchemy Engine with Connection Pool
149+
# SQLAlchemy creates a pool by default (QueuePool for most dialects, SingletonThreadPool for SQLite)
150150
engine = create_async_engine(
151151
DB_URL,
152152
echo=False, # Set to True to see SQL queries

src/cqrs/saga/compensation.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,6 @@ async def compensate_steps(
105105
"compensate",
106106
SagaStepStatus.COMPLETED,
107107
)
108-
if self._on_after_compensate_step is not None:
109-
await self._on_after_compensate_step()
110-
111108
except Exception as compensation_error:
112109
await self._storage.log_step(
113110
self._saga_id,
@@ -118,6 +115,19 @@ async def compensate_steps(
118115
)
119116
# Store both step and error for better error reporting
120117
compensation_errors.append((step, compensation_error))
118+
continue
119+
120+
# Callback only after successful step compensation; failures are not treated as step failure
121+
if self._on_after_compensate_step is not None:
122+
try:
123+
await self._on_after_compensate_step()
124+
except Exception as callback_error:
125+
logger.error(
126+
"on_after_compensate_step failed (e.g. run.commit): %s",
127+
callback_error,
128+
exc_info=callback_error,
129+
)
130+
raise
121131

122132
# If compensation failed after all retries
123133
if compensation_errors:

src/cqrs/saga/saga.py

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ def __init__(
8686
self._completed_steps: list[SagaStepHandler[ContextT, typing.Any]] = []
8787
self._error: BaseException | None = None
8888
self._compensated: bool = False
89+
self._comp_retry_count = compensation_retry_count
90+
self._comp_retry_delay = compensation_retry_delay
91+
self._comp_retry_backoff = compensation_retry_backoff
8992

9093
self._saga_id = saga_id or uuid.uuid4()
9194
self._is_new_saga = saga_id is None
@@ -112,9 +115,9 @@ def __init__(
112115
self._saga_id,
113116
context,
114117
storage,
115-
compensation_retry_count,
116-
compensation_retry_delay,
117-
compensation_retry_backoff,
118+
self._comp_retry_count,
119+
self._comp_retry_delay,
120+
self._comp_retry_backoff,
118121
)
119122

120123
@property
@@ -172,6 +175,51 @@ async def __aiter__(
172175
async for step_result in self._execute(None):
173176
yield step_result
174177

178+
def _build_run_scoped_components(
179+
self,
180+
run: SagaStorageRun,
181+
) -> tuple[
182+
SagaStateManager,
183+
SagaRecoveryManager,
184+
SagaStepExecutor[ContextT],
185+
FallbackStepExecutor[ContextT],
186+
SagaCompensator[ContextT],
187+
]:
188+
"""Build state manager, recovery manager, executors, and compensator for a storage run (checkpoint commits)."""
189+
state_manager = SagaStateManager(self._saga_id, run)
190+
recovery_manager = SagaRecoveryManager(
191+
self._saga_id,
192+
run,
193+
self._container,
194+
self._saga.steps,
195+
)
196+
step_executor = SagaStepExecutor(
197+
self._context,
198+
self._container,
199+
state_manager,
200+
)
201+
fallback_executor = FallbackStepExecutor(
202+
self._context,
203+
self._container,
204+
state_manager,
205+
)
206+
compensator = SagaCompensator(
207+
self._saga_id,
208+
self._context,
209+
run,
210+
self._comp_retry_count,
211+
self._comp_retry_delay,
212+
self._comp_retry_backoff,
213+
on_after_compensate_step=run.commit,
214+
)
215+
return (
216+
state_manager,
217+
recovery_manager,
218+
step_executor,
219+
fallback_executor,
220+
compensator,
221+
)
222+
175223
async def _execute(
176224
self,
177225
run: SagaStorageRun | None,
@@ -189,32 +237,13 @@ async def _execute(
189237
RuntimeError: If the saga was recovered in COMPENSATING or FAILED state and compensation was completed, forward execution is not allowed.
190238
"""
191239
if run is not None:
192-
state_manager = SagaStateManager(self._saga_id, run)
193-
recovery_manager = SagaRecoveryManager(
194-
self._saga_id,
195-
run,
196-
self._container,
197-
self._saga.steps,
198-
)
199-
step_executor = SagaStepExecutor(
200-
self._context,
201-
self._container,
202-
state_manager,
203-
)
204-
fallback_executor = FallbackStepExecutor(
205-
self._context,
206-
self._container,
240+
(
207241
state_manager,
208-
)
209-
compensator = SagaCompensator(
210-
self._saga_id,
211-
self._context,
212-
run,
213-
self._compensator._retry_count,
214-
self._compensator._retry_delay,
215-
self._compensator._retry_backoff,
216-
on_after_compensate_step=run.commit,
217-
)
242+
recovery_manager,
243+
step_executor,
244+
fallback_executor,
245+
compensator,
246+
) = self._build_run_scoped_components(run)
218247
else:
219248
state_manager = self._state_manager
220249
recovery_manager = self._recovery_manager

src/cqrs/saga/storage/protocol.py

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,23 @@ async def create_saga(
1919
saga_id: uuid.UUID,
2020
name: str,
2121
context: dict[str, typing.Any],
22-
) -> None: """
22+
) -> None:
23+
"""
2324
Create a new saga execution record with initial PENDING status and version 1.
2425
2526
Parameters:
2627
saga_id (uuid.UUID): Unique identifier for the saga (primary key).
2728
name (str): Human-friendly name used for diagnostics and filtering.
2829
context (dict[str, Any]): JSON-serializable initial saga context to persist.
2930
"""
30-
...
31+
3132
async def update_context(
3233
self,
3334
saga_id: uuid.UUID,
3435
context: dict[str, typing.Any],
3536
current_version: int | None = None,
36-
) -> None: """
37+
) -> None:
38+
"""
3739
Persist a snapshot of the saga's execution context, optionally using optimistic locking.
3840
3941
Parameters:
@@ -45,12 +47,13 @@ async def update_context(
4547
Raises:
4648
SagaConcurrencyError: If `current_version` is provided and does not match the stored version.
4749
"""
48-
...
50+
4951
async def update_status(
5052
self,
5153
saga_id: uuid.UUID,
5254
status: SagaStatus,
53-
) -> None: """
55+
) -> None:
56+
"""
5457
Set the global status for the saga identified by `saga_id`.
5558
5659
Parameters:
@@ -60,15 +63,16 @@ async def update_status(
6063
Notes:
6164
This operation does not commit the storage session; the caller must call `commit()` on the active run or session to persist the change.
6265
"""
63-
...
66+
6467
async def log_step(
6568
self,
6669
saga_id: uuid.UUID,
6770
step_name: str,
6871
action: typing.Literal["act", "compensate"],
6972
status: SagaStepStatus,
7073
details: str | None = None,
71-
) -> None: """
74+
) -> None:
75+
"""
7276
Append a step transition to the saga's execution log.
7377
7478
Parameters:
@@ -78,13 +82,14 @@ async def log_step(
7882
status (SagaStepStatus): The step transition status to record (e.g., started, completed, failed, compensated).
7983
details (str | None): Optional human-readable details or diagnostics about the transition.
8084
"""
81-
...
85+
8286
async def load_saga_state(
8387
self,
8488
saga_id: uuid.UUID,
8589
*,
8690
read_for_update: bool = False,
87-
) -> tuple[SagaStatus, dict[str, typing.Any], int]: """
91+
) -> tuple[SagaStatus, dict[str, typing.Any], int]:
92+
"""
8893
Load the current saga execution state.
8994
9095
Parameters:
@@ -94,11 +99,12 @@ async def load_saga_state(
9499
Returns:
95100
tuple[SagaStatus, dict[str, Any], int]: A tuple containing the saga's global status, the latest persisted context (JSON-serializable), and the current optimistic-locking version number.
96101
"""
97-
...
102+
98103
async def get_step_history(
99104
self,
100105
saga_id: uuid.UUID,
101-
) -> list[SagaLogEntry]: """
106+
) -> list[SagaLogEntry]:
107+
"""
102108
Retrieve the chronological step log for a saga.
103109
104110
Parameters:
@@ -107,19 +113,20 @@ async def get_step_history(
107113
Returns:
108114
list[SagaLogEntry]: Ordered list of step log entries for the saga, from oldest to newest.
109115
"""
110-
...
111-
async def commit(self) -> None: """
112-
Finalize the storage run by persisting and committing all pending changes made during this session.
113-
114-
This method makes the run's checkpointed changes durable; the caller is responsible for invoking commit at logical checkpoints to persist session state.
115-
"""
116-
...
117-
async def rollback(self) -> None: """
118-
Abort the current storage run and revert any uncommitted changes in the session.
119-
120-
This releases the run's transactional state without persisting pending updates so that the storage remains as it was before the run began.
121-
"""
122-
...
116+
117+
async def commit(self) -> None:
118+
"""
119+
Finalize the storage run by persisting and committing all pending changes made during this session.
120+
121+
This method makes the run's checkpointed changes durable; the caller is responsible for invoking commit at logical checkpoints to persist session state.
122+
"""
123+
124+
async def rollback(self) -> None:
125+
"""
126+
Abort the current storage run and revert any uncommitted changes in the session.
127+
128+
This releases the run's transactional state without persisting pending updates so that the storage remains as it was before the run began.
129+
"""
123130

124131

125132
class ISagaStorage(abc.ABC):

0 commit comments

Comments
 (0)