Conversation
Warning: Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 10 minutes and 22 seconds. ⌛
How to resolve this issue? After the wait time has elapsed, a review can be triggered again. We recommend that you space out your commits to avoid hitting the rate limit.
🚦 How do rate limits work? CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (3)
📝 Walkthrough
Lowered minimum Python support to 3.9, expanded the CI test matrix to cover 3.9–3.14 (+3.15-dev), and standardized type hints by replacing PEP 604 unions with `typing.Union`/`typing.Optional` forms.
Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Merging this PR will not alter performance
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/cqrs/adapters/kafka.py (1)
48-53: ⚠️ Potential issue | 🟠 Major
Guard invalid retry settings to prevent silent message drops.
If `retry_count` is `0` (or negative), Line 70 performs no attempts and `produce()` returns without sending. Also, a negative `retry_delay` fails only during the retry sleep. Validate both in `__init__`.
💡 Proposed fix
 def __init__(
     self,
     producer: aiokafka.AIOKafkaProducer,
     retry_count: int = 3,
     retry_delay: int = 1,
 ):
     self._producer = producer
+    if retry_count < 1:
+        raise ValueError("retry_count must be >= 1")
+    if retry_delay < 0:
+        raise ValueError("retry_delay must be >= 0")
     self._retry_count = retry_count
     self._retry_delay = retry_delay
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/adapters/kafka.py` around lines 48 - 53, The constructor for the Kafka adapter accepts retry_count and retry_delay but does not validate them, allowing retry_count <= 0 which causes produce() to skip attempts and retry_delay < 0 which breaks during sleep; in __init__ of the class that sets self._producer, self._retry_count, self._retry_delay validate inputs and raise ValueError for invalid values (e.g., require retry_count >= 1 and retry_delay >= 0), and update any constructor docstring/comments to reflect these constraints so produce() never silently drops messages due to bad settings.
src/cqrs/requests/mermaid.py (1)
196-207: ⚠️ Potential issue | 🟡 Minor
Remove the stray space in the rendered type name.
These literals now emit `typing.Optional[Response ]` verbatim, so every generated class diagram shows a malformed type. Drop the space before `]` to avoid baking a typo into the docs output.
Small fix
- lines.append(" +handle(request) typing.Optional[Response ]")
- lines.append(" +next(request) typing.Optional[Response ]")
+ lines.append(" +handle(request) typing.Optional[Response]")
+ lines.append(" +next(request) typing.Optional[Response]")
@@
- lines.append(" +handle(request) typing.Optional[Response ]")
- lines.append(" +next(request) typing.Optional[Response ]")
+ lines.append(" +handle(request) typing.Optional[Response]")
+ lines.append(" +next(request) typing.Optional[Response]")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/requests/mermaid.py` around lines 196 - 207, The generated Mermaid output contains a stray space before the closing bracket in the optional response type; update the string literals used to render method signatures so they use "typing.Optional[Response]" (no space) instead of "typing.Optional[Response ]". Specifically, change the lines.append calls that add method signatures for the CORRequestHandler class and the per-handler classes (the calls that currently append "+handle(request) typing.Optional[Response ]" and "+next(request) typing.Optional[Response ]") to remove the extra space; also check any other similar literals built from handler_info/handler_name and fix them consistently.
src/cqrs/dispatcher/request.py (1)
83-123: ⚠️ Potential issue | 🟠 Major
Reject streaming fallbacks before this cast.
`RequestHandlerFallback` still admits `StreamingRequestHandler`, but this path casts both `.handle` methods to `HandleType` (a coroutine callable) without validation and later awaits them. Streaming handlers return async generators, not coroutines, so this will fail at runtime. Please add type guards before the casts to reject streaming handlers in this dispatcher path:
Suggested fix
-from cqrs.requests.request_handler import RequestHandler
+from cqrs.requests.request_handler import RequestHandler, StreamingRequestHandler
@@
 primary = await self._container.resolve(fallback_config.primary)
+if isinstance(primary, StreamingRequestHandler):
+    raise RequestHandlerTypeError(
+        "RequestDispatcher does not support StreamingRequestHandler fallbacks",
+    )
 try:
     wrapped_primary = self._middleware_chain.wrap(
         typing.cast(HandleType, primary.handle),
     )
@@
 fallback_handler = await self._container.resolve(fallback_config.fallback)
+if isinstance(fallback_handler, StreamingRequestHandler):
+    raise RequestHandlerTypeError(
+        "RequestDispatcher does not support StreamingRequestHandler fallbacks",
+    )
 wrapped_fallback = self._middleware_chain.wrap(
     typing.cast(HandleType, fallback_handler.handle),
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/dispatcher/request.py` around lines 83 - 123, The dispatcher currently casts primary and fallback handlers' .handle to HandleType and awaits them without checking for streaming handlers; add type guards to reject instances of StreamingRequestHandler before casting/ wrapping. Specifically, in the request dispatch path where wrapped_primary is created (using self._middleware_chain.wrap and typing.cast(HandleType, primary.handle)) validate that primary is not a StreamingRequestHandler and raise a clear error if it is; likewise, after resolving fallback_handler via self._container.resolve(fallback_config.fallback) validate fallback_handler is not a StreamingRequestHandler before casting to HandleType and wrapping into wrapped_fallback. Ensure the error message references RequestHandlerFallback/StreamingRequestHandler and prevents async-generator handlers from being awaited.
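The failure mode described above can be reproduced without the repository's classes: an `async def` containing `yield` is an async-generator function, and awaiting its result raises `TypeError` at runtime. A minimal sketch of this kind of guard, with hypothetical handler names (not the project's actual API):

```python
import asyncio
import inspect

class PlainHandler:
    # Returns a coroutine when called; safe to await.
    async def handle(self, request):
        return f"handled {request}"

class StreamingHandler:
    # Async generator: calling it returns an async iterator, NOT an awaitable.
    async def handle(self, request):
        yield f"chunk of {request}"

def guarded_handle(handler):
    # inspect.isasyncgenfunction sees through bound methods, so this rejects
    # streaming handlers before .handle is treated as a coroutine callable.
    if inspect.isasyncgenfunction(handler.handle):
        raise TypeError(
            f"{type(handler).__name__} is a streaming handler and cannot be "
            "awaited in this dispatch path"
        )
    return handler.handle

async def main():
    result = await guarded_handle(PlainHandler())("login")
    try:
        guarded_handle(StreamingHandler())
        rejected = ""
    except TypeError as exc:
        rejected = str(exc)
    return result, rejected

result, rejected = asyncio.run(main())
print(result)
print(rejected)
```

An `isinstance` check as in the diff above is equally valid when the streaming base class is importable; the `inspect`-based guard is just the structural alternative.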
🧹 Nitpick comments (5)
.pre-commit-config.yaml (1)
56-56: Avoid hard-coding `./venv/bin/python` in hook entry commands.
Lines 56 and 63 use a hard-coded virtualenv path that breaks across different developer setups (standard Python, Poetry, uv, etc.) and CI environments. For `language: system` hooks, use `python -m pytest` to resolve from the active environment's PATH.
Proposed diff
- entry: env PYTHONPATH=src ./venv/bin/python -m pytest -c ./tests/pytest-config.ini ./tests/unit
+ entry: env PYTHONPATH=src python -m pytest -c ./tests/pytest-config.ini ./tests/unit
Also applies to line 63.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.pre-commit-config.yaml at line 56, Replace hard-coded virtualenv paths in the pre-commit hook entry commands with invocation that resolves to the active environment; locate the pytest hook entries where the entry string sets PYTHONPATH and calls a specific venv binary (the entry property in .pre-commit-config.yaml used for running pytest) and change them to use the system-resolved Python (e.g., use "python -m pytest" while preserving PYTHONPATH and pytest flags) so the hook works across different developer environments and CI.src/cqrs/adapters/kafka.py (1)
70-77: Add focused tests for retry loop behavior.
This retry path is not effectively covered by current tests because the Kafka producer is mocked at a higher level (tests/conftest.py, Line 24 and tests/integration/test_kafka_producer.py, Line 70). Please add unit tests that assert retry count, backoff, and final re-raise on last attempt.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/adapters/kafka.py` around lines 70 - 77, Add focused unit tests for the retry loop around await self._produce by patching/monkeypatching the adapter's _produce to raise a _RETRYABLE_KAFKA_EXCEPTIONS subclass for the first N-1 calls and then succeed (to assert it retried the expected number of times and called asyncio.sleep with self._retry_delay each retry), and a separate test where _produce raises on every attempt to assert the final exception is re-raised after self._retry_count attempts; spy on asyncio.sleep to verify backoff calls and inspect call count to verify attempt counting, referencing the adapter's _produce method and the attributes _retry_count and _retry_delay.src/cqrs/deserializers/exceptions.py (1)
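To make the requested tests concrete without depending on the real adapter, here is a self-contained model of the retry loop plus the three checks the comment asks for (the `Producer` class, `RetryError`, and the sleep-recording shortcut are assumptions of this sketch, not the adapter's actual code):

```python
import asyncio

class RetryError(Exception):
    """Stand-in for a retryable Kafka error (hypothetical)."""

class Producer:
    """Minimal model of the retry loop discussed above."""

    def __init__(self, produce, retry_count=3, retry_delay=1):
        self._inner_produce = produce
        self._retry_count = retry_count
        self._retry_delay = retry_delay
        self.sleep_calls = []  # recorded instead of actually sleeping

    async def produce(self, message):
        for attempt in range(self._retry_count):
            try:
                return await self._inner_produce(message)
            except RetryError:
                if attempt == self._retry_count - 1:
                    raise  # final attempt: surface the error
                self.sleep_calls.append(self._retry_delay)

def make_flaky(fail_times):
    # Returns a produce() stub that fails the first `fail_times` calls.
    state = {"calls": 0}
    async def produce(message):
        state["calls"] += 1
        if state["calls"] <= fail_times:
            raise RetryError
        return message
    return produce, state

async def run_checks():
    # Succeeds on the 3rd of 3 attempts: two backoff sleeps recorded.
    produce, state = make_flaky(fail_times=2)
    p = Producer(produce, retry_count=3, retry_delay=1)
    sent = await p.produce("msg")
    success_calls, sleeps = state["calls"], list(p.sleep_calls)

    # Fails every attempt: error re-raised after retry_count tries.
    produce, state = make_flaky(fail_times=10)
    p = Producer(produce, retry_count=3, retry_delay=1)
    raised = False
    try:
        await p.produce("msg")
    except RetryError:
        raised = True
    return sent, success_calls, sleeps, raised, state["calls"]

sent, success_calls, sleeps, raised, failure_calls = asyncio.run(run_checks())
print(sent, success_calls, sleeps, raised, failure_calls)
```

In a real pytest suite, the same assertions would run against the adapter with `asyncio.sleep` monkeypatched, as the AI prompt below suggests.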
18-18: Simplify the nested Union type annotation.
The annotation `typing.Union[str, typing.Optional[bytes]]` contains redundant nesting. `typing.Optional[bytes]` expands to `typing.Union[bytes, None]`, making the full type `typing.Union[str, bytes, None]`.
Consider simplifying to one of these equivalent forms:
- `typing.Union[str, bytes, None]` (preferred for clarity)
- `typing.Optional[typing.Union[str, bytes]]`
Additionally, based on the relevant code snippet showing that `message_data` is only assigned when `data` is not `None` (line 96-97 early return), the actual runtime type should be `typing.Union[str, bytes]` (non-nullable). However, this may be a pre-existing type accuracy issue beyond the scope of this PR.
♻️ Proposed simplification
- message_data: typing.Union[str, typing.Optional[bytes]]
+ message_data: typing.Union[str, bytes, None]
Or if you want to reflect the actual runtime behavior (non-nullable):
- message_data: typing.Union[str, typing.Optional[bytes]]
+ message_data: typing.Union[str, bytes]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/deserializers/exceptions.py` at line 18, The type annotation for message_data uses a redundant nested Union: replace typing.Union[str, typing.Optional[bytes]] in the exceptions module with a simplified type such as typing.Union[str, bytes, None] (or typing.Optional[typing.Union[str, bytes]] if you prefer) to remove the unnecessary nesting; note that message_data at runtime may be non-nullable, but updating the annotation to one of the simplified forms on the message_data declaration (in src/cqrs/deserializers/exceptions.py) is sufficient for this PR.src/cqrs/deserializers/json.py (1)
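The equivalence claimed here is easy to check at runtime, since `typing` flattens nested unions when the type is constructed:

```python
import typing

# Optional[bytes] is just Union[bytes, None], and Union flattens nesting,
# so both spellings construct the exact same type object.
nested = typing.Union[str, typing.Optional[bytes]]
flat = typing.Union[str, bytes, None]

print(nested)          # typing.Union[str, bytes, None]
print(nested == flat)  # True
```

This is why the change is purely cosmetic for type checkers; only readability improves.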
83-86: Consider using canonical Union forms for clarity.
The nested `typing.Union[str, typing.Optional[bytes]]` is non-canonical and harder to read. While type checkers normalize these forms, the intent is clearer with flat unions:
✨ Suggested improvement
 def __call__(
     self,
-    data: typing.Union[str, typing.Optional[bytes]],
-) -> typing.Union[typing.Optional[_T], DeserializeJsonError]:
+    data: typing.Optional[typing.Union[str, bytes]],
+) -> typing.Union[_T, DeserializeJsonError, None]:
Note: Both forms are semantically equivalent (`Union[str, Optional[bytes]]` normalizes to `Union[str, bytes, None]`), but the suggested form more clearly expresses the intent: "accepts str, bytes, or None".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cqrs/deserializers/json.py` around lines 83 - 86, The __call__ method's parameter type uses a nested non-canonical union `typing.Union[str, typing.Optional[bytes]]`; change it to a flat, clearer union such as `typing.Union[str, bytes, None]` (or equivalently `typing.Optional[typing.Union[str, bytes]]`) in the __call__ signature in src/cqrs/deserializers/json.py so the parameter explicitly accepts str, bytes, or None and is easier to read for maintainers and type checkers..github/workflows/tests.yml (1)
96-104: Consider narrowing the docker-backed test matrix.
This now boots MySQL, PostgreSQL, and Redis for seven Python versions on every PR. If CI time starts to hurt, a common compromise is unit/lint on all interpreters and the full integration suite on one or two representative versions.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.github/workflows/tests.yml around lines 96 - 104, The test matrix currently boots databases for every Python version; change it to run lightweight jobs (unit/lint) across all python-version entries and only run the docker-backed integration matrix for a small subset (e.g., python-version: "3.11" and "3.15-dev"). Implement this by adding a TEST_SCOPE matrix axis or separate jobs: keep the existing python-version values for the unit/lint job (strategy.matrix.include entries unchanged) and create a second integration job that uses a reduced strategy.matrix.include with only the chosen versions (reference the matrix keys and the docker-backed integration steps that currently rely on MySQL/Postgres/Redis); add a conditional (if: matrix.TEST_SCOPE == 'integration' or job-level if) so the integration job boots containers only for those selected versions.
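One way to express that split, sketched as a generic two-job workflow (job names, the chosen interpreter subset, and the single `postgres` service are illustrative, not the repository's actual file):

```yaml
jobs:
  unit:
    # Fast checks run on every supported interpreter.
    strategy:
      matrix:
        python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14", "3.15-dev"]
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - run: python -m pytest tests/unit

  integration:
    # Docker-backed services boot only for two representative versions.
    strategy:
      matrix:
        python-version: ["3.11", "3.15-dev"]
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_PASSWORD: postgres
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - run: python -m pytest tests/integration
```

Splitting into two jobs (rather than a `TEST_SCOPE` matrix axis) keeps each job's service list self-evident in the workflow file.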
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cqrs/mediator.py`:
- Line 192: The stream return type is narrowed to AsyncIterator[IResponse] but
handlers can yield None; update the signatures of mediator.stream and
mediator._stream_impl to return typing.AsyncIterator[typing.Optional[IResponse]]
(and any related type vars or annotations) so the async iterator reflects
optional responses; ensure any internal uses/annotations (e.g., where
StreamHandler.handle, ResT, and RequestDispatchResult interact) are adjusted to
accept Optional[IResponse] to keep types consistent.
In `@tests/integration/fixtures.py`:
- Around line 41-45: The fixture creates an engine via engine_factory() and only
closes db_session, leaking the engine; update the session fixture to capture the
engine (e.g., engine = engine_factory()), create the async_sessionmaker from
that engine and yield db_session, then in the finally block await
db_session.close() and also await engine.dispose() so the engine and its
connection pool are properly disposed; reference the async_sessionmaker,
engine_factory, and db_session symbols when making the change.
---
Outside diff comments:
In `@src/cqrs/adapters/kafka.py`:
- Around line 48-53: The constructor for the Kafka adapter accepts retry_count
and retry_delay but does not validate them, allowing retry_count <= 0 which
causes produce() to skip attempts and retry_delay < 0 which breaks during sleep;
in __init__ of the class that sets self._producer, self._retry_count,
self._retry_delay validate inputs and raise ValueError for invalid values (e.g.,
require retry_count >= 1 and retry_delay >= 0), and update any constructor
docstring/comments to reflect these constraints so produce() never silently
drops messages due to bad settings.
In `@src/cqrs/dispatcher/request.py`:
- Around line 83-123: The dispatcher currently casts primary and fallback
handlers' .handle to HandleType and awaits them without checking for streaming
handlers; add type guards to reject instances of StreamingRequestHandler before
casting/ wrapping. Specifically, in the request dispatch path where
wrapped_primary is created (using self._middleware_chain.wrap and
typing.cast(HandleType, primary.handle)) validate that primary is not a
StreamingRequestHandler and raise a clear error if it is; likewise, after
resolving fallback_handler via self._container.resolve(fallback_config.fallback)
validate fallback_handler is not a StreamingRequestHandler before casting to
HandleType and wrapping into wrapped_fallback. Ensure the error message
references RequestHandlerFallback/StreamingRequestHandler and prevents
async-generator handlers from being awaited.
In `@src/cqrs/requests/mermaid.py`:
- Around line 196-207: The generated Mermaid output contains a stray space
before the closing bracket in the optional response type; update the string
literals used to render method signatures so they use
"typing.Optional[Response]" (no space) instead of "typing.Optional[Response ]".
Specifically, change the lines.append calls that add method signatures for the
CORRequestHandler class and the per-handler classes (the calls that currently
append "+handle(request) typing.Optional[Response ]" and "+next(request)
typing.Optional[Response ]") to remove the extra space; also check any other
similar literals built from handler_info/handler_name and fix them consistently.
---
Nitpick comments:
In @.github/workflows/tests.yml:
- Around line 96-104: The test matrix currently boots databases for every Python
version; change it to run lightweight jobs (unit/lint) across all python-version
entries and only run the docker-backed integration matrix for a small subset
(e.g., python-version: "3.11" and "3.15-dev"). Implement this by adding a
TEST_SCOPE matrix axis or separate jobs: keep the existing python-version values
for the unit/lint job (strategy.matrix.include entries unchanged) and create a
second integration job that uses a reduced strategy.matrix.include with only the
chosen versions (reference the matrix keys and the docker-backed integration
steps that currently rely on MySQL/Postgres/Redis); add a conditional (if:
matrix.TEST_SCOPE == 'integration' or job-level if) so the integration job boots
containers only for those selected versions.
In @.pre-commit-config.yaml:
- Line 56: Replace hard-coded virtualenv paths in the pre-commit hook entry
commands with invocation that resolves to the active environment; locate the
pytest hook entries where the entry string sets PYTHONPATH and calls a specific
venv binary (the entry property in .pre-commit-config.yaml used for running
pytest) and change them to use the system-resolved Python (e.g., use "python -m
pytest" while preserving PYTHONPATH and pytest flags) so the hook works across
different developer environments and CI.
In `@src/cqrs/adapters/kafka.py`:
- Around line 70-77: Add focused unit tests for the retry loop around await
self._produce by patching/monkeypatching the adapter's _produce to raise a
_RETRYABLE_KAFKA_EXCEPTIONS subclass for the first N-1 calls and then succeed
(to assert it retried the expected number of times and called asyncio.sleep with
self._retry_delay each retry), and a separate test where _produce raises on
every attempt to assert the final exception is re-raised after self._retry_count
attempts; spy on asyncio.sleep to verify backoff calls and inspect call count to
verify attempt counting, referencing the adapter's _produce method and the
attributes _retry_count and _retry_delay.
In `@src/cqrs/deserializers/exceptions.py`:
- Line 18: The type annotation for message_data uses a redundant nested Union:
replace typing.Union[str, typing.Optional[bytes]] in the exceptions module with
a simplified type such as typing.Union[str, bytes, None] (or
typing.Optional[typing.Union[str, bytes]] if you prefer) to remove the
unnecessary nesting; note that message_data at runtime may be non-nullable, but
updating the annotation to one of the simplified forms on the message_data
declaration (in src/cqrs/deserializers/exceptions.py) is sufficient for this PR.
In `@src/cqrs/deserializers/json.py`:
- Around line 83-86: The __call__ method's parameter type uses a nested
non-canonical union `typing.Union[str, typing.Optional[bytes]]`; change it to a
flat, clearer union such as `typing.Union[str, bytes, None]` (or equivalently
`typing.Optional[typing.Union[str, bytes]]`) in the __call__ signature in
src/cqrs/deserializers/json.py so the parameter explicitly accepts str, bytes,
or None and is easier to read for maintainers and type checkers.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 63472567-7cc1-47ea-bcd1-de3086ac140a
📒 Files selected for processing (82)
- .github/workflows/python-publish.yml
- .github/workflows/tests.yml
- .pre-commit-config.yaml
- README.md
- docker-compose-dev.yml
- examples/cor_mermaid.py
- examples/cor_request_fallback.py
- examples/cor_request_handler.py
- examples/kafka_event_consuming.py
- examples/saga.py
- examples/saga_fallback.py
- examples/saga_fastapi_sse.py
- examples/saga_mermaid.py
- examples/saga_recovery.py
- examples/saga_recovery_scheduler.py
- pyproject.toml
- pyrightconfig.json
- ruff.toml
- src/cqrs/adapters/circuit_breaker.py
- src/cqrs/adapters/kafka.py
- src/cqrs/circuit_breaker.py
- src/cqrs/deserializers/exceptions.py
- src/cqrs/deserializers/json.py
- src/cqrs/dispatcher/event.py
- src/cqrs/dispatcher/models.py
- src/cqrs/dispatcher/request.py
- src/cqrs/dispatcher/saga.py
- src/cqrs/dispatcher/streaming.py
- src/cqrs/events/bootstrap.py
- src/cqrs/events/event_emitter.py
- src/cqrs/events/event_processor.py
- src/cqrs/events/fallback.py
- src/cqrs/events/map.py
- src/cqrs/generic_utils.py
- src/cqrs/mediator.py
- src/cqrs/middlewares/base.py
- src/cqrs/middlewares/logging.py
- src/cqrs/outbox/map.py
- src/cqrs/outbox/mock.py
- src/cqrs/outbox/repository.py
- src/cqrs/outbox/sqlalchemy.py
- src/cqrs/producer.py
- src/cqrs/requests/bootstrap.py
- src/cqrs/requests/cor_request_handler.py
- src/cqrs/requests/fallback.py
- src/cqrs/requests/map.py
- src/cqrs/requests/mermaid.py
- src/cqrs/requests/request.py
- src/cqrs/saga/bootstrap.py
- src/cqrs/saga/compensation.py
- src/cqrs/saga/execution.py
- src/cqrs/saga/fallback.py
- src/cqrs/saga/mermaid.py
- src/cqrs/saga/saga.py
- src/cqrs/saga/step.py
- src/cqrs/saga/storage/memory.py
- src/cqrs/saga/storage/models.py
- src/cqrs/saga/storage/protocol.py
- src/cqrs/saga/storage/sqlalchemy.py
- src/cqrs/saga/validation.py
- tests/benchmarks/conftest.py
- tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py
- tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py
- tests/benchmarks/default/test_benchmark_cor_request_handler.py
- tests/benchmarks/default/test_benchmark_event_handler_chain.py
- tests/integration/fixtures.py
- tests/integration/test_decompression.py
- tests/integration/test_event_outbox.py
- tests/integration/test_kafka_producer.py
- tests/integration/test_saga_mediator_memory.py
- tests/integration/test_streaming_mediator.py
- tests/unit/test_cor_mermaid.py
- tests/unit/test_cor_request_handler.py
- tests/unit/test_dispatcher.py
- tests/unit/test_event_processor.py
- tests/unit/test_multi_level_events_parameterized.py
- tests/unit/test_request_response.py
- tests/unit/test_saga/conftest.py
- tests/unit/test_saga/test_fallback.py
- tests/unit/test_saga/test_saga_storage_run.py
- tests/unit/test_saga/test_saga_to_mermaid.py
- tests/unit/test_streaming_outbox_background_processing.py
💤 Files with no reviewable changes (1)
- docker-compose-dev.yml
      self,
      request: IRequest,
- ) -> typing.AsyncIterator[IResponse | None]:
+ ) -> typing.AsyncIterator[IResponse]:
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) ResT allows Optional[IResponse]
rg -n -C2 'ResT = typing.TypeVar\("ResT".*Optional\[IResponse\]' src/cqrs/requests/request.py
# 2) Streaming handlers yield AsyncIterator[ResT]
rg -n -C3 'def handle\(self, request: ReqT\) -> typing.AsyncIterator\[ResT\]' src/cqrs/requests/request_handler.py
# 3) Dispatcher forwards yielded response directly into RequestDispatchResult
rg -n -C3 'yield RequestDispatchResult\(response=response, events=events\)' src/cqrs/dispatcher/streaming.py
# 4) Mediator currently advertises non-optional stream response
rg -n -C3 'def stream\(|def _stream_impl\(|AsyncIterator\[IResponse\]' src/cqrs/mediator.py
Repository: pypatterns/python-cqrs
Length of output: 1914
Return type was narrowed unsafely for streaming responses.
The stream method at line 192 and _stream_impl method at line 210 declare typing.AsyncIterator[IResponse], but the underlying type chain allows optional responses:
- `ResT` is bound to `typing.Optional[IResponse]` (request.py:57)
- `StreamHandler.handle()` yields `typing.AsyncIterator[ResT]` (request_handler.py:92)
- `Dispatcher` passes responses directly through to `RequestDispatchResult` (streaming.py:64)
This creates a false non-null contract—callers expect non-null responses but handlers can yield None.
💡 Proposed fix
Change both occurrences to:
- ) -> typing.AsyncIterator[IResponse]:
+ ) -> typing.AsyncIterator[typing.Optional[IResponse]]:
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- ) -> typing.AsyncIterator[IResponse]:
+ ) -> typing.AsyncIterator[typing.Optional[IResponse]]:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/cqrs/mediator.py` at line 192, The stream return type is narrowed to
AsyncIterator[IResponse] but handlers can yield None; update the signatures of
mediator.stream and mediator._stream_impl to return
typing.AsyncIterator[typing.Optional[IResponse]] (and any related type vars or
annotations) so the async iterator reflects optional responses; ensure any
internal uses/annotations (e.g., where StreamHandler.handle, ResT, and
RequestDispatchResult interact) are adjusted to accept Optional[IResponse] to
keep types consistent.
db_session = async_sessionmaker(engine_factory())()
try:
    yield db_session
finally:
    await db_session.close()
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n -C3 'async def session|db_session = async_sessionmaker|await db_session.close|engine.dispose' tests/integration/fixtures.py
Repository: pypatterns/python-cqrs
Length of output: 982
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if session fixture is used elsewhere and if there are other session-related fixtures
echo "=== Usage of 'session' fixture in tests ==="
rg -n "session" tests/integration/fixtures.py | head -20
echo "=== Check for other sessionmaker patterns ==="
rg -n "sessionmaker|Session" tests/integration/ | grep -v "db_session\|session()" | head -15
Repository: pypatterns/python-cqrs
Length of output: 2655
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check tests that use the 'session' fixture directly (not the saga_session_factory ones)
echo "=== Tests using 'session' fixture ==="
rg -l "def.*session" tests/integration/test_*.py | while read file; do
if grep -q "def.*session[^_]" "$file"; then
echo "File: $file"
grep -n "session:" "$file" | head -5
fi
done
# Also check fixture dependencies
echo ""
echo "=== Fixture definitions for session ==="
rg -B2 -A8 "^async def session\(" tests/integration/fixtures.py
Repository: pypatterns/python-cqrs
Length of output: 362
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if 'session' fixture is actually imported/used in any test file
echo "=== Grep for 'session' parameter in test functions ==="
rg -n "def test.*session[,)]" tests/integration/
echo ""
echo "=== Check all fixtures in fixtures.py with scope and names ==="
rg -B1 "@pytest.fixture" tests/integration/fixtures.py | grep -E "fixture|async def|def"
Repository: pypatterns/python-cqrs
Length of output: 1639
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify the exact structure of the session fixture and surrounding code
echo "=== Full session fixture context ==="
sed -n '32,50p' tests/integration/fixtures.py
echo ""
echo "=== Verify engine_factory is a partial function ==="
sed -n '36,40p' tests/integration/fixtures.py
Repository: pypatterns/python-cqrs
Length of output: 726
Dispose the async engine in fixture teardown to avoid connection leakage.
The session fixture creates an engine instance on line 41, but teardown (line 45) only closes db_session. With scope="function", each test spawns a new engine that remains undisposed. Over many test runs, this leaves connection pool resources open longer than intended. Other fixtures in the same file properly call await engine.dispose() for comparison.
Proposed diff
- db_session = async_sessionmaker(engine_factory())()
+ engine = engine_factory()
+ db_session = async_sessionmaker(engine)()
try:
yield db_session
finally:
- await db_session.close()
+ try:
+ await db_session.close()
+ finally:
+ await engine.dispose()
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- db_session = async_sessionmaker(engine_factory())()
+ engine = engine_factory()
+ db_session = async_sessionmaker(engine)()
  try:
      yield db_session
  finally:
-     await db_session.close()
+     try:
+         await db_session.close()
+     finally:
+         await engine.dispose()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/integration/fixtures.py` around lines 41 - 45, The fixture creates an
engine via engine_factory() and only closes db_session, leaking the engine;
update the session fixture to capture the engine (e.g., engine =
engine_factory()), create the async_sessionmaker from that engine and yield
db_session, then in the finally block await db_session.close() and also await
engine.dispose() so the engine and its connection pool are properly disposed;
reference the async_sessionmaker, engine_factory, and db_session symbols when
making the change.
  self.events: typing.List[IEvent] = []

- async def handle(self, request: LoginCommand) -> None | None:
+ async def handle(self, request: LoginCommand) -> typing.Optional[None ]:
LLM hallucinations, I think :)
It's called not trusting yourself (which, on the whole, fits the definition of schizophrenia))) )
Is there any point in typing.Optional at all, if the only type is None?
Approve
2 similar comments
Approve
Approve

Summary by CodeRabbit
New Features
Bug Fixes
Chores