Skip to content

Commit 6a04f32

Browse files
author
Вадим Козыревский
committed
Fix fetching saga recovery candidates
1 parent a1427f3 commit 6a04f32

8 files changed

Lines changed: 30 additions & 12 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
3131
name = "python-cqrs"
3232
readme = "README.md"
3333
requires-python = ">=3.10"
34-
version = "4.7.0"
34+
version = "4.7.1"
3535

3636
[project.optional-dependencies]
3737
aiobreaker = ["aiobreaker>=0.3.0"]

src/cqrs/saga/compensation.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ async def compensate_steps(
5454
"""
5555
await self._storage.update_status(self._saga_id, SagaStatus.COMPENSATING)
5656

57+
if not completed_steps:
58+
logger.info(
59+
f"Saga {self._saga_id}: completed_steps is empty, "
60+
"skipping compensation (no step.compensate() will be called).",
61+
)
62+
await self._storage.update_status(self._saga_id, SagaStatus.FAILED)
63+
return
64+
5765
# Load history to skip already compensated steps
5866
history = await self._storage.get_step_history(self._saga_id)
5967
compensated_steps = {

src/cqrs/saga/saga.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,14 @@ async def __aiter__(
234234
for step in reconstructed_steps
235235
]
236236

237+
if not self._completed_steps:
238+
logger.warning(
239+
f"Saga {self._saga_id}: no completed steps to compensate "
240+
"(saga failed before any step finished 'act', or step names in "
241+
"storage do not match saga step class names). "
242+
"Marking as FAILED without calling compensate().",
243+
)
244+
237245
# Immediately proceed to compensation - no forward execution
238246
await self._compensate()
239247

src/cqrs/saga/storage/memory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ async def get_sagas_for_recovery(
123123
max_recovery_attempts: int = 5,
124124
stale_after_seconds: int | None = None,
125125
) -> list[uuid.UUID]:
126-
recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING, SagaStatus.FAILED)
126+
recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING)
127127
now = datetime.datetime.now(datetime.timezone.utc)
128128
threshold = (
129129
(now - datetime.timedelta(seconds=stale_after_seconds))

src/cqrs/saga/storage/protocol.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,10 @@ async def get_sagas_for_recovery(
9090
updated). None means no staleness filter (backward compatible).
9191
9292
Returns:
93-
List of saga IDs (RUNNING, COMPENSATING, or FAILED), ordered by
94-
updated_at ascending, with recovery_attempts < max_recovery_attempts,
95-
and optionally updated_at older than the staleness threshold.
93+
List of saga IDs (RUNNING or COMPENSATING only; FAILED sagas are
94+
not included), ordered by updated_at ascending, with
95+
recovery_attempts < max_recovery_attempts, and optionally
96+
updated_at older than the staleness threshold.
9697
"""
9798

9899
@abc.abstractmethod

src/cqrs/saga/storage/sqlalchemy.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,6 @@ async def get_sagas_for_recovery(
321321
recoverable = (
322322
SagaStatus.RUNNING,
323323
SagaStatus.COMPENSATING,
324-
SagaStatus.FAILED,
325324
)
326325
async with self.session_factory() as session:
327326
stmt = (

tests/integration/test_saga_storage_memory.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas(
172172
storage: MemorySagaStorage,
173173
test_context: dict[str, str],
174174
) -> None:
175-
"""Positive: returns RUNNING, COMPENSATING, FAILED sagas only."""
175+
"""Positive: returns RUNNING and COMPENSATING sagas only; FAILED excluded."""
176176
id1, id2, id3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
177177
for sid in (id1, id2, id3):
178178
await storage.create_saga(saga_id=sid, name="saga", context=test_context)
@@ -181,8 +181,9 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas(
181181
await storage.update_status(id3, SagaStatus.FAILED)
182182

183183
ids = await storage.get_sagas_for_recovery(limit=10)
184-
assert set(ids) == {id1, id2, id3}
185-
assert len(ids) == 3
184+
assert set(ids) == {id1, id2}
185+
assert id3 not in ids
186+
assert len(ids) == 2
186187

187188
async def test_get_sagas_for_recovery_respects_limit(
188189
self,

tests/integration/test_saga_storage_sqlalchemy.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas(
297297
storage: SqlAlchemySagaStorage,
298298
test_context: dict[str, str],
299299
) -> None:
300-
"""Positive: returns RUNNING, COMPENSATING, FAILED sagas only."""
300+
"""Positive: returns RUNNING and COMPENSATING sagas only; FAILED excluded."""
301301
id1, id2, id3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
302302
for sid in (id1, id2, id3):
303303
await storage.create_saga(saga_id=sid, name="saga", context=test_context)
@@ -306,8 +306,9 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas(
306306
await storage.update_status(id3, SagaStatus.FAILED)
307307

308308
ids = await storage.get_sagas_for_recovery(limit=10)
309-
assert set(ids) == {id1, id2, id3}
310-
assert len(ids) == 3
309+
assert set(ids) == {id1, id2}
310+
assert id3 not in ids
311+
assert len(ids) == 2
311312

312313
async def test_get_sagas_for_recovery_respects_limit(
313314
self,

0 commit comments

Comments
 (0)