Skip to content

Commit ce4dbdf

Browse files
vadikko2Вадим Козыревский
andauthored
[Feature] Add saga id for step result (#59)
* Add saga id for step result * Add saga id for step result * increase version --------- Co-authored-by: Вадим Козыревский <v.kozyrevskiy@timeweb.ru>
1 parent 976b3ba commit ce4dbdf

4 files changed

Lines changed: 24 additions & 28 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
3131
name = "python-cqrs"
3232
readme = "README.md"
3333
requires-python = ">=3.10"
34-
version = "4.8.0"
34+
version = "4.8.1"
3535

3636
[project.optional-dependencies]
3737
aiobreaker = ["aiobreaker>=0.3.0"]

src/cqrs/saga/saga.py

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import dataclasses
12
import logging
23
import types
34
import typing
@@ -102,9 +103,7 @@ def __init__(
102103
container,
103104
self._state_manager,
104105
)
105-
self._fallback_executor: FallbackStepExecutor[ContextT] = FallbackStepExecutor[
106-
ContextT
107-
](
106+
self._fallback_executor: FallbackStepExecutor[ContextT] = FallbackStepExecutor[ContextT](
108107
context,
109108
container,
110109
self._state_manager,
@@ -145,11 +144,7 @@ async def __aexit__(
145144
# If an exception occurred, compensate all completed steps.
146145
# Do not compensate on GeneratorExit: consumer stopped iteration intentionally
147146
# (e.g. to resume later), which is not a failure.
148-
if (
149-
exc_val is not None
150-
and exc_type is not GeneratorExit
151-
and not self._compensated
152-
):
147+
if exc_val is not None and exc_type is not GeneratorExit and not self._compensated:
153148
self._error = exc_val
154149
await self._compensate()
155150
return False # Don't suppress the exception
@@ -213,25 +208,19 @@ async def __aiter__(
213208
# POINT OF NO RETURN: Strict Backward Recovery Strategy
214209
if status in (SagaStatus.COMPENSATING, SagaStatus.FAILED):
215210
logger.warning(
216-
f"Saga {self._saga_id} is in {status} state. "
217-
"Resuming compensation immediately.",
211+
f"Saga {self._saga_id} is in {status} state. " "Resuming compensation immediately.",
218212
)
219213

220214
# Restore completed steps from history for compensation
221-
completed_act_steps = (
222-
await self._recovery_manager.load_completed_step_names()
223-
)
224-
reconstructed_steps = (
225-
await self._recovery_manager.reconstruct_completed_steps(
226-
completed_act_steps,
227-
)
215+
completed_act_steps = await self._recovery_manager.load_completed_step_names()
216+
reconstructed_steps = await self._recovery_manager.reconstruct_completed_steps(
217+
completed_act_steps,
228218
)
229219
# Type cast is safe here because steps are reconstructed from the same saga
230220
# that uses ContextT, so they have the correct context type
231221
# We need to rebuild the list to satisfy type checker's invariance requirements
232222
self._completed_steps = [
233-
typing.cast(SagaStepHandler[ContextT, typing.Any], step)
234-
for step in reconstructed_steps
223+
typing.cast(SagaStepHandler[ContextT, typing.Any], step) for step in reconstructed_steps
235224
]
236225

237226
if not self._completed_steps:
@@ -252,9 +241,7 @@ async def __aiter__(
252241
)
253242

254243
# For RUNNING/PENDING status, load history to skip completed steps
255-
completed_step_names = (
256-
await self._recovery_manager.load_completed_step_names()
257-
)
244+
completed_step_names = await self._recovery_manager.load_completed_step_names()
258245
except ValueError:
259246
# If loading fails but ID was provided, create it
260247
await self._state_manager.create_saga(
@@ -278,7 +265,10 @@ async def __aiter__(
278265
if step_result is not None and executed_step is not None:
279266
# Track completed step for compensation
280267
self._completed_steps.append(executed_step)
281-
yield step_result
268+
yield dataclasses.replace(
269+
step_result,
270+
saga_id=self._saga_id,
271+
)
282272
elif executed_step is None:
283273
# Step was skipped (already completed), restore it for compensation
284274
primary_name = step_item.step.__name__
@@ -313,7 +303,10 @@ async def __aiter__(
313303
step = await self._container.resolve(step_type)
314304
self._completed_steps.append(step)
315305

316-
yield step_result
306+
yield dataclasses.replace(
307+
step_result,
308+
saga_id=self._saga_id,
309+
)
317310

318311
# Update context one final time before marking as completed
319312
await self._state_manager.update_context(self._context)

src/cqrs/saga/step.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import abc
44
import dataclasses
55
import typing
6+
import uuid
67

78
from cqrs.events.event import IEvent
89
from cqrs.response import IResponse
@@ -17,8 +18,6 @@ class SagaStepResult(typing.Generic[ContextT, Resp]):
1718
Result of a saga step execution.
1819
1920
Contains the response from the step's act method and metadata about the step.
20-
The step_type field uses typing.Any for compatibility,
21-
but the actual runtime type is Type[SagaStepHandler[ContextT, Resp]].
2221
2322
This is an internal data structure used by the saga pattern implementation.
2423
@@ -29,6 +28,8 @@ class SagaStepResult(typing.Generic[ContextT, Resp]):
2928
error_message: Error message if with_error is True
3029
error_traceback: Error traceback lines if with_error is True
3130
error_type: Type of exception if with_error is True
31+
saga_id: ID of the saga this step belongs to (set by execution layer).
32+
Enables client code to trigger compensation immediately if the saga fails.
3233
3334
Example::
3435
@@ -40,11 +41,12 @@ class SagaStepResult(typing.Generic[ContextT, Resp]):
4041
"""
4142

4243
response: Resp
43-
step_type: typing.Any # type: ignore[assignment] # Actual type: Type[SagaStepHandler[ContextT, Resp]]
44+
step_type: type[SagaStepHandler[ContextT, Resp]]
4445
with_error: bool = False
4546
error_message: str | None = None
4647
error_traceback: list[str] | None = None
4748
error_type: typing.Type[Exception] | None = None
49+
saga_id: uuid.UUID | None = None
4850

4951

5052
class SagaStepHandler(abc.ABC, typing.Generic[ContextT, Resp]):

tests/unit/test_saga/test_saga_basic.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,4 @@ async def test_saga_step_result_contains_correct_metadata(
313313
assert step_result.error_message is None
314314
assert step_result.error_traceback is None
315315
assert step_result.error_type is None
316+
assert step_result.saga_id is not None

0 commit comments

Comments
 (0)