Skip to content

Commit 2738cbc

Browse files
author
Вадим Козыревский
committed
Fix deadlocks
1 parent cc4a048 commit 2738cbc

22 files changed

Lines changed: 1012 additions & 449 deletions

.github/workflows/codspeed.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,19 @@ jobs:
4646
echo "MySQL did not become ready in time"
4747
exit 1
4848
49+
- name: Wait for PostgreSQL
50+
run: |
51+
for i in $(seq 1 30); do
52+
if docker compose -f docker-compose-test.yml exec -T postgres_tests pg_isready -h localhost -U cqrs -q 2>/dev/null; then
53+
echo "PostgreSQL is ready"
54+
exit 0
55+
fi
56+
echo "Waiting for PostgreSQL... ($i/30)"
57+
sleep 2
58+
done
59+
echo "PostgreSQL did not become ready in time"
60+
exit 1
61+
4962
- name: Wait for Redis
5063
run: |
5164
for i in $(seq 1 15); do

.github/workflows/tests.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ jobs:
6969
vermin --target=3.10- --violations --eval-annotations --backport typing_extensions --exclude=venv --exclude=build --exclude=.git --exclude=.venv src examples tests
7070
7171
test:
72+
name: test (py ${{ matrix.python-version }})
7273
runs-on: ubuntu-latest
7374
strategy:
7475
fail-fast: false
@@ -105,6 +106,19 @@ jobs:
105106
echo "MySQL did not become ready in time"
106107
exit 1
107108
109+
- name: Wait for PostgreSQL
110+
run: |
111+
for i in $(seq 1 30); do
112+
if docker compose -f docker-compose-test.yml exec -T postgres_tests pg_isready -h localhost -U cqrs -q 2>/dev/null; then
113+
echo "PostgreSQL is ready"
114+
exit 0
115+
fi
116+
echo "Waiting for PostgreSQL... ($i/30)"
117+
sleep 2
118+
done
119+
echo "PostgreSQL did not become ready in time"
120+
exit 1
121+
108122
- name: Wait for Redis
109123
run: |
110124
for i in $(seq 1 15); do
@@ -119,6 +133,10 @@ jobs:
119133
exit 1
120134
121135
- name: Run all tests with coverage
136+
env:
137+
DATABASE_DSN: mysql+asyncmy://cqrs:cqrs@localhost:3307/test_cqrs
138+
DATABASE_DSN_MYSQL: mysql+asyncmy://cqrs:cqrs@localhost:3307/test_cqrs
139+
DATABASE_DSN_POSTGRESQL: postgresql+asyncpg://cqrs:cqrs@localhost:5433/cqrs
122140
run: |
123141
pytest -c ./tests/pytest-config.ini --cov=src --cov-report=xml --cov-report=term -o cache_dir=/tmp/pytest_cache ./tests/unit ./tests/integration
124142

docker-compose-dev.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@ services:
1515
command: --init-file /data/application/init.sql
1616
volumes:
1717
- ./tests/init_database.sql:/data/application/init.sql
18+
postgres_dev:
19+
image: postgres:latest
20+
hostname: postgres-dev
21+
restart: always
22+
environment:
23+
POSTGRES_USER: cqrs
24+
POSTGRES_PASSWORD: cqrs
25+
POSTGRES_DB: cqrs
26+
ports:
27+
- "5433:5432"
1828
kafka0:
1929
image: confluentinc/cp-kafka:7.2.1
2030
hostname: kafka0

docker-compose-test.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@ services:
1515
command: --init-file /data/application/init.sql
1616
volumes:
1717
- ./tests/init_database.sql:/data/application/init.sql
18+
postgres_tests:
19+
image: postgres:latest
20+
hostname: postgres-test
21+
restart: always
22+
environment:
23+
POSTGRES_USER: cqrs
24+
POSTGRES_PASSWORD: cqrs
25+
POSTGRES_DB: cqrs
26+
ports:
27+
- "5433:5432"
1828
redis_tests:
1929
image: redis:7.2
2030
hostname: redis

examples/saga_recovery_scheduler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ async def recovery_loop(
561561
except asyncio.CancelledError:
562562
logger.info("Recovery loop cancelled.")
563563
raise
564-
except Exception as e:
564+
except Exception:
565565
logger.exception(f"Recovery iteration failed: {traceback.format_exc()}")
566566

567567
if max_iterations is not None and iteration >= max_iterations:
@@ -618,7 +618,7 @@ async def create_interrupted_saga(storage: MemorySagaStorage) -> uuid.UUID:
618618
async def main() -> None:
619619
"""
620620
Run the saga recovery scheduler demo and display its outcome.
621-
621+
622622
Sets up an in-memory saga storage, creates a simulated interrupted saga and marks it stale, runs the recovery loop for three iterations (using the module's recovery_loop and recovery configuration constants), then loads and prints the final saga state.
623623
"""
624624
print("\n" + "=" * 70)
@@ -660,4 +660,4 @@ async def main() -> None:
660660

661661

662662
if __name__ == "__main__":
663-
asyncio.run(main())
663+
asyncio.run(main())

examples/saga_sqlalchemy_storage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,4 +207,4 @@ def saga_mapper(mapper: cqrs.SagaMap) -> None:
207207

208208

209209
if __name__ == "__main__":
210-
asyncio.run(main())
210+
asyncio.run(main())

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ dev = [
5353
"pytest-env==0.6.2",
5454
"cryptography==42.0.2",
5555
"asyncmy==0.2.9",
56+
"asyncpg>=0.29.0",
5657
"redis>=5.0.0",
5758
# Circuit breaker for tests
5859
"aiobreaker>=0.3.0" # from aiobreaker

src/cqrs/saga/compensation.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def __init__(
2727
) -> None:
2828
"""
2929
Create a SagaCompensator configured to perform compensation of completed saga steps with retry and optional post-step callback.
30-
30+
3131
Parameters:
3232
saga_id: Identifier of the saga.
3333
context: Saga execution context passed to step compensation handlers.
@@ -51,12 +51,12 @@ async def compensate_steps(
5151
) -> None:
5252
"""
5353
Compensates completed saga steps in reverse order, applying retry logic and recording step statuses.
54-
54+
5555
Compensates each handler from last to first, skipping steps already recorded as compensated in the saga history. Updates the saga status to COMPENSATING at the start and logs per-step statuses (STARTED, COMPLETED, FAILED) in storage. After a step completes, the optional on_after_compensate_step callback (if provided) is awaited. If any step fails after all retry attempts, the saga is marked as FAILED. If no completed steps are provided, no compensation is attempted and the saga is marked as FAILED.
56-
56+
5757
Parameters:
5858
completed_steps (list[SagaStepHandler[ContextT, typing.Any]]): Handlers corresponding to steps that completed during the saga; these will be compensated in reverse order.
59-
59+
6060
Returns:
6161
None
6262
"""
@@ -184,4 +184,4 @@ async def _compensate_step_with_retry(
184184

185185
# If we get here, all retries failed
186186
if last_exception:
187-
raise last_exception
187+
raise last_exception

src/cqrs/saga/saga.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,10 @@ async def __aiter__(
157157
) -> typing.AsyncIterator[SagaStepResult[ContextT, typing.Any]]:
158158
"""
159159
Execute saga steps sequentially and yield each step result.
160-
160+
161161
Implements the Strict Backward Recovery strategy: if the saga is in COMPENSATING or FAILED status, forward execution is never resumed. When the underlying storage provides create_run(), execution is performed within a per-saga run with checkpoint commits; otherwise the legacy run-less path is used.
162162
Returns:
163-
AsyncIterator[SagaStepResult[ContextT, typing.Any]]: An async iterator that yields the result for each executed saga step in order.
163+
AsyncIterator[SagaStepResult[ContextT, typing.Any]]: An async iterator that yields the result for each executed saga step in order.
164164
"""
165165
try:
166166
run_cm = self._storage.create_run()
@@ -226,13 +226,13 @@ async def _execute(
226226
) -> typing.AsyncIterator[SagaStepResult[ContextT, typing.Any]]:
227227
"""
228228
Execute the saga's configured steps, using the provided storage run for checkpointed operations when available, and perform recovery and compensation as required.
229-
229+
230230
Parameters:
231231
run (SagaStorageRun | None): Optional per-saga storage run. When provided, the run is used for loading saga state, creating run-scoped managers/executors, and committing at checkpoint boundaries. When None, the transaction's internal managers and executors are used.
232-
232+
233233
Returns:
234234
Async iterator that yields SagaStepResult values for each step that completes; each yielded result will include the transaction's saga_id.
235-
235+
236236
Raises:
237237
RuntimeError: If the saga was recovered in COMPENSATING or FAILED state and compensation was completed, forward execution is not allowed.
238238
"""
@@ -258,6 +258,8 @@ async def _execute(
258258
self._saga.__class__.__name__,
259259
self._context,
260260
)
261+
if run is not None:
262+
await run.commit()
261263
await state_manager.update_status(SagaStatus.RUNNING)
262264
if run is not None:
263265
await run.commit()
@@ -308,10 +310,14 @@ async def _execute(
308310

309311
completed_step_names = await recovery_manager.load_completed_step_names()
310312
except ValueError:
313+
if run is not None:
314+
await run.rollback()
311315
await state_manager.create_saga(
312316
self._saga.__class__.__name__,
313317
self._context,
314318
)
319+
if run is not None:
320+
await run.commit()
315321
await state_manager.update_status(SagaStatus.RUNNING)
316322
if run is not None:
317323
await run.commit()
@@ -390,7 +396,7 @@ async def _execute(
390396
async def _compensate(self) -> None:
391397
"""
392398
Mark the transaction as compensated and run compensation for all completed steps in reverse order.
393-
399+
394400
Sets an internal flag to prevent repeated compensation and delegates to the compensator which applies the configured retry behavior.
395401
"""
396402
# Prevent double compensation
@@ -519,4 +525,4 @@ def transaction(
519525
compensation_retry_count=compensation_retry_count,
520526
compensation_retry_delay=compensation_retry_delay,
521527
compensation_retry_backoff=compensation_retry_backoff,
522-
)
528+
)

src/cqrs/saga/storage/protocol.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async def create_saga(
2222
) -> None:
2323
"""
2424
Create a new saga execution record with initial PENDING status and version 1.
25-
25+
2626
Parameters:
2727
saga_id (uuid.UUID): Unique identifier for the saga (primary key).
2828
name (str): Human-friendly name used for diagnostics and filtering.
@@ -37,13 +37,13 @@ async def update_context(
3737
) -> None:
3838
"""
3939
Persist a snapshot of the saga's execution context, optionally using optimistic locking.
40-
40+
4141
Parameters:
4242
saga_id (uuid.UUID): Identifier of the saga to update.
4343
context (dict[str, Any]): JSON-serializable context object to store as the new snapshot.
4444
current_version (int | None): If provided, perform an optimistic-locking update that succeeds only
4545
if the stored version matches this value; on success the stored version is incremented.
46-
46+
4747
Raises:
4848
SagaConcurrencyError: If `current_version` is provided and does not match the stored version.
4949
"""
@@ -55,11 +55,11 @@ async def update_status(
5555
) -> None:
5656
"""
5757
Set the global status for the saga identified by `saga_id`.
58-
58+
5959
Parameters:
6060
saga_id (uuid.UUID): Identifier of the saga to update.
6161
status (SagaStatus): New global status to persist (for example RUNNING, COMPLETED, COMPENSATING).
62-
62+
6363
Notes:
6464
This operation does not commit the storage session; the caller must call `commit()` on the active run or session to persist the change.
6565
"""
@@ -74,7 +74,7 @@ async def log_step(
7474
) -> None:
7575
"""
7676
Append a step transition to the saga's execution log.
77-
77+
7878
Parameters:
7979
saga_id (uuid.UUID): Identifier of the saga whose log will be appended.
8080
step_name (str): Logical name of the step (used for diagnostics and replay).
@@ -91,28 +91,30 @@ async def load_saga_state(
9191
) -> tuple[SagaStatus, dict[str, typing.Any], int]:
9292
"""
9393
Load the current saga execution state.
94-
94+
9595
Parameters:
9696
saga_id (uuid.UUID): Identifier of the saga to load.
9797
read_for_update (bool): If True, acquire a database lock for update to prevent concurrent modifications.
98-
98+
9999
Returns:
100100
tuple[SagaStatus, dict[str, Any], int]: A tuple containing the saga's global status, the latest persisted context (JSON-serializable), and the current optimistic-locking version number.
101101
"""
102+
...
102103

103104
async def get_step_history(
104105
self,
105106
saga_id: uuid.UUID,
106107
) -> list[SagaLogEntry]:
107108
"""
108109
Retrieve the chronological step log for a saga.
109-
110+
110111
Parameters:
111112
saga_id (uuid.UUID): Identifier of the saga whose step history to retrieve.
112-
113+
113114
Returns:
114115
list[SagaLogEntry]: Ordered list of step log entries for the saga, from oldest to newest.
115116
"""
117+
...
116118

117119
async def commit(self) -> None:
118120
"""
@@ -343,13 +345,13 @@ def create_run(
343345
) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]:
344346
"""
345347
Create a scoped async run context for a single saga execution session with checkpointed commits.
346-
348+
347349
The context manager yields a SagaStorageRun that provides the same mutation/read methods as the storage but does not commit automatically; the caller must call commit() or rollback() at desired checkpoints.
348-
350+
349351
Returns:
350352
contextlib.AbstractAsyncContextManager[SagaStorageRun]: Async context manager yielding a SagaStorageRun session.
351-
353+
352354
Raises:
353355
NotImplementedError: If the storage backend does not support scoped runs.
354356
"""
355-
raise NotImplementedError("This storage does not support create_run()")
357+
raise NotImplementedError("This storage does not support create_run()")

0 commit comments

Comments
 (0)