diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml index e34785b..18116e0 100644 --- a/.github/workflows/python-publish.yml +++ b/.github/workflows/python-publish.yml @@ -13,12 +13,12 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ "3.10", "3.11", "3.12" ] + python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13", "3.14" ] steps: - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies @@ -28,6 +28,6 @@ jobs: - name: Build package run: python -m build - name: Publish package - if: success() && github.event_name == 'release' && matrix.python-version == '3.12' + if: success() && github.event_name == 'release' && matrix.python-version == '3.14' run: | twine upload dist/* --username __token__ --password ${{ secrets.PYPI_API_TOKEN }} diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3f69bfc..0d6a3e5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -12,7 +12,19 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.10", "3.11", "3.12"] + include: + - python-version: "3.9" + pyright-version: "3.9" + - python-version: "3.10" + pyright-version: "3.10" + - python-version: "3.11" + pyright-version: "3.11" + - python-version: "3.12" + pyright-version: "3.12" + - python-version: "3.13" + pyright-version: "3.13" + - python-version: "3.14" + pyright-version: "3.14" steps: - uses: actions/checkout@v4 @@ -62,11 +74,11 @@ jobs: - name: Run pyright run: | - pyright --pythonversion ${{ matrix.python-version }} src tests examples + pyright --pythonversion ${{ matrix.pyright-version }} src - name: Check minimum Python version (vermin) run: | - vermin --target=3.10- --violations --eval-annotations --backport typing_extensions --exclude=venv --exclude=build --exclude=.git --exclude=.venv src examples tests + vermin --target=3.9- --violations --eval-annotations --backport typing_extensions --exclude=venv --exclude=build --exclude=.git --exclude=.venv src examples tests test: name: test (py ${{ matrix.python-version }}) @@ -74,7 +86,13 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.10", "3.11", "3.12"] + include: + - python-version: "3.9" + - python-version: "3.10" + - python-version: "3.11" + - python-version: "3.12" + - python-version: "3.13" + - python-version: "3.14" steps: - uses: actions/checkout@v4 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 67e2f15..4039593 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -53,14 +53,14 @@ repos: hooks: - id: pytest-unit name: unit tests - entry: pytest -c ./tests/pytest-config.ini ./tests/unit + entry: env PYTHONPATH=src ./venv/bin/python -m pytest -c ./tests/pytest-config.ini ./tests/unit language: system types: [python] pass_filenames: false always_run: true - id: pytest-integration name: integration tests - entry: pytest -c ./tests/pytest-config.ini ./tests/integration + entry: env PYTHONPATH=src ./venv/bin/python -m pytest -c ./tests/pytest-config.ini ./tests/integration language: system types: [python] pass_filenames: false diff --git a/README.md b/README.md index 0e979d0..56a99c3 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@

Python CQRS

Event-Driven Architecture Framework for Distributed Systems

- Python 3.10+ · Full documentation: mkdocs.python-cqrs.dev + Python 3.9+ · Full documentation: mkdocs.python-cqrs.dev

@@ -88,7 +88,7 @@ project ([documentation](https://akhundmurad.github.io/diator/)) with several en ## Installation -**Python 3.10+** is required. +**Python 3.9+** is required. CI runs on Python **3.9-3.14**. ```bash pip install python-cqrs diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index fdb209c..7b48fc5 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -1,4 +1,3 @@ -version: '3' services: mysql_dev: image: mysql:8.3.0 diff --git a/examples/cor_mermaid.py b/examples/cor_mermaid.py index 5aeb408..2320a08 100644 --- a/examples/cor_mermaid.py +++ b/examples/cor_mermaid.py @@ -81,7 +81,7 @@ class PaymentResult(cqrs.Response): """Payment processing result.""" success: bool - transaction_id: str | None = None + transaction_id: typing.Optional[str] = None message: str = "" @@ -109,7 +109,7 @@ def __init__(self) -> None: def events(self) -> typing.List[Event]: return self._events.copy() - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: """Process credit card payment.""" if request.payment_method == "credit_card": transaction_id = f"cc_{request.user_id}_{int(request.amount * 100)}" @@ -143,7 +143,7 @@ def __init__(self) -> None: def events(self) -> typing.List[Event]: return self._events.copy() - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: """Process PayPal payment.""" if request.payment_method == "paypal": transaction_id = f"pp_{request.user_id}_{int(request.amount * 100)}" @@ -177,7 +177,7 @@ def __init__(self) -> None: def events(self) -> typing.List[Event]: return self._events.copy() - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: """Process bank transfer payment.""" if request.payment_method == "bank_transfer": transaction_id = f"bt_{request.user_id}_{int(request.amount * 100)}" @@ -207,7 +207,7 @@ class DefaultPaymentHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResu def events(self) -> typing.List[Event]: return [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: """Handle unsupported payment methods.""" return PaymentResult( success=False, diff --git a/examples/cor_request_fallback.py b/examples/cor_request_fallback.py index 0a193cc..3fa7d4b 100644 --- a/examples/cor_request_fallback.py +++ b/examples/cor_request_fallback.py @@ -55,6 +55,7 @@ import asyncio import logging +import typing import di from di import dependent @@ -96,7 +97,7 @@ class SourceAHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]): def events(self) -> list[cqrs.Event]: return [] - async def handle(self, request: FetchDataCommand) -> FetchDataResult | None: + async def handle(self, request: FetchDataCommand) -> typing.Optional[FetchDataResult]: if request.source == "a": logger.info("COR chain: SourceAHandler handled source=a") HANDLER_SOURCE.append("chain") @@ -109,7 +110,7 @@ class SourceBHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]): def events(self) -> list[cqrs.Event]: return [] - async def handle(self, request: FetchDataCommand) -> FetchDataResult | None: + async def handle(self, request: FetchDataCommand) -> typing.Optional[FetchDataResult]: if request.source == "b": logger.info("COR chain: SourceBHandler handled source=b") HANDLER_SOURCE.append("chain") @@ -124,7 +125,7 @@ class DefaultChainHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]): def events(self) -> list[cqrs.Event]: return [] - async def handle(self, request: FetchDataCommand) -> FetchDataResult | None: + async def handle(self, request: FetchDataCommand) -> typing.Optional[FetchDataResult]: if request.source == "error": logger.info("COR chain: DefaultChainHandler raising ConnectionError for source=error") raise ConnectionError("Downstream service unavailable") diff --git a/examples/cor_request_handler.py b/examples/cor_request_handler.py index a69f5b5..7fc23e2 100644 --- a/examples/cor_request_handler.py +++ b/examples/cor_request_handler.py @@ -82,7 +82,7 @@ class ProcessPaymentCommand(cqrs.Request): class PaymentResult(cqrs.Response): success: bool - transaction_id: str | None = None + transaction_id: typing.Optional[str] = None message: str = "" @@ -101,7 +101,7 @@ def __init__(self) -> None: super().__init__() self._events: typing.List[cqrs.Event] = [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: if request.payment_method == "credit_card": transaction_id = f"cc_{request.user_id}_{int(request.amount * 100)}" TRANSACTIONS["credit_card"].append(transaction_id) @@ -134,7 +134,7 @@ def __init__(self) -> None: super().__init__() self._events: typing.List[cqrs.Event] = [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: if request.payment_method == "paypal": transaction_id = f"pp_{request.user_id}_{int(request.amount * 100)}" TRANSACTIONS["paypal"].append(transaction_id) @@ -167,7 +167,7 @@ def __init__(self) -> None: super().__init__() self._events: typing.List[cqrs.Event] = [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: if request.payment_method == "bank_transfer": transaction_id = f"bt_{request.user_id}_{int(request.amount * 100)}" TRANSACTIONS["bank_transfer"].append(transaction_id) @@ -198,7 +198,7 @@ class DefaultPaymentHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResu def events(self) -> typing.List[cqrs.Event]: return [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: # Default handler always handles the request (end of chain) print( f"Default: Unsupported payment method '{request.payment_method}' for user {request.user_id}", diff --git a/examples/kafka_event_consuming.py b/examples/kafka_event_consuming.py index b6f7a89..2d7bf87 100644 --- a/examples/kafka_event_consuming.py +++ b/examples/kafka_event_consuming.py @@ -114,7 +114,7 @@ async def empty_message_decoder( [kafka.KafkaMessage], typing.Awaitable[types.DecodedMessage], ], -) -> types.DecodedMessage | None: +) -> typing.Optional[types.DecodedMessage]: """ Decode a kafka message and return it if it is not empty. """ @@ -158,7 +158,7 @@ def mediator_factory() -> cqrs.EventMediator: decoder=empty_message_decoder, ) async def hello_world_event_handler( - body: cqrs.NotificationEvent[HelloWorldPayload] | deserializers.DeserializeJsonError | None, + body: typing.Union[cqrs.NotificationEvent[HelloWorldPayload], typing.Optional[deserializers.DeserializeJsonError]], msg: kafka.KafkaMessage, mediator: cqrs.EventMediator = faststream.Depends(mediator_factory), ): diff --git a/examples/saga.py b/examples/saga.py index 417eb54..2be0eeb 100644 --- a/examples/saga.py +++ b/examples/saga.py @@ -86,6 +86,7 @@ import asyncio import dataclasses import logging +import typing import uuid import di @@ -119,9 +120,9 @@ class OrderContext(SagaContext): shipping_address: str # These fields are populated by steps during execution - inventory_reservation_id: str | None = None - payment_id: str | None = None - shipment_id: str | None = None + inventory_reservation_id: typing.Optional[str] = None + payment_id: typing.Optional[str] = None + shipment_id: typing.Optional[str] = None # ============================================================================ diff --git a/examples/saga_fallback.py b/examples/saga_fallback.py index 78a8fa9..683ba18 100644 --- a/examples/saga_fallback.py +++ b/examples/saga_fallback.py @@ -70,6 +70,7 @@ import asyncio import dataclasses import logging +import typing import uuid import di @@ -104,7 +105,7 @@ class OrderContext(SagaContext): amount: float # This field is populated by step during execution - reservation_id: str | None = None + reservation_id: typing.Optional[str] = None # ============================================================================ diff --git a/examples/saga_fastapi_sse.py b/examples/saga_fastapi_sse.py index 8cb8b86..5d2a766 100644 --- a/examples/saga_fastapi_sse.py +++ b/examples/saga_fastapi_sse.py @@ -179,9 +179,9 @@ class OrderContext(SagaContext): total_amount: float shipping_address: str - inventory_reservation_id: str | None = None - payment_id: str | None = None - shipment_id: str | None = None + inventory_reservation_id: typing.Optional[str] = None + payment_id: typing.Optional[str] = None + shipment_id: typing.Optional[str] = None class ProcessOrderRequest(pydantic.BaseModel): @@ -448,7 +448,7 @@ def mediator_factory() -> cqrs.SagaMediator: ) -def serialize_response(response: Response | None) -> dict[str, typing.Any]: +def serialize_response(response: typing.Optional[Response]) -> dict[str, typing.Any]: if response is None: return {} return response.to_dict() diff --git a/examples/saga_mermaid.py b/examples/saga_mermaid.py index 84252f7..f3758b9 100644 --- a/examples/saga_mermaid.py +++ b/examples/saga_mermaid.py @@ -87,9 +87,9 @@ class OrderContext(SagaContext): total_amount: float shipping_address: str - inventory_reservation_id: str | None = None - payment_id: str | None = None - shipment_id: str | None = None + inventory_reservation_id: typing.Optional[str] = None + payment_id: typing.Optional[str] = None + shipment_id: typing.Optional[str] = None class ReserveInventoryResponse(Response): diff --git a/examples/saga_recovery.py b/examples/saga_recovery.py index a7848be..b117bbb 100644 --- a/examples/saga_recovery.py +++ b/examples/saga_recovery.py @@ -127,9 +127,9 @@ class OrderContext(SagaContext): shipping_address: str # These fields are populated by steps during execution - inventory_reservation_id: str | None = None - payment_id: str | None = None - shipment_id: str | None = None + inventory_reservation_id: typing.Optional[str] = None + payment_id: typing.Optional[str] = None + shipment_id: typing.Optional[str] = None # ============================================================================ diff --git a/examples/saga_recovery_scheduler.py b/examples/saga_recovery_scheduler.py index 16d6d5e..89318e9 100644 --- a/examples/saga_recovery_scheduler.py +++ b/examples/saga_recovery_scheduler.py @@ -112,9 +112,9 @@ class OrderContext(SagaContext): total_amount: float shipping_address: str - inventory_reservation_id: str | None = None - payment_id: str | None = None - shipment_id: str | None = None + inventory_reservation_id: typing.Optional[str] = None + payment_id: typing.Optional[str] = None + shipment_id: typing.Optional[str] = None # ============================================================================ @@ -536,7 +536,7 @@ async def recovery_loop( storage: ISagaStorage, *, interval_seconds: float = RECOVERY_INTERVAL_SECONDS, - max_iterations: int | None = None, + max_iterations: typing.Optional[int] = None, ) -> None: """ Run the recovery scheduler loop. diff --git a/pyproject.toml b/pyproject.toml index 9896e07..d750e00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,9 +11,13 @@ classifiers = [ "Development Status :: 4 - Beta", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12" + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14" ] dependencies = [ "dataclass-wizard==0.*", @@ -22,15 +26,14 @@ dependencies = [ "orjson==3.*", "pydantic==2.*", "sqlalchemy[asyncio]==2.0.*", - "python-dotenv==1.*", - "retry-async==0.1.*", + "python-dotenv>=0.21,<2", "typing-extensions>=4.0" ] description = "Event-Driven Architecture Framework for Distributed Systems" maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] name = "python-cqrs" readme = "README.md" -requires-python = ">=3.10" +requires-python = ">=3.9,<3.15" version = "4.10.1" [project.optional-dependencies] diff --git a/pyrightconfig.json b/pyrightconfig.json index 2b2abd5..067c3d3 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -8,18 +8,18 @@ "defineConstant": { "DEBUG": true }, - "pythonVersion": "3.10", + "pythonVersion": "3.9", "pythonPlatform": "Linux", "executionEnvironments": [ { "root": "./", - "pythonVersion": "3.10", + "pythonVersion": "3.9", "pythonPlatform": "Linux", "reportMissingImports": "error" }, { "root": "./examples", - "pythonVersion": "3.10", + "pythonVersion": "3.9", "pythonPlatform": "Linux", "reportMissingImports": "warning" } diff --git a/ruff.toml b/ruff.toml index 0fba781..0f23677 100644 --- a/ruff.toml +++ b/ruff.toml @@ -32,8 +32,8 @@ exclude = [ line-length = 120 indent-width = 4 -# Assume Python 3.10 -target-version = "py310" +# Assume Python 3.9 +target-version = "py39" [lint] # Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. diff --git a/src/cqrs/adapters/circuit_breaker.py b/src/cqrs/adapters/circuit_breaker.py index 46c5218..ca063c5 100644 --- a/src/cqrs/adapters/circuit_breaker.py +++ b/src/cqrs/adapters/circuit_breaker.py @@ -105,7 +105,7 @@ def default_memory_storage_factory(name: str) -> _CircuitBreakerStorage: return CircuitMemoryStorage(state=aiobreaker.CircuitBreakerState.CLOSED) -def _identifier_to_name(identifier: type | str) -> str: +def _identifier_to_name(identifier: typing.Union[type, str]) -> str: """Build circuit breaker namespace from type or string.""" if isinstance(identifier, str): return identifier @@ -142,8 +142,8 @@ def __init__( self, fail_max: int = 5, timeout_duration: int = 60, - exclude: list[type[Exception]] | None = None, - storage_factory: StorageFactory | None = None, + exclude: typing.Optional[list[type[Exception]]] = None, + storage_factory: typing.Optional[StorageFactory] = None, ) -> None: if CircuitBreaker is None: raise ImportError( @@ -158,7 +158,7 @@ def __init__( # Dictionary to store circuit breakers per identifier (type or str) self._breakers: dict[str, typing.Any] = {} # type: ignore[type-arg] - def _get_breaker(self, identifier: type | str) -> typing.Any: # type: ignore[return-type] + def _get_breaker(self, identifier: typing.Union[type, str]) -> typing.Any: # type: ignore[return-type] """ Get or create circuit breaker for an identifier (type or string). @@ -208,7 +208,7 @@ def _create_breaker(self, name: str) -> typing.Any: # type: ignore[return-type] async def call( self, - identifier: type | str, + identifier: typing.Union[type, str], func: typing.Callable[..., typing.Awaitable[typing.Any]], *args: typing.Any, **kwargs: typing.Any, diff --git a/src/cqrs/adapters/kafka.py b/src/cqrs/adapters/kafka.py index ceb6c8f..b507e46 100644 --- a/src/cqrs/adapters/kafka.py +++ b/src/cqrs/adapters/kafka.py @@ -1,5 +1,4 @@ import asyncio -import functools import logging import ssl import typing @@ -8,7 +7,6 @@ from cqrs.serializers import default import aiokafka -import retry_async from aiokafka import errors @@ -17,23 +15,19 @@ "kafka_producer_factory", ) -_retry = functools.partial( - retry_async.retry, - exceptions=( - errors.KafkaConnectionError, - errors.NodeNotReadyError, - errors.RequestTimedOutError, - ), - is_async=True, +_RETRYABLE_KAFKA_EXCEPTIONS = ( + errors.KafkaConnectionError, + errors.NodeNotReadyError, + errors.RequestTimedOutError, ) -SecurityProtocol: typing.TypeAlias = typing.Literal[ +SecurityProtocol = typing.Literal[ "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL", ] -SaslMechanism: typing.TypeAlias = typing.Literal[ +SaslMechanism = typing.Literal[ "PLAIN", "GSSAPI", "SCRAM-SHA-256", @@ -44,7 +38,7 @@ logger = logging.getLogger("cqrs") logger.setLevel(logging.DEBUG) -Serializer = typing.Callable[[typing.Any], typing.ByteString | None] +Serializer = typing.Callable[[typing.Any], typing.Optional[typing.ByteString]] class KafkaProducer(protocol.KafkaProducer): @@ -73,26 +67,27 @@ async def produce(self, topic: typing.Text, message: typing.Any): Produces event to kafka broker. Tries to reconnect if connect has been lost or has not been opened. """ - await _retry(tries=self._retry_count, delay=self._retry_delay)(self._produce)( - topic, - message, - ) + for attempt in range(1, self._retry_count + 1): + try: + await self._produce(topic, message) + return + except _RETRYABLE_KAFKA_EXCEPTIONS: + if attempt == self._retry_count: + raise + await asyncio.sleep(self._retry_delay) def kafka_producer_factory( dsn: typing.Text, security_protocol: SecurityProtocol = "PLAINTEXT", sasl_mechanism: SaslMechanism = "PLAIN", - ssl_context: ssl.SSLContext | None = None, + ssl_context: typing.Optional[ssl.SSLContext] = None, retry_count: int = 3, retry_delay: int = 1, - user: typing.Text | None = None, - password: typing.Text | None = None, - value_serializer: Serializer | None = None, + user: typing.Optional[typing.Text] = None, + password: typing.Optional[typing.Text] = None, + value_serializer: typing.Optional[Serializer] = None, ) -> KafkaProducer: - loop = asyncio.get_event_loop() - asyncio.set_event_loop(loop) - producer = aiokafka.AIOKafkaProducer( bootstrap_servers=dsn, value_serializer=value_serializer or default.default_serializer, @@ -101,7 +96,6 @@ def kafka_producer_factory( sasl_plain_username=user, sasl_plain_password=password, ssl_context=ssl_context, - loop=loop, ) return KafkaProducer( producer=producer, diff --git a/src/cqrs/circuit_breaker.py b/src/cqrs/circuit_breaker.py index 172eab6..e11d0b7 100644 --- a/src/cqrs/circuit_breaker.py +++ b/src/cqrs/circuit_breaker.py @@ -18,7 +18,7 @@ class ICircuitBreaker(typing.Protocol): async def call( self, - identifier: type | str, + identifier: typing.Union[type, str], func: typing.Callable[..., typing.Awaitable[typing.Any]], *args: typing.Any, **kwargs: typing.Any, @@ -56,7 +56,7 @@ def is_circuit_breaker_error(self, exc: Exception) -> bool: def should_use_fallback( primary_error: Exception, - circuit_breaker: ICircuitBreaker | None, + circuit_breaker: typing.Optional[ICircuitBreaker], failure_exceptions: tuple[type[Exception], ...], ) -> bool: """ diff --git a/src/cqrs/deserializers/exceptions.py b/src/cqrs/deserializers/exceptions.py index 4cb2d0a..dcb58ad 100644 --- a/src/cqrs/deserializers/exceptions.py +++ b/src/cqrs/deserializers/exceptions.py @@ -15,4 +15,4 @@ class DeserializeJsonError: error_message: str error_type: typing.Type[Exception] - message_data: str | bytes | None + message_data: typing.Union[str, typing.Optional[bytes]] diff --git a/src/cqrs/deserializers/json.py b/src/cqrs/deserializers/json.py index 9f25565..6782d26 100644 --- a/src/cqrs/deserializers/json.py +++ b/src/cqrs/deserializers/json.py @@ -80,7 +80,10 @@ def __init__(self, model: typing.Type[typing.Any]): # Store model - type is preserved through generic parameter _T for return type self._model: typing.Type[typing.Any] = model - def __call__(self, data: str | bytes | None) -> _T | None | DeserializeJsonError: + def __call__( + self, + data: typing.Union[str, typing.Optional[bytes]], + ) -> typing.Union[typing.Optional[_T], DeserializeJsonError]: """ Deserialize JSON data into model instance. diff --git a/src/cqrs/dispatcher/event.py b/src/cqrs/dispatcher/event.py index a6b0169..a383cf1 100644 --- a/src/cqrs/dispatcher/event.py +++ b/src/cqrs/dispatcher/event.py @@ -8,7 +8,7 @@ from cqrs.events.map import EventMap from cqrs.middlewares.base import MiddlewareChain -_EventHandler: typing.TypeAlias = EventHandler +_EventHandler = EventHandler logger = logging.getLogger("cqrs") @@ -18,7 +18,7 @@ def __init__( self, event_map: EventMap, container: Container, - middleware_chain: MiddlewareChain | None = None, + middleware_chain: typing.Optional[MiddlewareChain] = None, ) -> None: self._event_map = event_map self._container = container diff --git a/src/cqrs/dispatcher/models.py b/src/cqrs/dispatcher/models.py index 5db8e44..808c725 100644 --- a/src/cqrs/dispatcher/models.py +++ b/src/cqrs/dispatcher/models.py @@ -25,4 +25,4 @@ class SagaDispatchResult: step_result: SagaStepResult events: typing.List[IEvent] = dataclasses.field(default_factory=list) - saga_id: str | None = None + saga_id: typing.Optional[str] = None diff --git a/src/cqrs/dispatcher/request.py b/src/cqrs/dispatcher/request.py index 8e9b5cd..dd4583f 100644 --- a/src/cqrs/dispatcher/request.py +++ b/src/cqrs/dispatcher/request.py @@ -10,7 +10,7 @@ RequestHandlerTypeError, ) from cqrs.dispatcher.models import RequestDispatchResult -from cqrs.middlewares.base import MiddlewareChain +from cqrs.middlewares.base import HandleType, MiddlewareChain from cqrs.requests.cor_request_handler import ( CORRequestHandler, build_chain, @@ -23,7 +23,7 @@ logger = logging.getLogger("cqrs") -_RequestHandler: typing.TypeAlias = RequestHandler | CORRequestHandler +_RequestHandler = typing.Union[RequestHandler, CORRequestHandler] class RequestDispatcher: @@ -31,7 +31,7 @@ def __init__( self, request_map: RequestMap, container: Container, - middleware_chain: MiddlewareChain | None = None, + middleware_chain: typing.Optional[MiddlewareChain] = None, ) -> None: self._request_map = request_map self._container = container @@ -80,7 +80,9 @@ async def _dispatch_fallback( """Dispatch using primary handler with fallback on failure.""" primary = await self._container.resolve(fallback_config.primary) try: - wrapped_primary = self._middleware_chain.wrap(primary.handle) + wrapped_primary = self._middleware_chain.wrap( + typing.cast(HandleType, primary.handle), + ) if fallback_config.circuit_breaker is not None: response = await fallback_config.circuit_breaker.call( fallback_config.primary, @@ -116,7 +118,9 @@ async def _dispatch_fallback( fallback_config.fallback.__name__, ) fallback_handler = await self._container.resolve(fallback_config.fallback) - wrapped_fallback = self._middleware_chain.wrap(fallback_handler.handle) + wrapped_fallback = self._middleware_chain.wrap( + typing.cast(HandleType, fallback_handler.handle), + ) response = await wrapped_fallback(request) return RequestDispatchResult( response=response, diff --git a/src/cqrs/dispatcher/saga.py b/src/cqrs/dispatcher/saga.py index d3e3202..ea41e60 100644 --- a/src/cqrs/dispatcher/saga.py +++ b/src/cqrs/dispatcher/saga.py @@ -27,8 +27,8 @@ def __init__( self, saga_map: SagaMap, container: Container, - storage: ISagaStorage | None = None, - middleware_chain: MiddlewareChain | None = None, + storage: typing.Optional[ISagaStorage] = None, + middleware_chain: typing.Optional[MiddlewareChain] = None, compensation_retry_count: int = 3, compensation_retry_delay: float = 1.0, compensation_retry_backoff: float = 2.0, @@ -56,7 +56,7 @@ def __init__( def dispatch( self, context: SagaContext, - saga_id: uuid.UUID | None = None, + saga_id: typing.Optional[uuid.UUID] = None, ) -> typing.AsyncIterator[SagaDispatchResult]: """ Dispatch a saga execution for the given context. @@ -81,7 +81,7 @@ def dispatch( async def _dispatch_impl( self, context: SagaContext, - saga_id: uuid.UUID | None = None, + saga_id: typing.Optional[uuid.UUID] = None, ) -> typing.AsyncIterator[SagaDispatchResult]: # Find saga type by context type saga_type = self._saga_map.get(type(context)) diff --git a/src/cqrs/dispatcher/streaming.py b/src/cqrs/dispatcher/streaming.py index 2caabec..4900651 100644 --- a/src/cqrs/dispatcher/streaming.py +++ b/src/cqrs/dispatcher/streaming.py @@ -34,7 +34,7 @@ def __init__( self, request_map: RequestMap, container: Container, - middleware_chain: MiddlewareChain | None = None, + middleware_chain: typing.Optional[MiddlewareChain] = None, ) -> None: self._request_map = request_map self._container = container diff --git a/src/cqrs/events/bootstrap.py b/src/cqrs/events/bootstrap.py index 36da6b1..5bfccd8 100644 --- a/src/cqrs/events/bootstrap.py +++ b/src/cqrs/events/bootstrap.py @@ -14,7 +14,7 @@ def setup_mediator( container: di_container_impl.DIContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - events_mapper: typing.Callable[[events.EventMap], None] | None = None, + events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, ) -> cqrs.EventMediator: ... @@ -22,14 +22,14 @@ def setup_mediator( def setup_mediator( container: CQRSContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - events_mapper: typing.Callable[[events.EventMap], None] | None = None, + events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, ) -> cqrs.EventMediator: ... def setup_mediator( - container: di_container_impl.DIContainer | CQRSContainer, + container: typing.Union[di_container_impl.DIContainer, CQRSContainer], middlewares: typing.Iterable[mediator_middlewares.Middleware], - events_mapper: typing.Callable[[events.EventMap], None] | None = None, + events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, ) -> cqrs.EventMediator: """ Create an event mediator with the given container and middlewares. @@ -75,26 +75,26 @@ def bind_events(event_map: events.EventMap) -> None: @overload def bootstrap( di_container: di.Container, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - events_mapper: typing.Callable[[events.EventMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, ) -> cqrs.EventMediator: ... @overload def bootstrap( di_container: CQRSContainer, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - events_mapper: typing.Callable[[events.EventMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, ) -> cqrs.EventMediator: ... def bootstrap( - di_container: di.Container | CQRSContainer, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - events_mapper: typing.Callable[[events.EventMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + di_container: typing.Union[di.Container, CQRSContainer], + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, ) -> cqrs.EventMediator: """ Bootstrap an event mediator with optional middlewares and event bindings. diff --git a/src/cqrs/events/event_emitter.py b/src/cqrs/events/event_emitter.py index e21af7a..26ea9a9 100644 --- a/src/cqrs/events/event_emitter.py +++ b/src/cqrs/events/event_emitter.py @@ -10,7 +10,7 @@ logger = logging.getLogger("cqrs") -_H: typing.TypeAlias = event_handler.EventHandler +_H = event_handler.EventHandler class EventEmitter: @@ -28,7 +28,7 @@ def __init__( self, event_map: map.EventMap, container: di_container.Container, - message_broker: message_brokers.MessageBroker | None = None, + message_broker: typing.Optional[message_brokers.MessageBroker] = None, ) -> None: """ Initialize the event emitter. diff --git a/src/cqrs/events/event_processor.py b/src/cqrs/events/event_processor.py index 8209263..9aa7149 100644 --- a/src/cqrs/events/event_processor.py +++ b/src/cqrs/events/event_processor.py @@ -33,7 +33,7 @@ class EventProcessor: def __init__( self, event_map: EventMap, - event_emitter: EventEmitter | None = None, + event_emitter: typing.Optional[EventEmitter] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, ) -> None: diff --git a/src/cqrs/events/fallback.py b/src/cqrs/events/fallback.py index 6c5ba19..1512f49 100644 --- a/src/cqrs/events/fallback.py +++ b/src/cqrs/events/fallback.py @@ -46,7 +46,7 @@ class EventHandlerFallback: primary: EventHandlerT fallback: EventHandlerT failure_exceptions: tuple[type[Exception], ...] = () - circuit_breaker: ICircuitBreaker | None = None + circuit_breaker: typing.Optional[ICircuitBreaker] = None def __post_init__(self) -> None: if not isinstance(self.primary, type) or not isinstance(self.fallback, type): diff --git a/src/cqrs/events/map.py b/src/cqrs/events/map.py index dbd5b22..b5c524c 100644 --- a/src/cqrs/events/map.py +++ b/src/cqrs/events/map.py @@ -5,8 +5,8 @@ from cqrs.events.fallback import EventHandlerFallback _KT = typing.TypeVar("_KT", bound=typing.Type[IEvent]) -_HandlerItem = typing.Type[event_handler.EventHandler] | EventHandlerFallback -_VT: typing.TypeAlias = typing.List[_HandlerItem] +_HandlerItem = typing.Union[typing.Type[event_handler.EventHandler], EventHandlerFallback] +_VT = typing.List[_HandlerItem] class EventMap(typing.Dict[_KT, _VT]): diff --git a/src/cqrs/generic_utils.py b/src/cqrs/generic_utils.py index 438359b..cec6cbf 100644 --- a/src/cqrs/generic_utils.py +++ b/src/cqrs/generic_utils.py @@ -7,7 +7,7 @@ def get_generic_args_for_origin( klass: type, origin_classes: tuple[type, ...], min_args: int = 1, -) -> tuple[type, ...] | None: +) -> typing.Optional[tuple[type, ...]]: """ Extract generic type arguments from a class that inherits from a Generic base. diff --git a/src/cqrs/mediator.py b/src/cqrs/mediator.py index 71c0d60..dad9fca 100644 --- a/src/cqrs/mediator.py +++ b/src/cqrs/mediator.py @@ -61,9 +61,9 @@ def __init__( self, request_map: RequestMap, container: Container, - event_emitter: EventEmitter | None = None, - middleware_chain: MiddlewareChain | None = None, - event_map: EventMap | None = None, + event_emitter: typing.Optional[EventEmitter] = None, + middleware_chain: typing.Optional[MiddlewareChain] = None, + event_map: typing.Optional[EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, *, @@ -116,7 +116,7 @@ def __init__( self, event_map: EventMap, container: Container, - middleware_chain: MiddlewareChain | None = None, + middleware_chain: typing.Optional[MiddlewareChain] = None, *, dispatcher_type: typing.Type[EventDispatcher] = EventDispatcher, ): @@ -166,9 +166,9 @@ def __init__( self, request_map: RequestMap, container: Container, - event_emitter: EventEmitter | None = None, - middleware_chain: MiddlewareChain | None = None, - event_map: EventMap | None = None, + event_emitter: typing.Optional[EventEmitter] = None, + middleware_chain: typing.Optional[MiddlewareChain] = None, + event_map: typing.Optional[EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, *, @@ -189,7 +189,7 @@ def __init__( def stream( self, request: IRequest, - ) -> typing.AsyncIterator[IResponse | None]: + ) -> typing.AsyncIterator[IResponse]: """ Stream results from a generator-based handler. @@ -207,7 +207,7 @@ def stream( async def _stream_impl( self, request: IRequest, - ) -> typing.AsyncIterator[IResponse | None]: + ) -> typing.AsyncIterator[IResponse]: async for dispatch_result in self._dispatcher.dispatch(request): await self._event_processor.emit_events(dispatch_result.events) @@ -251,12 +251,12 @@ def __init__( self, saga_map: SagaMap, container: Container, - event_emitter: EventEmitter | None = None, - middleware_chain: MiddlewareChain | None = None, - event_map: EventMap | None = None, + event_emitter: typing.Optional[EventEmitter] = None, + middleware_chain: typing.Optional[MiddlewareChain] = None, + event_map: typing.Optional[EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, - storage: ISagaStorage | None = None, + storage: typing.Optional[ISagaStorage] = None, compensation_retry_count: int = 3, compensation_retry_delay: float = 1.0, compensation_retry_backoff: float = 2.0, @@ -282,7 +282,7 @@ def __init__( def stream( self, context: SagaContext, - saga_id: uuid.UUID | None = None, + saga_id: typing.Optional[uuid.UUID] = None, ) -> typing.AsyncIterator[SagaStepResult]: """ Stream results from saga execution. @@ -309,7 +309,7 @@ def stream( async def _stream_impl( self, context: SagaContext, - saga_id: uuid.UUID | None = None, + saga_id: typing.Optional[uuid.UUID] = None, ) -> typing.AsyncIterator[SagaStepResult]: async for dispatch_result in self._dispatcher.dispatch( context, diff --git a/src/cqrs/middlewares/base.py b/src/cqrs/middlewares/base.py index 60856f5..5af2821 100644 --- a/src/cqrs/middlewares/base.py +++ b/src/cqrs/middlewares/base.py @@ -4,11 +4,11 @@ from cqrs.saga.models import SagaContext from cqrs.requests.request import ReqT, ResT -HandleType = typing.Callable[[ReqT], typing.Awaitable[ResT] | ResT] +HandleType = typing.Callable[[ReqT], typing.Awaitable[typing.Optional[ResT]]] class Middleware(typing.Protocol[ReqT, ResT]): - async def __call__(self, request: ReqT, handle: HandleType) -> ResT | None: + async def __call__(self, request: ReqT, handle: HandleType) -> typing.Optional[ResT]: raise NotImplementedError diff --git a/src/cqrs/middlewares/logging.py b/src/cqrs/middlewares/logging.py index d1750b2..a874397 100644 --- a/src/cqrs/middlewares/logging.py +++ b/src/cqrs/middlewares/logging.py @@ -1,4 +1,5 @@ import logging +import typing from cqrs.middlewares import base from cqrs.middlewares.base import HandleType @@ -9,7 +10,7 @@ class LoggingMiddleware(base.Middleware): - async def __call__(self, request: IRequest, handle: HandleType) -> IResponse | None: + async def __call__(self, request: IRequest, handle: HandleType) -> typing.Optional[IResponse]: logger.debug( "Handle %s request", type(request).__name__, diff --git a/src/cqrs/outbox/map.py b/src/cqrs/outbox/map.py index 5cb41d4..01b854b 100644 --- a/src/cqrs/outbox/map.py +++ b/src/cqrs/outbox/map.py @@ -20,5 +20,5 @@ def register( def get( cls, event_name: typing.Text, - ) -> typing.Type[INotificationEvent] | None: + ) -> typing.Optional[typing.Type[INotificationEvent]]: return cls._registry.get(event_name) diff --git a/src/cqrs/outbox/mock.py b/src/cqrs/outbox/mock.py index 6375d27..d4abecb 100644 --- a/src/cqrs/outbox/mock.py +++ b/src/cqrs/outbox/mock.py @@ -28,7 +28,7 @@ def add(self, event: cqrs.INotificationEvent) -> None: async def get_many( self, batch_size: int = 100, - topic: typing.Text | None = None, + topic: typing.Optional[typing.Text] = None, ) -> typing.List[repository.OutboxedEvent]: return list( filter(lambda e: topic == e.topic, self.session.values()) if topic else list(self.session.values()), diff --git a/src/cqrs/outbox/repository.py b/src/cqrs/outbox/repository.py index 474b864..fb4c484 100644 --- a/src/cqrs/outbox/repository.py +++ b/src/cqrs/outbox/repository.py @@ -69,7 +69,7 @@ def add( async def get_many( self, batch_size: int = 100, - topic: typing.Text | None = None, + topic: typing.Optional[typing.Text] = None, ) -> typing.List[OutboxedEvent]: """Get many events from the repository.""" diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py index 99160ac..6de617f 100644 --- a/src/cqrs/outbox/sqlalchemy.py +++ b/src/cqrs/outbox/sqlalchemy.py @@ -138,7 +138,7 @@ def row_to_dict(self) -> typing.Dict[typing.Text, typing.Any]: def get_batch_query( cls, size: int, - topic: typing.Text | None = None, + topic: typing.Optional[typing.Text] = None, ) -> sqlalchemy.Select: return ( sqlalchemy.select(cls) @@ -192,7 +192,7 @@ class SqlAlchemyOutboxedEventRepository(repository.OutboxedEventRepository): def __init__( self, session: sql_session.AsyncSession, - compressor: compressors.Compressor | None = None, + compressor: typing.Optional[compressors.Compressor] = None, ): self.session = session self._compressor = compressor @@ -225,7 +225,7 @@ def add( ), ) - def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None: + def _process_events(self, model: OutboxModel) -> typing.Optional[repository.OutboxedEvent]: event_dict = model.row_to_dict() event_model = map.OutboxedEventMap.get(event_dict["event_name"]) @@ -248,7 +248,7 @@ def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None async def get_many( self, batch_size: int = 100, - topic: typing.Text | None = None, + topic: typing.Optional[typing.Text] = None, ) -> typing.List[repository.OutboxedEvent]: events: typing.Sequence[OutboxModel] = ( (await self.session.execute(OutboxModel.get_batch_query(batch_size, topic))).scalars().all() @@ -283,7 +283,7 @@ async def rollback(self): def rebind_outbox_model( model: typing.Any, new_base: DeclarativeMeta, - table_name: typing.Text | None = None, + table_name: typing.Optional[typing.Text] = None, ): model.__bases__ = (new_base,) model.__table__.name = table_name or model.__table__.name diff --git a/src/cqrs/producer.py b/src/cqrs/producer.py index f1dbaf8..155d14d 100644 --- a/src/cqrs/producer.py +++ b/src/cqrs/producer.py @@ -9,7 +9,7 @@ logger = logging.getLogger("cqrs") logger.setLevel(logging.DEBUG) -SessionFactory: typing.TypeAlias = typing.Callable[[], sql_session.AsyncSession] +SessionFactory = typing.Callable[[], sql_session.AsyncSession] class EventProducer: diff --git a/src/cqrs/requests/bootstrap.py b/src/cqrs/requests/bootstrap.py index 916f432..34c9c44 100644 --- a/src/cqrs/requests/bootstrap.py +++ b/src/cqrs/requests/bootstrap.py @@ -17,23 +17,23 @@ @overload def setup_event_emitter( container: di_container_impl.DIContainer, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - message_broker: protocol.MessageBroker | None = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, ) -> events.EventEmitter: ... @overload def setup_event_emitter( container: CQRSContainer, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - message_broker: protocol.MessageBroker | None = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, ): ... def setup_event_emitter( - container: di_container_impl.DIContainer | CQRSContainer, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - message_broker: protocol.MessageBroker | None = None, + container: typing.Union[di_container_impl.DIContainer, CQRSContainer], + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, ) -> events.EventEmitter: if message_broker is None: message_broker = DEFAULT_MESSAGE_BROKER @@ -54,8 +54,8 @@ def setup_mediator( event_emitter: events.EventEmitter, container: di_container_impl.DIContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, ) -> cqrs.RequestMediator: ... @@ -64,8 +64,8 @@ def setup_mediator( event_emitter: events.EventEmitter, container: CQRSContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, ) -> cqrs.RequestMediator: ... @@ -74,9 +74,9 @@ def setup_mediator( event_emitter: events.EventEmitter, container: di_container_impl.DIContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - event_map: events.EventMap | None = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + event_map: typing.Optional[events.EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, ) -> cqrs.RequestMediator: ... @@ -87,9 +87,9 @@ def setup_mediator( event_emitter: events.EventEmitter, container: CQRSContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - event_map: events.EventMap | None = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + event_map: typing.Optional[events.EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, ) -> cqrs.RequestMediator: ... @@ -97,11 +97,11 @@ def setup_mediator( def setup_mediator( event_emitter: events.EventEmitter, - container: di_container_impl.DIContainer | CQRSContainer, + container: typing.Union[di_container_impl.DIContainer, CQRSContainer], middlewares: typing.Iterable[mediator_middlewares.Middleware], - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - event_map: events.EventMap | None = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + event_map: typing.Optional[events.EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, ) -> cqrs.RequestMediator: @@ -134,36 +134,36 @@ def setup_mediator( @overload def bootstrap( di_container: di.Container, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, ) -> cqrs.RequestMediator: ... @overload def bootstrap( di_container: CQRSContainer, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, ) -> cqrs.RequestMediator: ... @overload def bootstrap( di_container: di.Container, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = False, ) -> cqrs.RequestMediator: ... @@ -172,25 +172,25 @@ def bootstrap( @overload def bootstrap( di_container: CQRSContainer, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = False, ) -> cqrs.RequestMediator: ... def bootstrap( - di_container: di.Container | CQRSContainer, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + di_container: typing.Union[di.Container, CQRSContainer], + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = False, ) -> cqrs.RequestMediator: @@ -239,9 +239,9 @@ def setup_streaming_mediator( event_emitter: events.EventEmitter, container: di_container_impl.DIContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, max_concurrent_event_handlers: int = 10, concurrent_event_handle_enable: bool = True, ) -> cqrs.StreamingRequestMediator: ... @@ -252,9 +252,9 @@ def setup_streaming_mediator( event_emitter: events.EventEmitter, container: CQRSContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, max_concurrent_event_handlers: int = 10, concurrent_event_handle_enable: bool = True, ) -> cqrs.StreamingRequestMediator: ... @@ -262,11 +262,11 @@ def setup_streaming_mediator( def setup_streaming_mediator( event_emitter: events.EventEmitter, - container: di_container_impl.DIContainer | CQRSContainer, + container: typing.Union[di_container_impl.DIContainer, CQRSContainer], middlewares: typing.Iterable[mediator_middlewares.Middleware], - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, max_concurrent_event_handlers: int = 10, concurrent_event_handle_enable: bool = True, ) -> cqrs.StreamingRequestMediator: @@ -299,12 +299,12 @@ def setup_streaming_mediator( @overload def bootstrap_streaming( di_container: di.Container, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, max_concurrent_event_handlers: int = 10, concurrent_event_handle_enable: bool = False, ) -> cqrs.StreamingRequestMediator: ... @@ -313,25 +313,25 @@ def bootstrap_streaming( @overload def bootstrap_streaming( di_container: CQRSContainer, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, max_concurrent_event_handlers: int = 10, concurrent_event_handle_enable: bool = False, ) -> cqrs.StreamingRequestMediator: ... def bootstrap_streaming( - di_container: di.Container | CQRSContainer, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - commands_mapper: typing.Callable[[RequestMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - queries_mapper: typing.Callable[[RequestMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, + di_container: typing.Union[di.Container, CQRSContainer], + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, max_concurrent_event_handlers: int = 10, concurrent_event_handle_enable: bool = False, ) -> cqrs.StreamingRequestMediator: diff --git a/src/cqrs/requests/cor_request_handler.py b/src/cqrs/requests/cor_request_handler.py index c102fe1..db942c9 100644 --- a/src/cqrs/requests/cor_request_handler.py +++ b/src/cqrs/requests/cor_request_handler.py @@ -6,6 +6,7 @@ from cqrs.events.event import IEvent from cqrs.requests.request import ReqT, ResT +from cqrs.response import IResponse class CORRequestHandler(abc.ABC, typing.Generic[ReqT, ResT]): @@ -22,13 +23,30 @@ def __init__(self, auth_service: AuthServiceProtocol) -> None: self._auth_service = auth_service self.events: typing.List[IEvent] = [] - async def handle(self, request: LoginCommand) -> None | None: + async def handle(self, request: LoginCommand) -> typing.Optional[None ]: if self._auth_service.can_authenticate(request): return await self._auth_service.authenticate(request) return await super().handle(request) """ - _next_handler: "CORRequestHandler[ReqT, ResT] | None" = None + _next_handler: "typing.Optional[CORRequestHandler[ReqT, ResT] ]" = None + + def __class_getitem__(cls, params): # type: ignore[override] + """ + Backward-compatible generic subscription. + + Supports both forms: + - CORRequestHandler[RequestType, ResponseType] + - CORRequestHandler[RequestType] # legacy shorthand + """ + if not isinstance(params, tuple): + params = (params,) + if len(params) == 1: + params = ( + params[0], + typing.Optional[IResponse], + ) + return super().__class_getitem__(params) # pyright: ignore[reportAttributeAccessIssue] def set_next( self, @@ -39,7 +57,7 @@ def set_next( return self._next_handler - async def next(self, request: ReqT) -> ResT | None: + async def next(self, request: ReqT) -> typing.Optional[ResT]: if self._next_handler: return await self._next_handler.handle(request) @@ -56,11 +74,11 @@ def events(self) -> typing.Sequence[IEvent]: return () @abc.abstractmethod - async def handle(self, request: ReqT) -> ResT | None: + async def handle(self, request: ReqT) -> typing.Optional[ResT]: raise NotImplementedError -CORRequestHandlerT: typing.TypeAlias = CORRequestHandler +CORRequestHandlerT = CORRequestHandler def build_chain(handlers: typing.List[CORRequestHandlerT]) -> CORRequestHandlerT: diff --git a/src/cqrs/requests/fallback.py b/src/cqrs/requests/fallback.py index 93f4677..e846d9a 100644 --- a/src/cqrs/requests/fallback.py +++ b/src/cqrs/requests/fallback.py @@ -7,7 +7,7 @@ from cqrs.generic_utils import get_generic_args_for_origin from cqrs.requests.request_handler import RequestHandler, StreamingRequestHandler -RequestHandlerT = type[RequestHandler] | type[StreamingRequestHandler] +RequestHandlerT = typing.Union[type[RequestHandler], type[StreamingRequestHandler]] _REQUEST_HANDLER_ORIGINS: tuple[type, ...] = (RequestHandler, StreamingRequestHandler) @@ -47,7 +47,7 @@ class RequestHandlerFallback: primary: RequestHandlerT fallback: RequestHandlerT failure_exceptions: tuple[type[Exception], ...] = () - circuit_breaker: ICircuitBreaker | None = None + circuit_breaker: typing.Optional[ICircuitBreaker] = None def __post_init__(self) -> None: if not isinstance(self.primary, type) or not isinstance(self.fallback, type): diff --git a/src/cqrs/requests/map.py b/src/cqrs/requests/map.py index e4eabcd..d32837b 100644 --- a/src/cqrs/requests/map.py +++ b/src/cqrs/requests/map.py @@ -13,11 +13,12 @@ _KT = typing.TypeVar("_KT", bound=typing.Type[IRequest]) # Type alias for handler types that can be bound to requests -HandlerType = ( - typing.Type[RequestHandler | StreamingRequestHandler] - | typing.List[typing.Type[CORRequestHandler]] - | RequestHandlerFallback -) +HandlerType = typing.Union[ + typing.Type[RequestHandler], + typing.Type[StreamingRequestHandler], + typing.List[typing.Type[CORRequestHandler]], + RequestHandlerFallback, +] class RequestMap(typing.Dict[_KT, HandlerType]): diff --git a/src/cqrs/requests/mermaid.py b/src/cqrs/requests/mermaid.py index eb87a90..9f453c2 100644 --- a/src/cqrs/requests/mermaid.py +++ b/src/cqrs/requests/mermaid.py @@ -144,13 +144,19 @@ def class_diagram(self) -> str: # Collect all types request_types: set[type] = set() response_types: set[type] = set() - handler_info: list[tuple[str, type | None, type | None]] = [] + handler_info: list[ + tuple[ + str, + typing.Optional[type], + typing.Optional[type], + ] + ] = [] # Extract type information from each handler for handler_type in handlers: handler_name = handler_type.__name__ - request_type: type | None = None - response_type: type | None = None + request_type: typing.Optional[type] = None + response_type: typing.Optional[type] = None # Extract generic type parameters from __orig_bases__ orig_bases = getattr(handler_type, "__orig_bases__", ()) @@ -187,8 +193,8 @@ def class_diagram(self) -> str: # Add CORRequestHandler base class lines.append(" class CORRequestHandler {") lines.append(" <>") - lines.append(" +handle(request) Response | None") - lines.append(" +next(request) Response | None") + lines.append(" +handle(request) typing.Optional[Response ]") + lines.append(" +next(request) typing.Optional[Response ]") lines.append(" +set_next(handler) CORRequestHandler") lines.append(" +events: List[Event]") lines.append(" }") @@ -197,8 +203,8 @@ def class_diagram(self) -> str: # Add handler classes for handler_name, request_type, response_type in handler_info: lines.append(f" class {handler_name} {{") - lines.append(" +handle(request) Response | None") - lines.append(" +next(request) Response | None") + lines.append(" +handle(request) typing.Optional[Response ]") + lines.append(" +next(request) typing.Optional[Response ]") lines.append(" +events: List[Event]") lines.append(" }") lines.append("") diff --git a/src/cqrs/requests/request.py b/src/cqrs/requests/request.py index 2a6d70f..df29fba 100644 --- a/src/cqrs/requests/request.py +++ b/src/cqrs/requests/request.py @@ -54,7 +54,7 @@ def from_dict(cls, **kwargs) -> Self: # Type variables for request/response (defined here to avoid circular import with # cqrs.types <-> cqrs.requests.request_handler). Re-exported from cqrs.types for compatibility. ReqT = typing.TypeVar("ReqT", bound=IRequest, contravariant=True) -ResT = typing.TypeVar("ResT", bound=IResponse | None, covariant=True) +ResT = typing.TypeVar("ResT", bound=typing.Optional[IResponse], covariant=True) @dataclasses.dataclass diff --git a/src/cqrs/saga/bootstrap.py b/src/cqrs/saga/bootstrap.py index f772da7..fc42380 100644 --- a/src/cqrs/saga/bootstrap.py +++ b/src/cqrs/saga/bootstrap.py @@ -21,11 +21,11 @@ def setup_saga_mediator( event_emitter: events.EventEmitter, container: di_container_impl.DIContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - sagas_mapper: typing.Callable[[SagaMap], None] | None = None, - event_map: events.EventMap | None = None, + sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None, + event_map: typing.Optional[events.EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, - saga_storage: ISagaStorage | None = None, + saga_storage: typing.Optional[ISagaStorage] = None, ) -> cqrs.SagaMediator: ... @@ -34,23 +34,23 @@ def setup_saga_mediator( event_emitter: events.EventEmitter, container: CQRSContainer, middlewares: typing.Iterable[mediator_middlewares.Middleware], - sagas_mapper: typing.Callable[[SagaMap], None] | None = None, - event_map: events.EventMap | None = None, + sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None, + event_map: typing.Optional[events.EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, - saga_storage: ISagaStorage | None = None, + saga_storage: typing.Optional[ISagaStorage] = None, ) -> cqrs.SagaMediator: ... def setup_saga_mediator( event_emitter: events.EventEmitter, - container: di_container_impl.DIContainer | CQRSContainer, + container: typing.Union[di_container_impl.DIContainer, CQRSContainer], middlewares: typing.Iterable[mediator_middlewares.Middleware], - sagas_mapper: typing.Callable[[SagaMap], None] | None = None, - event_map: events.EventMap | None = None, + sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None, + event_map: typing.Optional[events.EventMap] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, - saga_storage: ISagaStorage | None = None, + saga_storage: typing.Optional[ISagaStorage] = None, ) -> cqrs.SagaMediator: """ Setup SagaMediator with configured saga map and dependencies. @@ -101,36 +101,36 @@ def setup_saga_mediator( @overload def bootstrap( di_container: di.Container, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - sagas_mapper: typing.Callable[[SagaMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, - saga_storage: ISagaStorage | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, + saga_storage: typing.Optional[ISagaStorage] = None, ) -> cqrs.SagaMediator: ... @overload def bootstrap( di_container: CQRSContainer, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - sagas_mapper: typing.Callable[[SagaMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, - saga_storage: ISagaStorage | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, + saga_storage: typing.Optional[ISagaStorage] = None, ) -> cqrs.SagaMediator: ... @overload def bootstrap( di_container: di.Container, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - sagas_mapper: typing.Callable[[SagaMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, - saga_storage: ISagaStorage | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, + saga_storage: typing.Optional[ISagaStorage] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, ) -> cqrs.SagaMediator: ... @@ -139,25 +139,25 @@ def bootstrap( @overload def bootstrap( di_container: CQRSContainer, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - sagas_mapper: typing.Callable[[SagaMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, - saga_storage: ISagaStorage | None = None, + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, + saga_storage: typing.Optional[ISagaStorage] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, ) -> cqrs.SagaMediator: ... def bootstrap( - di_container: di.Container | CQRSContainer, - message_broker: protocol.MessageBroker | None = None, - middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None, - sagas_mapper: typing.Callable[[SagaMap], None] | None = None, - domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None, - on_startup: typing.List[typing.Callable[[], None]] | None = None, - saga_storage: ISagaStorage | None = None, + di_container: typing.Union[di.Container, CQRSContainer], + message_broker: typing.Optional[protocol.MessageBroker] = None, + middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None, + sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None, + domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None, + on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None, + saga_storage: typing.Optional[ISagaStorage] = None, max_concurrent_event_handlers: int = 1, concurrent_event_handle_enable: bool = True, ) -> cqrs.SagaMediator: diff --git a/src/cqrs/saga/compensation.py b/src/cqrs/saga/compensation.py index 5ab3eb5..3d74b86 100644 --- a/src/cqrs/saga/compensation.py +++ b/src/cqrs/saga/compensation.py @@ -19,11 +19,11 @@ def __init__( self, saga_id: typing.Any, context: ContextT, - storage: ISagaStorage | SagaStorageRun, + storage: typing.Union[ISagaStorage, SagaStorageRun], retry_count: int = 3, retry_delay: float = 1.0, retry_backoff: float = 2.0, - on_after_compensate_step: typing.Callable[[], typing.Awaitable[None]] | None = None, + on_after_compensate_step: typing.Optional[typing.Callable[[], typing.Awaitable[None]]] = None, ) -> None: """ Create a SagaCompensator configured to perform compensation of completed saga steps with retry and optional post-step callback. @@ -159,7 +159,7 @@ async def _compensate_step_with_retry( """ step_name = step.__class__.__name__ - last_exception: Exception | None = None + last_exception: typing.Optional[Exception] = None for attempt in range(1, self._retry_count + 1): try: await step.compensate(self._context) diff --git a/src/cqrs/saga/execution.py b/src/cqrs/saga/execution.py index 1c0b611..2c7b560 100644 --- a/src/cqrs/saga/execution.py +++ b/src/cqrs/saga/execution.py @@ -21,7 +21,7 @@ class SagaStateManager: def __init__( self, saga_id: typing.Any, - storage: ISagaStorage | SagaStorageRun, + storage: typing.Union[ISagaStorage, SagaStorageRun], ) -> None: """ Create a SagaStateManager bound to a specific saga identifier and storage backend. @@ -61,7 +61,7 @@ async def log_step( step_name: str, action: typing.Literal["act", "compensate"], status: SagaStepStatus, - error: str | None = None, + error: typing.Optional[str] = None, ) -> None: """Log step execution.""" await self._storage.log_step( @@ -79,9 +79,9 @@ class SagaRecoveryManager: def __init__( self, saga_id: typing.Any, - storage: ISagaStorage | SagaStorageRun, + storage: typing.Union[ISagaStorage, SagaStorageRun], container: Container, - saga_steps: list[type[SagaStepHandler] | Fallback], + saga_steps: list[typing.Union[type[SagaStepHandler], Fallback]], ) -> None: """ Construct a SagaRecoveryManager that holds the identifiers, storage, DI container, and configured saga steps required to reconstruct a saga's execution state. @@ -229,7 +229,12 @@ async def execute_fallback_step( self, fallback_wrapper: Fallback, completed_step_names: set[str], - ) -> tuple[SagaStepResult[ContextT, typing.Any] | None, SagaStepHandler | None]: + ) -> typing.Optional[ + tuple[ + SagaStepResult[ContextT, typing.Any], + SagaStepHandler, + ] + ]: """ Execute a Fallback step with context snapshot/restore mechanism. @@ -251,13 +256,13 @@ async def execute_fallback_step( logger.debug( f"Skipping already completed Fallback primary step: {primary_step_name}", ) - return None, None + return None if fallback_step_name in completed_step_names: logger.debug( f"Skipping already completed Fallback fallback step: {fallback_step_name}", ) - return None, None + return None # Resolve step handlers primary_step = await self._container.resolve(fallback_wrapper.step) diff --git a/src/cqrs/saga/fallback.py b/src/cqrs/saga/fallback.py index 03ab78d..d51909d 100644 --- a/src/cqrs/saga/fallback.py +++ b/src/cqrs/saga/fallback.py @@ -1,6 +1,7 @@ """Fallback wrapper for Saga steps to handle failures gracefully.""" import dataclasses +import typing from cqrs.circuit_breaker import ICircuitBreaker from cqrs.saga.step import SagaStepHandler @@ -32,4 +33,4 @@ class Fallback: step: type[SagaStepHandler] fallback: type[SagaStepHandler] failure_exceptions: tuple[type[Exception], ...] = () - circuit_breaker: ICircuitBreaker | None = None + circuit_breaker: typing.Optional[ICircuitBreaker] = None diff --git a/src/cqrs/saga/mermaid.py b/src/cqrs/saga/mermaid.py index a2e745a..419355e 100644 --- a/src/cqrs/saga/mermaid.py +++ b/src/cqrs/saga/mermaid.py @@ -217,7 +217,14 @@ def class_diagram(self) -> str: context_types: set[type] = set() response_types: set[type] = set() event_types: set[type] = set() - step_info: list[tuple[str, type | None, type | None, list[type]]] = [] + step_info: list[ + tuple[ + str, + typing.Optional[type], + typing.Optional[type], + list[type], + ] + ] = [] # Extract type information from each step for step_item in steps: @@ -228,8 +235,8 @@ def class_diagram(self) -> str: # Process primary step primary_name = primary_step.__name__ - primary_context_type: type | None = None - primary_response_type: type | None = None + primary_context_type: typing.Optional[type] = None + primary_response_type: typing.Optional[type] = None primary_events: list[type] = [] # Extract generic type parameters from primary step @@ -250,8 +257,8 @@ def class_diagram(self) -> str: # Process fallback step fallback_name = fallback_step.__name__ - fallback_context_type: type | None = None - fallback_response_type: type | None = None + fallback_context_type: typing.Optional[type] = None + fallback_response_type: typing.Optional[type] = None fallback_events: list[type] = [] # Extract generic type parameters from fallback step @@ -290,8 +297,8 @@ def class_diagram(self) -> str: else: # Regular step step_name = step_item.__name__ - context_type: type | None = None - response_type: type | None = None + context_type: typing.Optional[type] = None + response_type: typing.Optional[type] = None step_events: list[type] = [] # Extract generic type parameters from __orig_bases__ diff --git a/src/cqrs/saga/saga.py b/src/cqrs/saga/saga.py index db5a3b1..82b0ddb 100644 --- a/src/cqrs/saga/saga.py +++ b/src/cqrs/saga/saga.py @@ -74,7 +74,7 @@ def __init__( context: ContextT, container: Container, storage: ISagaStorage, - saga_id: uuid.UUID | None = None, + saga_id: typing.Optional[uuid.UUID] = None, compensation_retry_count: int = 3, compensation_retry_delay: float = 1.0, compensation_retry_backoff: float = 2.0, @@ -84,7 +84,7 @@ def __init__( self._container = container self._storage = storage self._completed_steps: list[SagaStepHandler[ContextT, typing.Any]] = [] - self._error: BaseException | None = None + self._error: typing.Optional[BaseException] = None self._compensated: bool = False self._comp_retry_count = compensation_retry_count self._comp_retry_delay = compensation_retry_delay @@ -140,9 +140,9 @@ async def __aenter__(self) -> "SagaTransaction[ContextT]": async def __aexit__( self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: types.TracebackType | None, + exc_type: typing.Optional[type[BaseException]], + exc_val: typing.Optional[BaseException], + exc_tb: typing.Optional[types.TracebackType], ) -> bool: # If an exception occurred, compensate all completed steps. # Do not compensate on GeneratorExit: consumer stopped iteration intentionally @@ -222,13 +222,13 @@ def _build_run_scoped_components( async def _execute( self, - run: SagaStorageRun | None, + run: typing.Optional[SagaStorageRun], ) -> typing.AsyncIterator[SagaStepResult[ContextT, typing.Any]]: """ Execute the saga's configured steps, using the provided storage run for checkpointed operations when available, and perform recovery and compensation as required. Parameters: - run (SagaStorageRun | None): Optional per-saga storage run. When provided, the run is used for loading saga state, creating run-scoped managers/executors, and committing at checkpoint boundaries. When None, the transaction's internal managers and executors are used. + run (typing.Optional[SagaStorageRun ]): Optional per-saga storage run. When provided, the run is used for loading saga state, creating run-scoped managers/executors, and committing at checkpoint boundaries. When None, the transaction's internal managers and executors are used. Returns: Async iterator that yields SagaStepResult values for each step that completes; each yielded result will include the transaction's saga_id. @@ -326,14 +326,12 @@ async def _execute( try: for step_item in self._saga.steps: if isinstance(step_item, Fallback): - ( - step_result, - executed_step, - ) = await fallback_executor.execute_fallback_step( + fallback_result = await fallback_executor.execute_fallback_step( step_item, completed_step_names, ) - if step_result is not None and executed_step is not None: + if fallback_result is not None: + step_result, executed_step = fallback_result self._completed_steps.append(executed_step) if run is not None: await run.commit() @@ -341,7 +339,7 @@ async def _execute( step_result, saga_id=self._saga_id, ) - elif executed_step is None: + else: primary_name = step_item.step.__name__ fallback_name = step_item.fallback.__name__ if primary_name in completed_step_names: @@ -439,7 +437,7 @@ class OrderSaga(Saga[OrderContext]): they handle the correct context type. """ - steps: typing.ClassVar[list[type[SagaStepHandler] | Fallback]] = [] + steps: typing.ClassVar[list[typing.Union[type[SagaStepHandler], Fallback]]] = [] def __init_subclass__(cls, **kwargs: typing.Any) -> None: """Validate steps when subclass is created.""" @@ -481,7 +479,7 @@ def transaction( context: ContextT, container: Container, storage: ISagaStorage, - saga_id: uuid.UUID | None = None, + saga_id: typing.Optional[uuid.UUID] = None, compensation_retry_count: int = 3, compensation_retry_delay: float = 1.0, compensation_retry_backoff: float = 2.0, diff --git a/src/cqrs/saga/step.py b/src/cqrs/saga/step.py index 2495d1d..5a6c21e 100644 --- a/src/cqrs/saga/step.py +++ b/src/cqrs/saga/step.py @@ -9,7 +9,7 @@ from cqrs.response import IResponse from cqrs.saga.models import ContextT -Resp = typing.TypeVar("Resp", bound=IResponse | None, covariant=True) +Resp = typing.TypeVar("Resp", bound=typing.Optional[IResponse], covariant=True) @dataclasses.dataclass(frozen=True) @@ -43,10 +43,10 @@ class SagaStepResult(typing.Generic[ContextT, Resp]): response: Resp step_type: type[SagaStepHandler[ContextT, Resp]] with_error: bool = False - error_message: str | None = None - error_traceback: list[str] | None = None - error_type: typing.Type[Exception] | None = None - saga_id: uuid.UUID | None = None + error_message: typing.Optional[str] = None + error_traceback: typing.Optional[list[str]] = None + error_type: typing.Optional[typing.Type[Exception]] = None + saga_id: typing.Optional[uuid.UUID] = None class SagaStepHandler(abc.ABC, typing.Generic[ContextT, Resp]): @@ -112,11 +112,11 @@ async def compensate(self, context: OrderContext) -> None: def _generate_step_result( self, - response: IResponse | None, + response: typing.Optional[IResponse], with_error: bool = False, - error_message: str | None = None, - error_traceback: list[str] | None = None, - error_type: typing.Type[Exception] | None = None, + error_message: typing.Optional[str] = None, + error_traceback: typing.Optional[list[str]] = None, + error_type: typing.Optional[typing.Type[Exception]] = None, ) -> SagaStepResult[ContextT, Resp]: """ Generate a SagaStepResult with proper typing from the class. diff --git a/src/cqrs/saga/storage/memory.py b/src/cqrs/saga/storage/memory.py index 504f4a6..6773af7 100644 --- a/src/cqrs/saga/storage/memory.py +++ b/src/cqrs/saga/storage/memory.py @@ -47,7 +47,7 @@ async def update_context( self, saga_id: uuid.UUID, context: dict[str, typing.Any], - current_version: int | None = None, + current_version: typing.Optional[int] = None, ) -> None: """ Update the stored context for the given saga. @@ -55,7 +55,7 @@ async def update_context( Parameters: saga_id (uuid.UUID): Identifier of the saga whose context will be updated. context (dict[str, typing.Any]): New context to store for the saga. - current_version (int | None): If provided, require the stored saga version to match this value (optimistic locking). + current_version (typing.Optional[int ]): If provided, require the stored saga version to match this value (optimistic locking). Raises: ValueError: If the saga_id does not exist. @@ -86,7 +86,7 @@ async def log_step( step_name: str, action: typing.Literal["act", "compensate"], status: SagaStepStatus, - details: str | None = None, + details: typing.Optional[str] = None, ) -> None: """ Log a step entry for the given saga into the underlying storage. @@ -96,7 +96,7 @@ async def log_step( step_name (str): Name of the saga step. action (Literal["act", "compensate"]): Whether the step is a forward action ("act") or a compensation ("compensate"). status (SagaStepStatus): Outcome status of the step. - details (str | None): Optional free-form details or metadata about the step. + details (typing.Optional[str ]): Optional free-form details or metadata about the step. """ await self._storage.log_step( saga_id, @@ -225,7 +225,7 @@ async def update_context( self, saga_id: uuid.UUID, context: dict[str, typing.Any], - current_version: int | None = None, + current_version: typing.Optional[int] = None, ) -> None: if saga_id not in self._sagas: raise ValueError(f"Saga {saga_id} not found") @@ -260,7 +260,7 @@ async def log_step( step_name: str, action: typing.Literal["act", "compensate"], status: SagaStepStatus, - details: str | None = None, + details: typing.Optional[str] = None, ) -> None: if saga_id not in self._sagas: raise ValueError(f"Saga {saga_id} not found") @@ -301,8 +301,8 @@ async def get_sagas_for_recovery( self, limit: int, max_recovery_attempts: int = 5, - stale_after_seconds: int | None = None, - saga_name: str | None = None, + stale_after_seconds: typing.Optional[int] = None, + saga_name: typing.Optional[str] = None, ) -> list[uuid.UUID]: """ Selects saga IDs eligible for recovery based on status, recovery attempts, staleness, and an optional name filter. @@ -310,8 +310,8 @@ async def get_sagas_for_recovery( Parameters: limit (int): Maximum number of saga IDs to return. max_recovery_attempts (int): Upper bound (exclusive) on recovery attempts; only sagas with fewer attempts are considered. - stale_after_seconds (int | None): If provided, only sagas last updated earlier than this many seconds before now are considered; if None, staleness is ignored. - saga_name (str | None): If provided, only sagas with this name are considered; if None, name is not filtered. + stale_after_seconds (typing.Optional[int ]): If provided, only sagas last updated earlier than this many seconds before now are considered; if None, staleness is ignored. + saga_name (typing.Optional[str ]): If provided, only sagas with this name are considered; if None, name is not filtered. Returns: list[uuid.UUID]: Up to `limit` saga IDs sorted by oldest `updated_at` first that match the recovery criteria. @@ -333,7 +333,7 @@ async def get_sagas_for_recovery( async def increment_recovery_attempts( self, saga_id: uuid.UUID, - new_status: SagaStatus | None = None, + new_status: typing.Optional[SagaStatus] = None, ) -> None: if saga_id not in self._sagas: raise ValueError(f"Saga {saga_id} not found") diff --git a/src/cqrs/saga/storage/models.py b/src/cqrs/saga/storage/models.py index 6e6baf8..b27f5d6 100644 --- a/src/cqrs/saga/storage/models.py +++ b/src/cqrs/saga/storage/models.py @@ -18,7 +18,7 @@ def __init__( action: typing.Literal["act", "compensate"], status: SagaStepStatus, timestamp: datetime.datetime, - details: str | None = None, + details: typing.Optional[str] = None, ) -> None: self.saga_id = saga_id self.step_name = step_name diff --git a/src/cqrs/saga/storage/protocol.py b/src/cqrs/saga/storage/protocol.py index 27fcd88..c4d4300 100644 --- a/src/cqrs/saga/storage/protocol.py +++ b/src/cqrs/saga/storage/protocol.py @@ -33,7 +33,7 @@ async def update_context( self, saga_id: uuid.UUID, context: dict[str, typing.Any], - current_version: int | None = None, + current_version: typing.Optional[int] = None, ) -> None: """ Persist a snapshot of the saga's execution context, optionally using optimistic locking. @@ -41,7 +41,7 @@ async def update_context( Parameters: saga_id (uuid.UUID): Identifier of the saga to update. context (dict[str, Any]): JSON-serializable context object to store as the new snapshot. - current_version (int | None): If provided, perform an optimistic-locking update that succeeds only + current_version (typing.Optional[int ]): If provided, perform an optimistic-locking update that succeeds only if the stored version matches this value; on success the stored version is incremented. Raises: @@ -70,7 +70,7 @@ async def log_step( step_name: str, action: typing.Literal["act", "compensate"], status: SagaStepStatus, - details: str | None = None, + details: typing.Optional[str] = None, ) -> None: """ Append a step transition to the saga's execution log. @@ -80,7 +80,7 @@ async def log_step( step_name (str): Logical name of the step (used for diagnostics and replay). action (Literal["act", "compensate"]): Whether this entry records the primary action ("act") or its compensating action ("compensate"). status (SagaStepStatus): The step transition status to record (e.g., started, completed, failed, compensated). - details (str | None): Optional human-readable details or diagnostics about the transition. + details (typing.Optional[str ]): Optional human-readable details or diagnostics about the transition. """ async def load_saga_state( @@ -164,7 +164,7 @@ async def update_context( self, saga_id: uuid.UUID, context: dict[str, typing.Any], - current_version: int | None = None, + current_version: typing.Optional[int] = None, ) -> None: """Save saga context snapshot (e.g. after a step completes). @@ -209,7 +209,7 @@ async def log_step( step_name: str, action: typing.Literal["act", "compensate"], status: SagaStepStatus, - details: str | None = None, + details: typing.Optional[str] = None, ) -> None: """Append a step transition to the saga log. @@ -271,8 +271,8 @@ async def get_sagas_for_recovery( self, limit: int, max_recovery_attempts: int = 5, - stale_after_seconds: int | None = None, - saga_name: str | None = None, + stale_after_seconds: typing.Optional[int] = None, + saga_name: typing.Optional[str] = None, ) -> list[uuid.UUID]: """Return saga IDs that are candidates for recovery. @@ -307,7 +307,7 @@ async def get_sagas_for_recovery( async def increment_recovery_attempts( self, saga_id: uuid.UUID, - new_status: SagaStatus | None = None, + new_status: typing.Optional[SagaStatus] = None, ) -> None: """Increment recovery attempt counter after a failed recovery run. diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index e8128bd..584fc89 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -182,7 +182,7 @@ async def update_context( self, saga_id: uuid.UUID, context: dict[str, typing.Any], - current_version: int | None = None, + current_version: typing.Optional[int] = None, ) -> None: """ Update the stored context for a saga and increment its version, optionally enforcing an optimistic version check. @@ -190,7 +190,7 @@ async def update_context( Parameters: saga_id (uuid.UUID): Identifier of the saga to update. context (dict[str, typing.Any]): New serialized saga context to persist. - current_version (int | None): If provided, require the saga's current version to match this value before updating. + current_version (typing.Optional[int ]): If provided, require the saga's current version to match this value before updating. Raises: SagaConcurrencyError: If an optimistic version check fails (indicating a concurrent modification) or if the saga does not exist when a version was supplied. @@ -256,7 +256,7 @@ async def log_step( step_name: str, action: typing.Literal["act", "compensate"], status: SagaStepStatus, - details: str | None = None, + details: typing.Optional[str] = None, ) -> None: """ Record a saga step event by creating and staging a log entry in the active session. @@ -266,7 +266,7 @@ async def log_step( step_name (str): Name of the step being recorded. action (Literal["act", "compensate"]): The performed action: "act" for normal action or "compensate" for compensation. status (SagaStepStatus): The step's outcome status. - details (str | None): Optional free-form details or error message associated with the step. + details (typing.Optional[str ]): Optional free-form details or error message associated with the step. """ log_entry = SagaLogModel( saga_id=saga_id, @@ -343,7 +343,7 @@ async def get_step_history( if row.created_at.tzinfo is None else row.created_at, ), - details=typing.cast(str | None, row.details), + details=typing.cast(typing.Optional[str], row.details), ) for row in rows ] @@ -438,7 +438,7 @@ async def update_context( self, saga_id: uuid.UUID, context: dict[str, typing.Any], - current_version: int | None = None, + current_version: typing.Optional[int] = None, ) -> None: async with self.session_factory() as session: try: @@ -508,7 +508,7 @@ async def log_step( step_name: str, action: typing.Literal["act", "compensate"], status: SagaStepStatus, - details: str | None = None, + details: typing.Optional[str] = None, ) -> None: async with self.session_factory() as session: try: @@ -576,7 +576,7 @@ async def get_step_history( if row.created_at.tzinfo is None else row.created_at, ), - details=typing.cast(str | None, row.details), + details=typing.cast(typing.Optional[str], row.details), ) for row in rows ] @@ -585,8 +585,8 @@ async def get_sagas_for_recovery( self, limit: int, max_recovery_attempts: int = 5, - stale_after_seconds: int | None = None, - saga_name: str | None = None, + stale_after_seconds: typing.Optional[int] = None, + saga_name: typing.Optional[str] = None, ) -> list[uuid.UUID]: recoverable = ( SagaStatus.RUNNING, @@ -615,14 +615,14 @@ async def get_sagas_for_recovery( async def increment_recovery_attempts( self, saga_id: uuid.UUID, - new_status: SagaStatus | None = None, + new_status: typing.Optional[SagaStatus] = None, ) -> None: """ Increment the recovery attempts counter for the given saga execution and optionally update its status. Parameters: saga_id (uuid.UUID): Identifier of the saga execution to update. - new_status (SagaStatus | None): If provided, set the saga's status to this value. + new_status (typing.Optional[SagaStatus ]): If provided, set the saga's status to this value. Raises: ValueError: If no saga execution exists with the given `saga_id`. diff --git a/src/cqrs/saga/validation.py b/src/cqrs/saga/validation.py index 688af2f..d00b5ff 100644 --- a/src/cqrs/saga/validation.py +++ b/src/cqrs/saga/validation.py @@ -14,7 +14,7 @@ class SagaContextTypeExtractor: """Extracts context type from Generic type parameters.""" @staticmethod - def extract_from_class(klass: type, saga_base_class: type) -> type | None: + def extract_from_class(klass: type, saga_base_class: type) -> typing.Optional[type]: """ Extract context type from a class that inherits from a Generic base. @@ -48,7 +48,7 @@ def extract_from_class(klass: type, saga_base_class: type) -> type | None: return None @staticmethod - def extract_from_step(step_type: type) -> type | None: + def extract_from_step(step_type: type) -> typing.Optional[type]: """ Extract context type from a SagaStepHandler class. @@ -162,7 +162,7 @@ class SagaStepValidator: def __init__( self, saga_name: str, - context_type: type | None = None, + context_type: typing.Optional[type] = None, ) -> None: """ Initialize validator. @@ -178,7 +178,7 @@ def __init__( def validate_steps( self, - steps: list[type[SagaStepHandler] | Fallback], + steps: list[typing.Union[type[SagaStepHandler], Fallback]], ) -> None: """ Validate saga steps. diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py index 603269c..331bf60 100644 --- a/tests/benchmarks/conftest.py +++ b/tests/benchmarks/conftest.py @@ -5,6 +5,7 @@ import asyncio import contextlib import os +import typing import pytest from sqlalchemy.ext.asyncio import create_async_engine @@ -35,13 +36,13 @@ def create_run( @pytest.fixture(scope="session") -def database_dsn() -> str | None: +def database_dsn() -> typing.Optional[str]: """DATABASE_DSN from environment (set in CI by pytest-config.ini).""" return os.environ.get("DATABASE_DSN") or None @pytest.fixture(scope="session") -def saga_benchmark_loop_and_engine(database_dsn: str | None): +def saga_benchmark_loop_and_engine(database_dsn: typing.Optional[str]): """ One event loop and one async engine for the whole benchmark session. Used by saga SQLAlchemy benchmarks so connection setup/teardown is not measured. diff --git a/tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py b/tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py index 82dc2af..6959dc8 100644 --- a/tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py +++ b/tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py @@ -24,12 +24,12 @@ class TResult(cqrs.DCResponse): message: str = "" -class HandlerA(CORRequestHandler[TRequest, TResult | None]): +class HandlerA(CORRequestHandler[TRequest, TResult]): @property def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: if request.method == "method_a": return TResult( success=True, @@ -39,12 +39,12 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class HandlerB(CORRequestHandler[TRequest, TResult | None]): +class HandlerB(CORRequestHandler[TRequest, TResult]): @property def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: if request.method == "method_b": return TResult( success=True, @@ -54,12 +54,12 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class HandlerC(CORRequestHandler[TRequest, TResult | None]): +class HandlerC(CORRequestHandler[TRequest, TResult]): @property def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: if request.method == "method_c": return TResult( success=True, @@ -69,12 +69,12 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class DefaultHandler(CORRequestHandler[TRequest, TResult | None]): +class DefaultHandler(CORRequestHandler[TRequest, TResult]): @property def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: return TResult( success=False, handler_name="DefaultHandler", diff --git a/tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py b/tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py index feb6dbc..ab5bcec 100644 --- a/tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py +++ b/tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py @@ -2,6 +2,7 @@ import asyncio import dataclasses +import typing import pytest @@ -63,7 +64,7 @@ async def handle(self, event: _EventL3) -> None: class _ChainContainer(Container[object]): def __init__(self) -> None: - self._external: object | None = None + self._external: typing.Optional[object] = None @property def external_container(self) -> object: diff --git a/tests/benchmarks/default/test_benchmark_cor_request_handler.py b/tests/benchmarks/default/test_benchmark_cor_request_handler.py index 08963be..1ee56fc 100644 --- a/tests/benchmarks/default/test_benchmark_cor_request_handler.py +++ b/tests/benchmarks/default/test_benchmark_cor_request_handler.py @@ -21,12 +21,12 @@ class TResult(cqrs.Response): message: str = "" -class HandlerA(CORRequestHandler[TRequest, TResult | None]): +class HandlerA(CORRequestHandler[TRequest, TResult]): @property def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: if request.method == "method_a": return TResult( success=True, @@ -36,12 +36,12 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class HandlerB(CORRequestHandler[TRequest, TResult | None]): +class HandlerB(CORRequestHandler[TRequest, TResult]): @property def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: if request.method == "method_b": return TResult( success=True, @@ -51,12 +51,12 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class HandlerC(CORRequestHandler[TRequest, TResult | None]): +class HandlerC(CORRequestHandler[TRequest, TResult]): @property def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: if request.method == "method_c": return TResult( success=True, @@ -66,12 +66,12 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class DefaultHandler(CORRequestHandler[TRequest, TResult | None]): +class DefaultHandler(CORRequestHandler[TRequest, TResult]): @property def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: return TResult( success=False, handler_name="DefaultHandler", diff --git a/tests/benchmarks/default/test_benchmark_event_handler_chain.py b/tests/benchmarks/default/test_benchmark_event_handler_chain.py index a335f54..924fdb7 100644 --- a/tests/benchmarks/default/test_benchmark_event_handler_chain.py +++ b/tests/benchmarks/default/test_benchmark_event_handler_chain.py @@ -1,6 +1,7 @@ """Benchmarks: 3-level event chain, volume >> semaphore (parallel follow-ups).""" import asyncio +import typing import pydantic import pytest @@ -66,7 +67,7 @@ def __init__(self) -> None: self._h1 = _HandlerL1() self._h2 = _HandlerL2() self._h3 = _HandlerL3() - self._external: object | None = None + self._external: typing.Optional[object] = None @property def external_container(self) -> object: diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py index cbb1f5b..067f976 100644 --- a/tests/integration/fixtures.py +++ b/tests/integration/fixtures.py @@ -1,4 +1,3 @@ -import contextlib import functools import os @@ -39,9 +38,11 @@ async def session(init_orm): DATABASE_DSN, isolation_level="REPEATABLE READ", ) - session = async_sessionmaker(engine_factory())() - async with contextlib.aclosing(session): - yield session + db_session = async_sessionmaker(engine_factory())() + try: + yield db_session + finally: + await db_session.close() # --- Saga storage: MySQL (отдельные фикстуры, поднимают схему и всё необходимое) --- diff --git a/tests/integration/test_decompression.py b/tests/integration/test_decompression.py index 3501253..0da11fd 100644 --- a/tests/integration/test_decompression.py +++ b/tests/integration/test_decompression.py @@ -18,7 +18,7 @@ async def test_decompression_positive(session): repository.add(event) await session.commit() - read_event: outbox_repository.OutboxedEvent | None = next( + read_event: typing.Optional[outbox_repository.OutboxedEvent] = next( iter( await repository.get_many( batch_size=1, diff --git a/tests/integration/test_event_outbox.py b/tests/integration/test_event_outbox.py index 40b11e7..f9f6047 100644 --- a/tests/integration/test_event_outbox.py +++ b/tests/integration/test_event_outbox.py @@ -128,7 +128,7 @@ async def test_get_new_event_positive(self, session): await OutboxRequestHandler(repository).handle(request) [event_over_get_all_events_method] = await repository.get_many(1) - event: outbox_repository.OutboxedEvent | None = next( + event: typing.Optional[outbox_repository.OutboxedEvent] = next( iter( await repository.get_many( batch_size=1, diff --git a/tests/integration/test_kafka_producer.py b/tests/integration/test_kafka_producer.py index b4e9d19..fd5dd1f 100644 --- a/tests/integration/test_kafka_producer.py +++ b/tests/integration/test_kafka_producer.py @@ -71,7 +71,7 @@ async def test_produce_some_event( mediator: cqrs.RequestMediator, kafka_producer, ) -> None: - handler: CloseMeetingRoomCommandHandler | None = await MockContainer().resolve( + handler: typing.Optional[CloseMeetingRoomCommandHandler] = await MockContainer().resolve( CloseMeetingRoomCommandHandler, ) # noqa command = CloseMeetingRoomCommand(meeting_room_id=uuid.uuid4()) diff --git a/tests/integration/test_saga_mediator_memory.py b/tests/integration/test_saga_mediator_memory.py index b64139d..d3b1773 100644 --- a/tests/integration/test_saga_mediator_memory.py +++ b/tests/integration/test_saga_mediator_memory.py @@ -27,9 +27,9 @@ class OrderContext(SagaContext): order_id: str user_id: str amount: float - inventory_id: str | None = None - payment_id: str | None = None - shipment_id: str | None = None + inventory_id: typing.Optional[str] = None + payment_id: typing.Optional[str] = None + shipment_id: typing.Optional[str] = None class ReserveInventoryResponse(Response): diff --git a/tests/integration/test_streaming_mediator.py b/tests/integration/test_streaming_mediator.py index 66ecb6f..bdd2532 100644 --- a/tests/integration/test_streaming_mediator.py +++ b/tests/integration/test_streaming_mediator.py @@ -100,7 +100,7 @@ async def test_streaming_mediator_integration( kafka_producer, ) -> None: mediator, container = streaming_mediator - handler: ProcessItemsCommandHandler | None = await container.resolve( + handler: typing.Optional[ProcessItemsCommandHandler] = await container.resolve( ProcessItemsCommandHandler, ) @@ -134,7 +134,7 @@ async def test_streaming_mediator_events_emitted_after_each_yield( kafka_producer, ) -> None: mediator, container = streaming_mediator - _: ProcessItemsCommandHandler | None = await container.resolve( + _: typing.Optional[ProcessItemsCommandHandler] = await container.resolve( ProcessItemsCommandHandler, ) @@ -162,7 +162,7 @@ async def test_streaming_mediator_empty_items_list( kafka_producer, ) -> None: mediator, container = streaming_mediator - handler: ProcessItemsCommandHandler | None = await container.resolve( + handler: typing.Optional[ProcessItemsCommandHandler] = await container.resolve( ProcessItemsCommandHandler, ) @@ -183,7 +183,7 @@ async def test_streaming_mediator_single_item( kafka_producer, ) -> None: mediator, container = streaming_mediator - handler: ProcessItemsCommandHandler | None = await container.resolve( + handler: typing.Optional[ProcessItemsCommandHandler] = await container.resolve( ProcessItemsCommandHandler, ) diff --git a/tests/unit/test_cor_mermaid.py b/tests/unit/test_cor_mermaid.py index 2906629..27befe5 100644 --- a/tests/unit/test_cor_mermaid.py +++ b/tests/unit/test_cor_mermaid.py @@ -15,7 +15,7 @@ class ProcessPaymentCommand(cqrs.Request): class PaymentResult(cqrs.Response): success: bool - transaction_id: str | None = None + transaction_id: typing.Optional[str] = None message: str = "" @@ -24,7 +24,7 @@ class CreditCardHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResult]) def events(self) -> typing.List: return [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: if request.payment_method == "credit_card": return PaymentResult( success=True, @@ -39,7 +39,7 @@ class PayPalHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResult]): def events(self) -> typing.List: return [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: if request.payment_method == "paypal": return PaymentResult( success=True, @@ -54,7 +54,7 @@ class BankTransferHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResult def events(self) -> typing.List: return [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: if request.payment_method == "bank_transfer": return PaymentResult( success=True, @@ -69,7 +69,7 @@ class DefaultPaymentHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResu def events(self) -> typing.List: return [] - async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None: + async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]: return PaymentResult( success=False, message=f"Unsupported payment method: {request.payment_method}", @@ -179,8 +179,8 @@ def test_class_diagram_basic_structure() -> None: assert "classDiagram" in diagram assert "class CORRequestHandler" in diagram assert "<>" in diagram - assert "+handle(request) Response | None" in diagram - assert "+next(request) Response | None" in diagram + assert "+handle(request) typing.Optional[Response ]" in diagram + assert "+next(request) typing.Optional[Response ]" in diagram assert "+set_next(handler) CORRequestHandler" in diagram # Check handler classes @@ -189,8 +189,8 @@ def test_class_diagram_basic_structure() -> None: assert "class BankTransferHandler" in diagram # Check handler methods - assert "+handle(request) Response | None" in diagram - assert "+next(request) Response | None" in diagram + assert "+handle(request) typing.Optional[Response ]" in diagram + assert "+next(request) typing.Optional[Response ]" in diagram assert "+events: List[Event]" in diagram # Check types are included diff --git a/tests/unit/test_cor_request_handler.py b/tests/unit/test_cor_request_handler.py index da180fa..abd1c95 100644 --- a/tests/unit/test_cor_request_handler.py +++ b/tests/unit/test_cor_request_handler.py @@ -18,7 +18,7 @@ class TResult(cqrs.Response): message: str = "" -class TestHandlerA(CORRequestHandler[TRequest, TResult | None]): +class TestHandlerA(CORRequestHandler[TRequest, TResult]): """Test handler that processes method_a and tracks calls.""" call_count: int = 0 @@ -27,7 +27,7 @@ class TestHandlerA(CORRequestHandler[TRequest, TResult | None]): def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: TestHandlerA.call_count += 1 if request.method == "method_a": @@ -40,7 +40,7 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class TestHandlerB(CORRequestHandler[TRequest, TResult | None]): +class TestHandlerB(CORRequestHandler[TRequest, TResult]): """Test handler that processes method_b and tracks calls.""" call_count: int = 0 @@ -49,7 +49,7 @@ class TestHandlerB(CORRequestHandler[TRequest, TResult | None]): def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: TestHandlerB.call_count += 1 if request.method == "method_b": @@ -62,7 +62,7 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class TestHandlerC(CORRequestHandler[TRequest, TResult | None]): +class TestHandlerC(CORRequestHandler[TRequest, TResult]): """Test handler that processes method_c and tracks calls.""" call_count: int = 0 @@ -71,7 +71,7 @@ class TestHandlerC(CORRequestHandler[TRequest, TResult | None]): def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: TestHandlerC.call_count += 1 if request.method == "method_c": @@ -84,7 +84,7 @@ async def handle(self, request: TRequest) -> TResult | None: return await self.next(request) -class DefaultTestHandler(CORRequestHandler[TRequest, TResult | None]): +class DefaultTestHandler(CORRequestHandler[TRequest, TResult]): """Default handler that always handles the request (end of chain).""" call_count: int = 0 @@ -93,7 +93,7 @@ class DefaultTestHandler(CORRequestHandler[TRequest, TResult | None]): def events(self) -> typing.Sequence[cqrs.IEvent]: return [] - async def handle(self, request: TRequest) -> TResult | None: + async def handle(self, request: TRequest) -> typing.Optional[TResult]: DefaultTestHandler.call_count += 1 return TResult( success=False, diff --git a/tests/unit/test_dispatcher.py b/tests/unit/test_dispatcher.py index 084c484..abe7ca4 100644 --- a/tests/unit/test_dispatcher.py +++ b/tests/unit/test_dispatcher.py @@ -1,4 +1,5 @@ from uuid import UUID, uuid4 +import typing import pydantic @@ -13,12 +14,12 @@ class ReadMeetingDetailsQuery(Request): meeting_room_id: UUID = pydantic.Field() - status: str | None = pydantic.Field(default=None) + status: typing.Optional[str] = pydantic.Field(default=None) class ReadMeetingDetailsQueryResult(Response): meeting_room_id: UUID = pydantic.Field() - status: str | None = pydantic.Field(default=None) + status: typing.Optional[str] = pydantic.Field(default=None) class ReadMeetingDetailsQueryHandler( diff --git a/tests/unit/test_event_processor.py b/tests/unit/test_event_processor.py index dc54117..197bfeb 100644 --- a/tests/unit/test_event_processor.py +++ b/tests/unit/test_event_processor.py @@ -1,4 +1,5 @@ import asyncio +import typing from unittest import mock import pydantic @@ -279,7 +280,7 @@ def events(self) -> tuple[_ChainedEvent, ...]: return (_ChainedEvent(level=self._last.level + 1, seq=self._last.seq),) def __init__(self) -> None: - self._last: _ChainedEvent | None = None + self._last: typing.Optional[_ChainedEvent] = None async def handle(self, event: _ChainedEvent) -> None: self._last = event diff --git a/tests/unit/test_multi_level_events_parameterized.py b/tests/unit/test_multi_level_events_parameterized.py index 85fc31a..028fbf8 100644 --- a/tests/unit/test_multi_level_events_parameterized.py +++ b/tests/unit/test_multi_level_events_parameterized.py @@ -51,7 +51,7 @@ async def handle(self, event: _FanEvent) -> None: class _FanContainer(Container[object]): def __init__(self, handler: _FanHandler) -> None: self._handler = handler - self._external: object | None = None + self._external: typing.Optional[object] = None @property def external_container(self) -> object: @@ -153,7 +153,7 @@ def __init__(self) -> None: self._h1 = _HandlerL1() self._h2 = _HandlerL2() self._h3 = _HandlerL3() - self._external: object | None = None + self._external: typing.Optional[object] = None @property def external_container(self) -> object: diff --git a/tests/unit/test_request_response.py b/tests/unit/test_request_response.py index 6ae3a65..1fb4351 100644 --- a/tests/unit/test_request_response.py +++ b/tests/unit/test_request_response.py @@ -1,4 +1,5 @@ from uuid import UUID, uuid4 +import typing import pydantic import pytest @@ -62,7 +63,7 @@ class TestContainer: async def resolve( self, type_, - ) -> CloseMeetingRoomCommandHandler | ReadMeetingDetailsQueryHandler: + ) -> typing.Union[CloseMeetingRoomCommandHandler, ReadMeetingDetailsQueryHandler]: if type_ is CloseMeetingRoomCommandHandler: return self.close_meeting_room_command_handler elif type_ is ReadMeetingDetailsQueryHandler: diff --git a/tests/unit/test_saga/conftest.py b/tests/unit/test_saga/conftest.py index 0bc055d..1d2e827 100644 --- a/tests/unit/test_saga/conftest.py +++ b/tests/unit/test_saga/conftest.py @@ -42,7 +42,7 @@ def __init__(self) -> None: self._events: list[Event] = [] self.act_called = False self.compensate_called = False - self._inventory_id: str | None = None + self._inventory_id: typing.Optional[str] = None @property def events(self) -> list[Event]: @@ -70,7 +70,7 @@ def __init__(self) -> None: self._events: list[Event] = [] self.act_called = False self.compensate_called = False - self._payment_id: str | None = None + self._payment_id: typing.Optional[str] = None @property def events(self) -> list[Event]: @@ -95,7 +95,7 @@ def __init__(self) -> None: self._events: list[Event] = [] self.act_called = False self.compensate_called = False - self._shipment_id: str | None = None + self._shipment_id: typing.Optional[str] = None @property def events(self) -> list[Event]: diff --git a/tests/unit/test_saga/test_fallback.py b/tests/unit/test_saga/test_fallback.py index 81c1075..e46307a 100644 --- a/tests/unit/test_saga/test_fallback.py +++ b/tests/unit/test_saga/test_fallback.py @@ -53,7 +53,7 @@ def __init__(self) -> None: self._events: list[Event] = [] self.act_called = False self.compensate_called = False - self._inventory_id: str | None = None + self._inventory_id: typing.Optional[str] = None @property def events(self) -> list[Event]: @@ -491,7 +491,7 @@ class SuccessfulStep(SagaStepHandler[OrderContext, ReserveInventoryResponse]): def __init__(self) -> None: self._events: list[Event] = [] self.act_called = False - self._inventory_id: str | None = None + self._inventory_id: typing.Optional[str] = None @property def events(self) -> list[Event]: @@ -585,7 +585,7 @@ class ContextVerifyingFallbackStep( def __init__(self) -> None: self._events: list[Event] = [] self.act_called = False - self._inventory_id: str | None = None + self._inventory_id: typing.Optional[str] = None @property def events(self) -> list[Event]: @@ -647,7 +647,7 @@ def __init__(self) -> None: self._events: list[Event] = [] self.act_called = False self.compensate_called = False - self._inventory_id: str | None = None + self._inventory_id: typing.Optional[str] = None @property def events(self) -> list[Event]: diff --git a/tests/unit/test_saga/test_saga_storage_run.py b/tests/unit/test_saga/test_saga_storage_run.py index d25ebda..dcc922b 100644 --- a/tests/unit/test_saga/test_saga_storage_run.py +++ b/tests/unit/test_saga/test_saga_storage_run.py @@ -42,7 +42,7 @@ async def update_context( self, saga_id: uuid.UUID, context: dict, - current_version: int | None = None, + current_version: typing.Optional[int] = None, ) -> None: """ Update the stored context for a saga, optionally validating the expected current version. @@ -50,7 +50,7 @@ async def update_context( Parameters: saga_id (uuid.UUID): Identifier of the saga whose context will be updated. context (dict): New context data to persist for the saga. - current_version (int | None): If provided, the update will only proceed when the stored version equals this value; pass None to skip version validation. + current_version (typing.Optional[int ]): If provided, the update will only proceed when the stored version equals this value; pass None to skip version validation. """ await self._inner.update_context(saga_id, context, current_version) @@ -70,7 +70,7 @@ async def log_step( step_name: str, action: typing.Literal["act", "compensate"], status: SagaStepStatus, - details: str | None = None, + details: typing.Optional[str] = None, ) -> None: """ Record the execution or compensation outcome of a saga step. @@ -80,7 +80,7 @@ async def log_step( step_name (str): Name of the step being logged. action (Literal["act", "compensate"]): Whether this log entry is for the step's normal action ("act") or its compensation ("compensate"). status (SagaStepStatus): Resulting status of the step. - details (str | None): Optional human-readable details or metadata about the step event. + details (typing.Optional[str ]): Optional human-readable details or metadata about the step event. """ await self._inner.log_step(saga_id, step_name, action, status, details) @@ -121,8 +121,8 @@ async def get_sagas_for_recovery( self, limit: int, max_recovery_attempts: int = 5, - stale_after_seconds: int | None = None, - saga_name: str | None = None, + stale_after_seconds: typing.Optional[int] = None, + saga_name: typing.Optional[str] = None, ) -> list[uuid.UUID]: """ Selects saga IDs that are eligible for recovery. @@ -130,8 +130,8 @@ async def get_sagas_for_recovery( Parameters: limit (int): Maximum number of saga IDs to return. max_recovery_attempts (int): Only include sagas with fewer than this many recovery attempts. - stale_after_seconds (int | None): If provided, only include sagas last updated more than this many seconds ago; if None, do not filter by staleness. - saga_name (str | None): If provided, restrict results to sagas with this name. + stale_after_seconds (typing.Optional[int ]): If provided, only include sagas last updated more than this many seconds ago; if None, do not filter by staleness. + saga_name (typing.Optional[str ]): If provided, restrict results to sagas with this name. Returns: list[uuid.UUID]: Saga UUIDs that match the recovery criteria, up to `limit`. @@ -146,14 +146,14 @@ async def get_sagas_for_recovery( async def increment_recovery_attempts( self, saga_id: uuid.UUID, - new_status: SagaStatus | None = None, + new_status: typing.Optional[SagaStatus] = None, ) -> None: """ Increment the recovery-attempts counter for a saga and optionally update its status. Parameters: saga_id (uuid.UUID): Identifier of the saga whose recovery attempts should be incremented. - new_status (SagaStatus | None): If provided, update the saga's status to this value after incrementing attempts; otherwise leave status unchanged. + new_status (typing.Optional[SagaStatus ]): If provided, update the saga's status to this value after incrementing attempts; otherwise leave status unchanged. """ await self._inner.increment_recovery_attempts(saga_id, new_status) @@ -270,7 +270,7 @@ class SagaWithFailure(Saga[OrderContext]): storage = MemorySagaStorage() saga = SagaWithFailure() context = OrderContext(order_id="o3", user_id="u3", amount=70.0) - saga_id: uuid.UUID | None = None + saga_id: typing.Optional[uuid.UUID] = None with pytest.raises(ValueError, match="Step failed"): async with saga.transaction( context=context, diff --git a/tests/unit/test_saga/test_saga_to_mermaid.py b/tests/unit/test_saga/test_saga_to_mermaid.py index d8f8bab..8e17e0c 100644 --- a/tests/unit/test_saga/test_saga_to_mermaid.py +++ b/tests/unit/test_saga/test_saga_to_mermaid.py @@ -22,7 +22,7 @@ def test_to_mermaid_empty_steps(saga_container: SagaContainer) -> None: """Test that Mermaid handles empty steps list correctly.""" class EmptySaga(Saga[OrderContext]): - steps: typing.ClassVar[list[type[SagaStepHandler] | Fallback]] = [] + steps: typing.ClassVar[list[typing.Union[type[SagaStepHandler], Fallback]]] = [] saga = EmptySaga() generator = SagaMermaid(saga) @@ -239,7 +239,7 @@ def test_class_diagram_empty_steps(saga_container: SagaContainer) -> None: """Test that class_diagram() handles empty steps list correctly.""" class EmptySaga(Saga[OrderContext]): - steps: typing.ClassVar[list[type[SagaStepHandler] | Fallback]] = [] + steps: typing.ClassVar[list[typing.Union[type[SagaStepHandler], Fallback]]] = [] saga = EmptySaga() generator = SagaMermaid(saga) diff --git a/tests/unit/test_streaming_outbox_background_processing.py b/tests/unit/test_streaming_outbox_background_processing.py index 1b07977..65b8689 100644 --- a/tests/unit/test_streaming_outbox_background_processing.py +++ b/tests/unit/test_streaming_outbox_background_processing.py @@ -105,8 +105,8 @@ async def handle( class MockContainer: def __init__( self, - handler: ProcessServiceStreamingHandler | None = None, - event_handler: ServiceChangedEventHandler | None = None, + handler: typing.Optional[ProcessServiceStreamingHandler] = None, + event_handler: typing.Optional[ServiceChangedEventHandler] = None, ) -> None: self._handler = handler self._event_handler = event_handler