diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml
index e34785b..18116e0 100644
--- a/.github/workflows/python-publish.yml
+++ b/.github/workflows/python-publish.yml
@@ -13,12 +13,12 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- python-version: [ "3.10", "3.11", "3.12" ]
+ python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13", "3.14" ]
steps:
- uses: actions/checkout@v4
- name: Set up Python
- uses: actions/setup-python@v3
+ uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
@@ -28,6 +28,6 @@ jobs:
- name: Build package
run: python -m build
- name: Publish package
- if: success() && github.event_name == 'release' && matrix.python-version == '3.12'
+ if: success() && github.event_name == 'release' && matrix.python-version == '3.14'
run: |
twine upload dist/* --username __token__ --password ${{ secrets.PYPI_API_TOKEN }}
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 3f69bfc..0d6a3e5 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -12,7 +12,19 @@ jobs:
strategy:
fail-fast: false
matrix:
- python-version: ["3.10", "3.11", "3.12"]
+ include:
+ - python-version: "3.9"
+ pyright-version: "3.9"
+ - python-version: "3.10"
+ pyright-version: "3.10"
+ - python-version: "3.11"
+ pyright-version: "3.11"
+ - python-version: "3.12"
+ pyright-version: "3.12"
+ - python-version: "3.13"
+ pyright-version: "3.13"
+ - python-version: "3.14"
+ pyright-version: "3.14"
steps:
- uses: actions/checkout@v4
@@ -62,11 +74,11 @@ jobs:
- name: Run pyright
run: |
- pyright --pythonversion ${{ matrix.python-version }} src tests examples
+ pyright --pythonversion ${{ matrix.pyright-version }} src
- name: Check minimum Python version (vermin)
run: |
- vermin --target=3.10- --violations --eval-annotations --backport typing_extensions --exclude=venv --exclude=build --exclude=.git --exclude=.venv src examples tests
+ vermin --target=3.9- --violations --eval-annotations --backport typing_extensions --exclude=venv --exclude=build --exclude=.git --exclude=.venv src examples tests
test:
name: test (py ${{ matrix.python-version }})
@@ -74,7 +86,13 @@ jobs:
strategy:
fail-fast: false
matrix:
- python-version: ["3.10", "3.11", "3.12"]
+ include:
+ - python-version: "3.9"
+ - python-version: "3.10"
+ - python-version: "3.11"
+ - python-version: "3.12"
+ - python-version: "3.13"
+ - python-version: "3.14"
steps:
- uses: actions/checkout@v4
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 67e2f15..4039593 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -53,14 +53,14 @@ repos:
hooks:
- id: pytest-unit
name: unit tests
- entry: pytest -c ./tests/pytest-config.ini ./tests/unit
+ entry: env PYTHONPATH=src ./venv/bin/python -m pytest -c ./tests/pytest-config.ini ./tests/unit
language: system
types: [python]
pass_filenames: false
always_run: true
- id: pytest-integration
name: integration tests
- entry: pytest -c ./tests/pytest-config.ini ./tests/integration
+ entry: env PYTHONPATH=src ./venv/bin/python -m pytest -c ./tests/pytest-config.ini ./tests/integration
language: system
types: [python]
pass_filenames: false
diff --git a/README.md b/README.md
index 0e979d0..56a99c3 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
Python CQRS
Event-Driven Architecture Framework for Distributed Systems
- Python 3.10+ · Full documentation: mkdocs.python-cqrs.dev
+ Python 3.9+ · Full documentation: mkdocs.python-cqrs.dev
@@ -88,7 +88,7 @@ project ([documentation](https://akhundmurad.github.io/diator/)) with several en
## Installation
-**Python 3.10+** is required.
+**Python 3.9+** is required. CI runs on Python **3.9-3.14**.
```bash
pip install python-cqrs
diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index fdb209c..7b48fc5 100644
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -1,4 +1,3 @@
-version: '3'
services:
mysql_dev:
image: mysql:8.3.0
diff --git a/examples/cor_mermaid.py b/examples/cor_mermaid.py
index 5aeb408..2320a08 100644
--- a/examples/cor_mermaid.py
+++ b/examples/cor_mermaid.py
@@ -81,7 +81,7 @@ class PaymentResult(cqrs.Response):
"""Payment processing result."""
success: bool
- transaction_id: str | None = None
+ transaction_id: typing.Optional[str] = None
message: str = ""
@@ -109,7 +109,7 @@ def __init__(self) -> None:
def events(self) -> typing.List[Event]:
return self._events.copy()
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
"""Process credit card payment."""
if request.payment_method == "credit_card":
transaction_id = f"cc_{request.user_id}_{int(request.amount * 100)}"
@@ -143,7 +143,7 @@ def __init__(self) -> None:
def events(self) -> typing.List[Event]:
return self._events.copy()
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
"""Process PayPal payment."""
if request.payment_method == "paypal":
transaction_id = f"pp_{request.user_id}_{int(request.amount * 100)}"
@@ -177,7 +177,7 @@ def __init__(self) -> None:
def events(self) -> typing.List[Event]:
return self._events.copy()
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
"""Process bank transfer payment."""
if request.payment_method == "bank_transfer":
transaction_id = f"bt_{request.user_id}_{int(request.amount * 100)}"
@@ -207,7 +207,7 @@ class DefaultPaymentHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResu
def events(self) -> typing.List[Event]:
return []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
"""Handle unsupported payment methods."""
return PaymentResult(
success=False,
diff --git a/examples/cor_request_fallback.py b/examples/cor_request_fallback.py
index 0a193cc..3fa7d4b 100644
--- a/examples/cor_request_fallback.py
+++ b/examples/cor_request_fallback.py
@@ -55,6 +55,7 @@
import asyncio
import logging
+import typing
import di
from di import dependent
@@ -96,7 +97,7 @@ class SourceAHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]):
def events(self) -> list[cqrs.Event]:
return []
- async def handle(self, request: FetchDataCommand) -> FetchDataResult | None:
+ async def handle(self, request: FetchDataCommand) -> typing.Optional[FetchDataResult]:
if request.source == "a":
logger.info("COR chain: SourceAHandler handled source=a")
HANDLER_SOURCE.append("chain")
@@ -109,7 +110,7 @@ class SourceBHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]):
def events(self) -> list[cqrs.Event]:
return []
- async def handle(self, request: FetchDataCommand) -> FetchDataResult | None:
+ async def handle(self, request: FetchDataCommand) -> typing.Optional[FetchDataResult]:
if request.source == "b":
logger.info("COR chain: SourceBHandler handled source=b")
HANDLER_SOURCE.append("chain")
@@ -124,7 +125,7 @@ class DefaultChainHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]):
def events(self) -> list[cqrs.Event]:
return []
- async def handle(self, request: FetchDataCommand) -> FetchDataResult | None:
+ async def handle(self, request: FetchDataCommand) -> typing.Optional[FetchDataResult]:
if request.source == "error":
logger.info("COR chain: DefaultChainHandler raising ConnectionError for source=error")
raise ConnectionError("Downstream service unavailable")
diff --git a/examples/cor_request_handler.py b/examples/cor_request_handler.py
index a69f5b5..7fc23e2 100644
--- a/examples/cor_request_handler.py
+++ b/examples/cor_request_handler.py
@@ -82,7 +82,7 @@ class ProcessPaymentCommand(cqrs.Request):
class PaymentResult(cqrs.Response):
success: bool
- transaction_id: str | None = None
+ transaction_id: typing.Optional[str] = None
message: str = ""
@@ -101,7 +101,7 @@ def __init__(self) -> None:
super().__init__()
self._events: typing.List[cqrs.Event] = []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
if request.payment_method == "credit_card":
transaction_id = f"cc_{request.user_id}_{int(request.amount * 100)}"
TRANSACTIONS["credit_card"].append(transaction_id)
@@ -134,7 +134,7 @@ def __init__(self) -> None:
super().__init__()
self._events: typing.List[cqrs.Event] = []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
if request.payment_method == "paypal":
transaction_id = f"pp_{request.user_id}_{int(request.amount * 100)}"
TRANSACTIONS["paypal"].append(transaction_id)
@@ -167,7 +167,7 @@ def __init__(self) -> None:
super().__init__()
self._events: typing.List[cqrs.Event] = []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
if request.payment_method == "bank_transfer":
transaction_id = f"bt_{request.user_id}_{int(request.amount * 100)}"
TRANSACTIONS["bank_transfer"].append(transaction_id)
@@ -198,7 +198,7 @@ class DefaultPaymentHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResu
def events(self) -> typing.List[cqrs.Event]:
return []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
# Default handler always handles the request (end of chain)
print(
f"Default: Unsupported payment method '{request.payment_method}' for user {request.user_id}",
diff --git a/examples/kafka_event_consuming.py b/examples/kafka_event_consuming.py
index b6f7a89..2d7bf87 100644
--- a/examples/kafka_event_consuming.py
+++ b/examples/kafka_event_consuming.py
@@ -114,7 +114,7 @@ async def empty_message_decoder(
[kafka.KafkaMessage],
typing.Awaitable[types.DecodedMessage],
],
-) -> types.DecodedMessage | None:
+) -> typing.Optional[types.DecodedMessage]:
"""
Decode a kafka message and return it if it is not empty.
"""
@@ -158,7 +158,7 @@ def mediator_factory() -> cqrs.EventMediator:
decoder=empty_message_decoder,
)
async def hello_world_event_handler(
- body: cqrs.NotificationEvent[HelloWorldPayload] | deserializers.DeserializeJsonError | None,
+ body: typing.Union[cqrs.NotificationEvent[HelloWorldPayload], typing.Optional[deserializers.DeserializeJsonError]],
msg: kafka.KafkaMessage,
mediator: cqrs.EventMediator = faststream.Depends(mediator_factory),
):
diff --git a/examples/saga.py b/examples/saga.py
index 417eb54..2be0eeb 100644
--- a/examples/saga.py
+++ b/examples/saga.py
@@ -86,6 +86,7 @@
import asyncio
import dataclasses
import logging
+import typing
import uuid
import di
@@ -119,9 +120,9 @@ class OrderContext(SagaContext):
shipping_address: str
# These fields are populated by steps during execution
- inventory_reservation_id: str | None = None
- payment_id: str | None = None
- shipment_id: str | None = None
+ inventory_reservation_id: typing.Optional[str] = None
+ payment_id: typing.Optional[str] = None
+ shipment_id: typing.Optional[str] = None
# ============================================================================
diff --git a/examples/saga_fallback.py b/examples/saga_fallback.py
index 78a8fa9..683ba18 100644
--- a/examples/saga_fallback.py
+++ b/examples/saga_fallback.py
@@ -70,6 +70,7 @@
import asyncio
import dataclasses
import logging
+import typing
import uuid
import di
@@ -104,7 +105,7 @@ class OrderContext(SagaContext):
amount: float
# This field is populated by step during execution
- reservation_id: str | None = None
+ reservation_id: typing.Optional[str] = None
# ============================================================================
diff --git a/examples/saga_fastapi_sse.py b/examples/saga_fastapi_sse.py
index 8cb8b86..5d2a766 100644
--- a/examples/saga_fastapi_sse.py
+++ b/examples/saga_fastapi_sse.py
@@ -179,9 +179,9 @@ class OrderContext(SagaContext):
total_amount: float
shipping_address: str
- inventory_reservation_id: str | None = None
- payment_id: str | None = None
- shipment_id: str | None = None
+ inventory_reservation_id: typing.Optional[str] = None
+ payment_id: typing.Optional[str] = None
+ shipment_id: typing.Optional[str] = None
class ProcessOrderRequest(pydantic.BaseModel):
@@ -448,7 +448,7 @@ def mediator_factory() -> cqrs.SagaMediator:
)
-def serialize_response(response: Response | None) -> dict[str, typing.Any]:
+def serialize_response(response: typing.Optional[Response]) -> dict[str, typing.Any]:
if response is None:
return {}
return response.to_dict()
diff --git a/examples/saga_mermaid.py b/examples/saga_mermaid.py
index 84252f7..f3758b9 100644
--- a/examples/saga_mermaid.py
+++ b/examples/saga_mermaid.py
@@ -87,9 +87,9 @@ class OrderContext(SagaContext):
total_amount: float
shipping_address: str
- inventory_reservation_id: str | None = None
- payment_id: str | None = None
- shipment_id: str | None = None
+ inventory_reservation_id: typing.Optional[str] = None
+ payment_id: typing.Optional[str] = None
+ shipment_id: typing.Optional[str] = None
class ReserveInventoryResponse(Response):
diff --git a/examples/saga_recovery.py b/examples/saga_recovery.py
index a7848be..b117bbb 100644
--- a/examples/saga_recovery.py
+++ b/examples/saga_recovery.py
@@ -127,9 +127,9 @@ class OrderContext(SagaContext):
shipping_address: str
# These fields are populated by steps during execution
- inventory_reservation_id: str | None = None
- payment_id: str | None = None
- shipment_id: str | None = None
+ inventory_reservation_id: typing.Optional[str] = None
+ payment_id: typing.Optional[str] = None
+ shipment_id: typing.Optional[str] = None
# ============================================================================
diff --git a/examples/saga_recovery_scheduler.py b/examples/saga_recovery_scheduler.py
index 16d6d5e..89318e9 100644
--- a/examples/saga_recovery_scheduler.py
+++ b/examples/saga_recovery_scheduler.py
@@ -112,9 +112,9 @@ class OrderContext(SagaContext):
total_amount: float
shipping_address: str
- inventory_reservation_id: str | None = None
- payment_id: str | None = None
- shipment_id: str | None = None
+ inventory_reservation_id: typing.Optional[str] = None
+ payment_id: typing.Optional[str] = None
+ shipment_id: typing.Optional[str] = None
# ============================================================================
@@ -536,7 +536,7 @@ async def recovery_loop(
storage: ISagaStorage,
*,
interval_seconds: float = RECOVERY_INTERVAL_SECONDS,
- max_iterations: int | None = None,
+ max_iterations: typing.Optional[int] = None,
) -> None:
"""
Run the recovery scheduler loop.
diff --git a/pyproject.toml b/pyproject.toml
index 9896e07..d750e00 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,9 +11,13 @@ classifiers = [
"Development Status :: 4 - Beta",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
+ "Programming Language :: Python :: 3 :: Only",
+ "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
- "Programming Language :: Python :: 3.12"
+ "Programming Language :: Python :: 3.12",
+ "Programming Language :: Python :: 3.13",
+ "Programming Language :: Python :: 3.14"
]
dependencies = [
"dataclass-wizard==0.*",
@@ -22,15 +26,14 @@ dependencies = [
"orjson==3.*",
"pydantic==2.*",
"sqlalchemy[asyncio]==2.0.*",
- "python-dotenv==1.*",
- "retry-async==0.1.*",
+ "python-dotenv>=0.21,<2",
"typing-extensions>=4.0"
]
description = "Event-Driven Architecture Framework for Distributed Systems"
maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
-requires-python = ">=3.10"
+requires-python = ">=3.9,<3.15"
version = "4.10.1"
[project.optional-dependencies]
diff --git a/pyrightconfig.json b/pyrightconfig.json
index 2b2abd5..067c3d3 100644
--- a/pyrightconfig.json
+++ b/pyrightconfig.json
@@ -8,18 +8,18 @@
"defineConstant": {
"DEBUG": true
},
- "pythonVersion": "3.10",
+ "pythonVersion": "3.9",
"pythonPlatform": "Linux",
"executionEnvironments": [
{
"root": "./",
- "pythonVersion": "3.10",
+ "pythonVersion": "3.9",
"pythonPlatform": "Linux",
"reportMissingImports": "error"
},
{
"root": "./examples",
- "pythonVersion": "3.10",
+ "pythonVersion": "3.9",
"pythonPlatform": "Linux",
"reportMissingImports": "warning"
}
diff --git a/ruff.toml b/ruff.toml
index 0fba781..0f23677 100644
--- a/ruff.toml
+++ b/ruff.toml
@@ -32,8 +32,8 @@ exclude = [
line-length = 120
indent-width = 4
-# Assume Python 3.10
-target-version = "py310"
+# Assume Python 3.9
+target-version = "py39"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
diff --git a/src/cqrs/adapters/circuit_breaker.py b/src/cqrs/adapters/circuit_breaker.py
index 46c5218..ca063c5 100644
--- a/src/cqrs/adapters/circuit_breaker.py
+++ b/src/cqrs/adapters/circuit_breaker.py
@@ -105,7 +105,7 @@ def default_memory_storage_factory(name: str) -> _CircuitBreakerStorage:
return CircuitMemoryStorage(state=aiobreaker.CircuitBreakerState.CLOSED)
-def _identifier_to_name(identifier: type | str) -> str:
+def _identifier_to_name(identifier: typing.Union[type, str]) -> str:
"""Build circuit breaker namespace from type or string."""
if isinstance(identifier, str):
return identifier
@@ -142,8 +142,8 @@ def __init__(
self,
fail_max: int = 5,
timeout_duration: int = 60,
- exclude: list[type[Exception]] | None = None,
- storage_factory: StorageFactory | None = None,
+ exclude: typing.Optional[list[type[Exception]]] = None,
+ storage_factory: typing.Optional[StorageFactory] = None,
) -> None:
if CircuitBreaker is None:
raise ImportError(
@@ -158,7 +158,7 @@ def __init__(
# Dictionary to store circuit breakers per identifier (type or str)
self._breakers: dict[str, typing.Any] = {} # type: ignore[type-arg]
- def _get_breaker(self, identifier: type | str) -> typing.Any: # type: ignore[return-type]
+ def _get_breaker(self, identifier: typing.Union[type, str]) -> typing.Any: # type: ignore[return-type]
"""
Get or create circuit breaker for an identifier (type or string).
@@ -208,7 +208,7 @@ def _create_breaker(self, name: str) -> typing.Any: # type: ignore[return-type]
async def call(
self,
- identifier: type | str,
+ identifier: typing.Union[type, str],
func: typing.Callable[..., typing.Awaitable[typing.Any]],
*args: typing.Any,
**kwargs: typing.Any,
diff --git a/src/cqrs/adapters/kafka.py b/src/cqrs/adapters/kafka.py
index ceb6c8f..b507e46 100644
--- a/src/cqrs/adapters/kafka.py
+++ b/src/cqrs/adapters/kafka.py
@@ -1,5 +1,4 @@
import asyncio
-import functools
import logging
import ssl
import typing
@@ -8,7 +7,6 @@
from cqrs.serializers import default
import aiokafka
-import retry_async
from aiokafka import errors
@@ -17,23 +15,19 @@
"kafka_producer_factory",
)
-_retry = functools.partial(
- retry_async.retry,
- exceptions=(
- errors.KafkaConnectionError,
- errors.NodeNotReadyError,
- errors.RequestTimedOutError,
- ),
- is_async=True,
+_RETRYABLE_KAFKA_EXCEPTIONS = (
+ errors.KafkaConnectionError,
+ errors.NodeNotReadyError,
+ errors.RequestTimedOutError,
)
-SecurityProtocol: typing.TypeAlias = typing.Literal[
+SecurityProtocol = typing.Literal[
"PLAINTEXT",
"SSL",
"SASL_PLAINTEXT",
"SASL_SSL",
]
-SaslMechanism: typing.TypeAlias = typing.Literal[
+SaslMechanism = typing.Literal[
"PLAIN",
"GSSAPI",
"SCRAM-SHA-256",
@@ -44,7 +38,7 @@
logger = logging.getLogger("cqrs")
logger.setLevel(logging.DEBUG)
-Serializer = typing.Callable[[typing.Any], typing.ByteString | None]
+Serializer = typing.Callable[[typing.Any], typing.Optional[typing.ByteString]]
class KafkaProducer(protocol.KafkaProducer):
@@ -73,26 +67,27 @@ async def produce(self, topic: typing.Text, message: typing.Any):
Produces event to kafka broker.
Tries to reconnect if connect has been lost or has not been opened.
"""
- await _retry(tries=self._retry_count, delay=self._retry_delay)(self._produce)(
- topic,
- message,
- )
+ for attempt in range(1, self._retry_count + 1):
+ try:
+ await self._produce(topic, message)
+ return
+ except _RETRYABLE_KAFKA_EXCEPTIONS:
+ if attempt == self._retry_count:
+ raise
+ await asyncio.sleep(self._retry_delay)
def kafka_producer_factory(
dsn: typing.Text,
security_protocol: SecurityProtocol = "PLAINTEXT",
sasl_mechanism: SaslMechanism = "PLAIN",
- ssl_context: ssl.SSLContext | None = None,
+ ssl_context: typing.Optional[ssl.SSLContext] = None,
retry_count: int = 3,
retry_delay: int = 1,
- user: typing.Text | None = None,
- password: typing.Text | None = None,
- value_serializer: Serializer | None = None,
+ user: typing.Optional[typing.Text] = None,
+ password: typing.Optional[typing.Text] = None,
+ value_serializer: typing.Optional[Serializer] = None,
) -> KafkaProducer:
- loop = asyncio.get_event_loop()
- asyncio.set_event_loop(loop)
-
producer = aiokafka.AIOKafkaProducer(
bootstrap_servers=dsn,
value_serializer=value_serializer or default.default_serializer,
@@ -101,7 +96,6 @@ def kafka_producer_factory(
sasl_plain_username=user,
sasl_plain_password=password,
ssl_context=ssl_context,
- loop=loop,
)
return KafkaProducer(
producer=producer,
diff --git a/src/cqrs/circuit_breaker.py b/src/cqrs/circuit_breaker.py
index 172eab6..e11d0b7 100644
--- a/src/cqrs/circuit_breaker.py
+++ b/src/cqrs/circuit_breaker.py
@@ -18,7 +18,7 @@ class ICircuitBreaker(typing.Protocol):
async def call(
self,
- identifier: type | str,
+ identifier: typing.Union[type, str],
func: typing.Callable[..., typing.Awaitable[typing.Any]],
*args: typing.Any,
**kwargs: typing.Any,
@@ -56,7 +56,7 @@ def is_circuit_breaker_error(self, exc: Exception) -> bool:
def should_use_fallback(
primary_error: Exception,
- circuit_breaker: ICircuitBreaker | None,
+ circuit_breaker: typing.Optional[ICircuitBreaker],
failure_exceptions: tuple[type[Exception], ...],
) -> bool:
"""
diff --git a/src/cqrs/deserializers/exceptions.py b/src/cqrs/deserializers/exceptions.py
index 4cb2d0a..dcb58ad 100644
--- a/src/cqrs/deserializers/exceptions.py
+++ b/src/cqrs/deserializers/exceptions.py
@@ -15,4 +15,4 @@ class DeserializeJsonError:
error_message: str
error_type: typing.Type[Exception]
- message_data: str | bytes | None
+ message_data: typing.Union[str, typing.Optional[bytes]]
diff --git a/src/cqrs/deserializers/json.py b/src/cqrs/deserializers/json.py
index 9f25565..6782d26 100644
--- a/src/cqrs/deserializers/json.py
+++ b/src/cqrs/deserializers/json.py
@@ -80,7 +80,10 @@ def __init__(self, model: typing.Type[typing.Any]):
# Store model - type is preserved through generic parameter _T for return type
self._model: typing.Type[typing.Any] = model
- def __call__(self, data: str | bytes | None) -> _T | None | DeserializeJsonError:
+ def __call__(
+ self,
+ data: typing.Union[str, typing.Optional[bytes]],
+ ) -> typing.Union[typing.Optional[_T], DeserializeJsonError]:
"""
Deserialize JSON data into model instance.
diff --git a/src/cqrs/dispatcher/event.py b/src/cqrs/dispatcher/event.py
index a6b0169..a383cf1 100644
--- a/src/cqrs/dispatcher/event.py
+++ b/src/cqrs/dispatcher/event.py
@@ -8,7 +8,7 @@
from cqrs.events.map import EventMap
from cqrs.middlewares.base import MiddlewareChain
-_EventHandler: typing.TypeAlias = EventHandler
+_EventHandler = EventHandler
logger = logging.getLogger("cqrs")
@@ -18,7 +18,7 @@ def __init__(
self,
event_map: EventMap,
container: Container,
- middleware_chain: MiddlewareChain | None = None,
+ middleware_chain: typing.Optional[MiddlewareChain] = None,
) -> None:
self._event_map = event_map
self._container = container
diff --git a/src/cqrs/dispatcher/models.py b/src/cqrs/dispatcher/models.py
index 5db8e44..808c725 100644
--- a/src/cqrs/dispatcher/models.py
+++ b/src/cqrs/dispatcher/models.py
@@ -25,4 +25,4 @@ class SagaDispatchResult:
step_result: SagaStepResult
events: typing.List[IEvent] = dataclasses.field(default_factory=list)
- saga_id: str | None = None
+ saga_id: typing.Optional[str] = None
diff --git a/src/cqrs/dispatcher/request.py b/src/cqrs/dispatcher/request.py
index 8e9b5cd..dd4583f 100644
--- a/src/cqrs/dispatcher/request.py
+++ b/src/cqrs/dispatcher/request.py
@@ -10,7 +10,7 @@
RequestHandlerTypeError,
)
from cqrs.dispatcher.models import RequestDispatchResult
-from cqrs.middlewares.base import MiddlewareChain
+from cqrs.middlewares.base import HandleType, MiddlewareChain
from cqrs.requests.cor_request_handler import (
CORRequestHandler,
build_chain,
@@ -23,7 +23,7 @@
logger = logging.getLogger("cqrs")
-_RequestHandler: typing.TypeAlias = RequestHandler | CORRequestHandler
+_RequestHandler = typing.Union[RequestHandler, CORRequestHandler]
class RequestDispatcher:
@@ -31,7 +31,7 @@ def __init__(
self,
request_map: RequestMap,
container: Container,
- middleware_chain: MiddlewareChain | None = None,
+ middleware_chain: typing.Optional[MiddlewareChain] = None,
) -> None:
self._request_map = request_map
self._container = container
@@ -80,7 +80,9 @@ async def _dispatch_fallback(
"""Dispatch using primary handler with fallback on failure."""
primary = await self._container.resolve(fallback_config.primary)
try:
- wrapped_primary = self._middleware_chain.wrap(primary.handle)
+ wrapped_primary = self._middleware_chain.wrap(
+ typing.cast(HandleType, primary.handle),
+ )
if fallback_config.circuit_breaker is not None:
response = await fallback_config.circuit_breaker.call(
fallback_config.primary,
@@ -116,7 +118,9 @@ async def _dispatch_fallback(
fallback_config.fallback.__name__,
)
fallback_handler = await self._container.resolve(fallback_config.fallback)
- wrapped_fallback = self._middleware_chain.wrap(fallback_handler.handle)
+ wrapped_fallback = self._middleware_chain.wrap(
+ typing.cast(HandleType, fallback_handler.handle),
+ )
response = await wrapped_fallback(request)
return RequestDispatchResult(
response=response,
diff --git a/src/cqrs/dispatcher/saga.py b/src/cqrs/dispatcher/saga.py
index d3e3202..ea41e60 100644
--- a/src/cqrs/dispatcher/saga.py
+++ b/src/cqrs/dispatcher/saga.py
@@ -27,8 +27,8 @@ def __init__(
self,
saga_map: SagaMap,
container: Container,
- storage: ISagaStorage | None = None,
- middleware_chain: MiddlewareChain | None = None,
+ storage: typing.Optional[ISagaStorage] = None,
+ middleware_chain: typing.Optional[MiddlewareChain] = None,
compensation_retry_count: int = 3,
compensation_retry_delay: float = 1.0,
compensation_retry_backoff: float = 2.0,
@@ -56,7 +56,7 @@ def __init__(
def dispatch(
self,
context: SagaContext,
- saga_id: uuid.UUID | None = None,
+ saga_id: typing.Optional[uuid.UUID] = None,
) -> typing.AsyncIterator[SagaDispatchResult]:
"""
Dispatch a saga execution for the given context.
@@ -81,7 +81,7 @@ def dispatch(
async def _dispatch_impl(
self,
context: SagaContext,
- saga_id: uuid.UUID | None = None,
+ saga_id: typing.Optional[uuid.UUID] = None,
) -> typing.AsyncIterator[SagaDispatchResult]:
# Find saga type by context type
saga_type = self._saga_map.get(type(context))
diff --git a/src/cqrs/dispatcher/streaming.py b/src/cqrs/dispatcher/streaming.py
index 2caabec..4900651 100644
--- a/src/cqrs/dispatcher/streaming.py
+++ b/src/cqrs/dispatcher/streaming.py
@@ -34,7 +34,7 @@ def __init__(
self,
request_map: RequestMap,
container: Container,
- middleware_chain: MiddlewareChain | None = None,
+ middleware_chain: typing.Optional[MiddlewareChain] = None,
) -> None:
self._request_map = request_map
self._container = container
diff --git a/src/cqrs/events/bootstrap.py b/src/cqrs/events/bootstrap.py
index 36da6b1..5bfccd8 100644
--- a/src/cqrs/events/bootstrap.py
+++ b/src/cqrs/events/bootstrap.py
@@ -14,7 +14,7 @@
def setup_mediator(
container: di_container_impl.DIContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- events_mapper: typing.Callable[[events.EventMap], None] | None = None,
+ events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
) -> cqrs.EventMediator: ...
@@ -22,14 +22,14 @@ def setup_mediator(
def setup_mediator(
container: CQRSContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- events_mapper: typing.Callable[[events.EventMap], None] | None = None,
+ events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
) -> cqrs.EventMediator: ...
def setup_mediator(
- container: di_container_impl.DIContainer | CQRSContainer,
+ container: typing.Union[di_container_impl.DIContainer, CQRSContainer],
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- events_mapper: typing.Callable[[events.EventMap], None] | None = None,
+ events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
) -> cqrs.EventMediator:
"""
Create an event mediator with the given container and middlewares.
@@ -75,26 +75,26 @@ def bind_events(event_map: events.EventMap) -> None:
@overload
def bootstrap(
di_container: di.Container,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
) -> cqrs.EventMediator: ...
@overload
def bootstrap(
di_container: CQRSContainer,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
) -> cqrs.EventMediator: ...
def bootstrap(
- di_container: di.Container | CQRSContainer,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ di_container: typing.Union[di.Container, CQRSContainer],
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
) -> cqrs.EventMediator:
"""
Bootstrap an event mediator with optional middlewares and event bindings.
diff --git a/src/cqrs/events/event_emitter.py b/src/cqrs/events/event_emitter.py
index e21af7a..26ea9a9 100644
--- a/src/cqrs/events/event_emitter.py
+++ b/src/cqrs/events/event_emitter.py
@@ -10,7 +10,7 @@
logger = logging.getLogger("cqrs")
-_H: typing.TypeAlias = event_handler.EventHandler
+_H = event_handler.EventHandler
class EventEmitter:
@@ -28,7 +28,7 @@ def __init__(
self,
event_map: map.EventMap,
container: di_container.Container,
- message_broker: message_brokers.MessageBroker | None = None,
+ message_broker: typing.Optional[message_brokers.MessageBroker] = None,
) -> None:
"""
Initialize the event emitter.
diff --git a/src/cqrs/events/event_processor.py b/src/cqrs/events/event_processor.py
index 8209263..9aa7149 100644
--- a/src/cqrs/events/event_processor.py
+++ b/src/cqrs/events/event_processor.py
@@ -33,7 +33,7 @@ class EventProcessor:
def __init__(
self,
event_map: EventMap,
- event_emitter: EventEmitter | None = None,
+ event_emitter: typing.Optional[EventEmitter] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
) -> None:
diff --git a/src/cqrs/events/fallback.py b/src/cqrs/events/fallback.py
index 6c5ba19..1512f49 100644
--- a/src/cqrs/events/fallback.py
+++ b/src/cqrs/events/fallback.py
@@ -46,7 +46,7 @@ class EventHandlerFallback:
primary: EventHandlerT
fallback: EventHandlerT
failure_exceptions: tuple[type[Exception], ...] = ()
- circuit_breaker: ICircuitBreaker | None = None
+ circuit_breaker: typing.Optional[ICircuitBreaker] = None
def __post_init__(self) -> None:
if not isinstance(self.primary, type) or not isinstance(self.fallback, type):
diff --git a/src/cqrs/events/map.py b/src/cqrs/events/map.py
index dbd5b22..b5c524c 100644
--- a/src/cqrs/events/map.py
+++ b/src/cqrs/events/map.py
@@ -5,8 +5,8 @@
from cqrs.events.fallback import EventHandlerFallback
_KT = typing.TypeVar("_KT", bound=typing.Type[IEvent])
-_HandlerItem = typing.Type[event_handler.EventHandler] | EventHandlerFallback
-_VT: typing.TypeAlias = typing.List[_HandlerItem]
+_HandlerItem = typing.Union[typing.Type[event_handler.EventHandler], EventHandlerFallback]
+_VT = typing.List[_HandlerItem]
class EventMap(typing.Dict[_KT, _VT]):
diff --git a/src/cqrs/generic_utils.py b/src/cqrs/generic_utils.py
index 438359b..cec6cbf 100644
--- a/src/cqrs/generic_utils.py
+++ b/src/cqrs/generic_utils.py
@@ -7,7 +7,7 @@ def get_generic_args_for_origin(
klass: type,
origin_classes: tuple[type, ...],
min_args: int = 1,
-) -> tuple[type, ...] | None:
+) -> typing.Optional[tuple[type, ...]]:
"""
Extract generic type arguments from a class that inherits from a Generic base.
diff --git a/src/cqrs/mediator.py b/src/cqrs/mediator.py
index 71c0d60..dad9fca 100644
--- a/src/cqrs/mediator.py
+++ b/src/cqrs/mediator.py
@@ -61,9 +61,9 @@ def __init__(
self,
request_map: RequestMap,
container: Container,
- event_emitter: EventEmitter | None = None,
- middleware_chain: MiddlewareChain | None = None,
- event_map: EventMap | None = None,
+ event_emitter: typing.Optional[EventEmitter] = None,
+ middleware_chain: typing.Optional[MiddlewareChain] = None,
+ event_map: typing.Optional[EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
*,
@@ -116,7 +116,7 @@ def __init__(
self,
event_map: EventMap,
container: Container,
- middleware_chain: MiddlewareChain | None = None,
+ middleware_chain: typing.Optional[MiddlewareChain] = None,
*,
dispatcher_type: typing.Type[EventDispatcher] = EventDispatcher,
):
@@ -166,9 +166,9 @@ def __init__(
self,
request_map: RequestMap,
container: Container,
- event_emitter: EventEmitter | None = None,
- middleware_chain: MiddlewareChain | None = None,
- event_map: EventMap | None = None,
+ event_emitter: typing.Optional[EventEmitter] = None,
+ middleware_chain: typing.Optional[MiddlewareChain] = None,
+ event_map: typing.Optional[EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
*,
@@ -189,7 +189,7 @@ def __init__(
def stream(
self,
request: IRequest,
- ) -> typing.AsyncIterator[IResponse | None]:
+ ) -> typing.AsyncIterator[IResponse]:
"""
Stream results from a generator-based handler.
@@ -207,7 +207,7 @@ def stream(
async def _stream_impl(
self,
request: IRequest,
- ) -> typing.AsyncIterator[IResponse | None]:
+ ) -> typing.AsyncIterator[IResponse]:
async for dispatch_result in self._dispatcher.dispatch(request):
await self._event_processor.emit_events(dispatch_result.events)
@@ -251,12 +251,12 @@ def __init__(
self,
saga_map: SagaMap,
container: Container,
- event_emitter: EventEmitter | None = None,
- middleware_chain: MiddlewareChain | None = None,
- event_map: EventMap | None = None,
+ event_emitter: typing.Optional[EventEmitter] = None,
+ middleware_chain: typing.Optional[MiddlewareChain] = None,
+ event_map: typing.Optional[EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
- storage: ISagaStorage | None = None,
+ storage: typing.Optional[ISagaStorage] = None,
compensation_retry_count: int = 3,
compensation_retry_delay: float = 1.0,
compensation_retry_backoff: float = 2.0,
@@ -282,7 +282,7 @@ def __init__(
def stream(
self,
context: SagaContext,
- saga_id: uuid.UUID | None = None,
+ saga_id: typing.Optional[uuid.UUID] = None,
) -> typing.AsyncIterator[SagaStepResult]:
"""
Stream results from saga execution.
@@ -309,7 +309,7 @@ def stream(
async def _stream_impl(
self,
context: SagaContext,
- saga_id: uuid.UUID | None = None,
+ saga_id: typing.Optional[uuid.UUID] = None,
) -> typing.AsyncIterator[SagaStepResult]:
async for dispatch_result in self._dispatcher.dispatch(
context,
diff --git a/src/cqrs/middlewares/base.py b/src/cqrs/middlewares/base.py
index 60856f5..5af2821 100644
--- a/src/cqrs/middlewares/base.py
+++ b/src/cqrs/middlewares/base.py
@@ -4,11 +4,11 @@
from cqrs.saga.models import SagaContext
from cqrs.requests.request import ReqT, ResT
-HandleType = typing.Callable[[ReqT], typing.Awaitable[ResT] | ResT]
+HandleType = typing.Callable[[ReqT], typing.Awaitable[typing.Optional[ResT]]]
class Middleware(typing.Protocol[ReqT, ResT]):
- async def __call__(self, request: ReqT, handle: HandleType) -> ResT | None:
+ async def __call__(self, request: ReqT, handle: HandleType) -> typing.Optional[ResT]:
raise NotImplementedError
diff --git a/src/cqrs/middlewares/logging.py b/src/cqrs/middlewares/logging.py
index d1750b2..a874397 100644
--- a/src/cqrs/middlewares/logging.py
+++ b/src/cqrs/middlewares/logging.py
@@ -1,4 +1,5 @@
import logging
+import typing
from cqrs.middlewares import base
from cqrs.middlewares.base import HandleType
@@ -9,7 +10,7 @@
class LoggingMiddleware(base.Middleware):
- async def __call__(self, request: IRequest, handle: HandleType) -> IResponse | None:
+ async def __call__(self, request: IRequest, handle: HandleType) -> typing.Optional[IResponse]:
logger.debug(
"Handle %s request",
type(request).__name__,
diff --git a/src/cqrs/outbox/map.py b/src/cqrs/outbox/map.py
index 5cb41d4..01b854b 100644
--- a/src/cqrs/outbox/map.py
+++ b/src/cqrs/outbox/map.py
@@ -20,5 +20,5 @@ def register(
def get(
cls,
event_name: typing.Text,
- ) -> typing.Type[INotificationEvent] | None:
+ ) -> typing.Optional[typing.Type[INotificationEvent]]:
return cls._registry.get(event_name)
diff --git a/src/cqrs/outbox/mock.py b/src/cqrs/outbox/mock.py
index 6375d27..d4abecb 100644
--- a/src/cqrs/outbox/mock.py
+++ b/src/cqrs/outbox/mock.py
@@ -28,7 +28,7 @@ def add(self, event: cqrs.INotificationEvent) -> None:
async def get_many(
self,
batch_size: int = 100,
- topic: typing.Text | None = None,
+ topic: typing.Optional[typing.Text] = None,
) -> typing.List[repository.OutboxedEvent]:
return list(
filter(lambda e: topic == e.topic, self.session.values()) if topic else list(self.session.values()),
diff --git a/src/cqrs/outbox/repository.py b/src/cqrs/outbox/repository.py
index 474b864..fb4c484 100644
--- a/src/cqrs/outbox/repository.py
+++ b/src/cqrs/outbox/repository.py
@@ -69,7 +69,7 @@ def add(
async def get_many(
self,
batch_size: int = 100,
- topic: typing.Text | None = None,
+ topic: typing.Optional[typing.Text] = None,
) -> typing.List[OutboxedEvent]:
"""Get many events from the repository."""
diff --git a/src/cqrs/outbox/sqlalchemy.py b/src/cqrs/outbox/sqlalchemy.py
index 99160ac..6de617f 100644
--- a/src/cqrs/outbox/sqlalchemy.py
+++ b/src/cqrs/outbox/sqlalchemy.py
@@ -138,7 +138,7 @@ def row_to_dict(self) -> typing.Dict[typing.Text, typing.Any]:
def get_batch_query(
cls,
size: int,
- topic: typing.Text | None = None,
+ topic: typing.Optional[typing.Text] = None,
) -> sqlalchemy.Select:
return (
sqlalchemy.select(cls)
@@ -192,7 +192,7 @@ class SqlAlchemyOutboxedEventRepository(repository.OutboxedEventRepository):
def __init__(
self,
session: sql_session.AsyncSession,
- compressor: compressors.Compressor | None = None,
+ compressor: typing.Optional[compressors.Compressor] = None,
):
self.session = session
self._compressor = compressor
@@ -225,7 +225,7 @@ def add(
),
)
- def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None:
+ def _process_events(self, model: OutboxModel) -> typing.Optional[repository.OutboxedEvent]:
event_dict = model.row_to_dict()
event_model = map.OutboxedEventMap.get(event_dict["event_name"])
@@ -248,7 +248,7 @@ def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None
async def get_many(
self,
batch_size: int = 100,
- topic: typing.Text | None = None,
+ topic: typing.Optional[typing.Text] = None,
) -> typing.List[repository.OutboxedEvent]:
events: typing.Sequence[OutboxModel] = (
(await self.session.execute(OutboxModel.get_batch_query(batch_size, topic))).scalars().all()
@@ -283,7 +283,7 @@ async def rollback(self):
def rebind_outbox_model(
model: typing.Any,
new_base: DeclarativeMeta,
- table_name: typing.Text | None = None,
+ table_name: typing.Optional[typing.Text] = None,
):
model.__bases__ = (new_base,)
model.__table__.name = table_name or model.__table__.name
diff --git a/src/cqrs/producer.py b/src/cqrs/producer.py
index f1dbaf8..155d14d 100644
--- a/src/cqrs/producer.py
+++ b/src/cqrs/producer.py
@@ -9,7 +9,7 @@
logger = logging.getLogger("cqrs")
logger.setLevel(logging.DEBUG)
-SessionFactory: typing.TypeAlias = typing.Callable[[], sql_session.AsyncSession]
+SessionFactory = typing.Callable[[], sql_session.AsyncSession]
class EventProducer:
diff --git a/src/cqrs/requests/bootstrap.py b/src/cqrs/requests/bootstrap.py
index 916f432..34c9c44 100644
--- a/src/cqrs/requests/bootstrap.py
+++ b/src/cqrs/requests/bootstrap.py
@@ -17,23 +17,23 @@
@overload
def setup_event_emitter(
container: di_container_impl.DIContainer,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- message_broker: protocol.MessageBroker | None = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
) -> events.EventEmitter: ...
@overload
def setup_event_emitter(
container: CQRSContainer,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- message_broker: protocol.MessageBroker | None = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
): ...
def setup_event_emitter(
- container: di_container_impl.DIContainer | CQRSContainer,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- message_broker: protocol.MessageBroker | None = None,
+ container: typing.Union[di_container_impl.DIContainer, CQRSContainer],
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
) -> events.EventEmitter:
if message_broker is None:
message_broker = DEFAULT_MESSAGE_BROKER
@@ -54,8 +54,8 @@ def setup_mediator(
event_emitter: events.EventEmitter,
container: di_container_impl.DIContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
) -> cqrs.RequestMediator: ...
@@ -64,8 +64,8 @@ def setup_mediator(
event_emitter: events.EventEmitter,
container: CQRSContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
) -> cqrs.RequestMediator: ...
@@ -74,9 +74,9 @@ def setup_mediator(
event_emitter: events.EventEmitter,
container: di_container_impl.DIContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- event_map: events.EventMap | None = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ event_map: typing.Optional[events.EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
) -> cqrs.RequestMediator: ...
@@ -87,9 +87,9 @@ def setup_mediator(
event_emitter: events.EventEmitter,
container: CQRSContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- event_map: events.EventMap | None = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ event_map: typing.Optional[events.EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
) -> cqrs.RequestMediator: ...
@@ -97,11 +97,11 @@ def setup_mediator(
def setup_mediator(
event_emitter: events.EventEmitter,
- container: di_container_impl.DIContainer | CQRSContainer,
+ container: typing.Union[di_container_impl.DIContainer, CQRSContainer],
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- event_map: events.EventMap | None = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ event_map: typing.Optional[events.EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
) -> cqrs.RequestMediator:
@@ -134,36 +134,36 @@ def setup_mediator(
@overload
def bootstrap(
di_container: di.Container,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
) -> cqrs.RequestMediator: ...
@overload
def bootstrap(
di_container: CQRSContainer,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
) -> cqrs.RequestMediator: ...
@overload
def bootstrap(
di_container: di.Container,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = False,
) -> cqrs.RequestMediator: ...
@@ -172,25 +172,25 @@ def bootstrap(
@overload
def bootstrap(
di_container: CQRSContainer,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = False,
) -> cqrs.RequestMediator: ...
def bootstrap(
- di_container: di.Container | CQRSContainer,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ di_container: typing.Union[di.Container, CQRSContainer],
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = False,
) -> cqrs.RequestMediator:
@@ -239,9 +239,9 @@ def setup_streaming_mediator(
event_emitter: events.EventEmitter,
container: di_container_impl.DIContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = True,
) -> cqrs.StreamingRequestMediator: ...
@@ -252,9 +252,9 @@ def setup_streaming_mediator(
event_emitter: events.EventEmitter,
container: CQRSContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = True,
) -> cqrs.StreamingRequestMediator: ...
@@ -262,11 +262,11 @@ def setup_streaming_mediator(
def setup_streaming_mediator(
event_emitter: events.EventEmitter,
- container: di_container_impl.DIContainer | CQRSContainer,
+ container: typing.Union[di_container_impl.DIContainer, CQRSContainer],
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = True,
) -> cqrs.StreamingRequestMediator:
@@ -299,12 +299,12 @@ def setup_streaming_mediator(
@overload
def bootstrap_streaming(
di_container: di.Container,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = False,
) -> cqrs.StreamingRequestMediator: ...
@@ -313,25 +313,25 @@ def bootstrap_streaming(
@overload
def bootstrap_streaming(
di_container: CQRSContainer,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = False,
) -> cqrs.StreamingRequestMediator: ...
def bootstrap_streaming(
- di_container: di.Container | CQRSContainer,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- commands_mapper: typing.Callable[[RequestMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- queries_mapper: typing.Callable[[RequestMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
+ di_container: typing.Union[di.Container, CQRSContainer],
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ commands_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ queries_mapper: typing.Optional[typing.Callable[[RequestMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
max_concurrent_event_handlers: int = 10,
concurrent_event_handle_enable: bool = False,
) -> cqrs.StreamingRequestMediator:
diff --git a/src/cqrs/requests/cor_request_handler.py b/src/cqrs/requests/cor_request_handler.py
index c102fe1..db942c9 100644
--- a/src/cqrs/requests/cor_request_handler.py
+++ b/src/cqrs/requests/cor_request_handler.py
@@ -6,6 +6,7 @@
from cqrs.events.event import IEvent
from cqrs.requests.request import ReqT, ResT
+from cqrs.response import IResponse
class CORRequestHandler(abc.ABC, typing.Generic[ReqT, ResT]):
@@ -22,13 +23,30 @@ def __init__(self, auth_service: AuthServiceProtocol) -> None:
self._auth_service = auth_service
self.events: typing.List[IEvent] = []
- async def handle(self, request: LoginCommand) -> None | None:
+ async def handle(self, request: LoginCommand) -> typing.Optional[None ]:
if self._auth_service.can_authenticate(request):
return await self._auth_service.authenticate(request)
return await super().handle(request)
"""
- _next_handler: "CORRequestHandler[ReqT, ResT] | None" = None
+ _next_handler: "typing.Optional[CORRequestHandler[ReqT, ResT] ]" = None
+
+ def __class_getitem__(cls, params): # type: ignore[override]
+ """
+ Backward-compatible generic subscription.
+
+ Supports both forms:
+ - CORRequestHandler[RequestType, ResponseType]
+ - CORRequestHandler[RequestType] # legacy shorthand
+ """
+ if not isinstance(params, tuple):
+ params = (params,)
+ if len(params) == 1:
+ params = (
+ params[0],
+ typing.Optional[IResponse],
+ )
+ return super().__class_getitem__(params) # pyright: ignore[reportAttributeAccessIssue]
def set_next(
self,
@@ -39,7 +57,7 @@ def set_next(
return self._next_handler
- async def next(self, request: ReqT) -> ResT | None:
+ async def next(self, request: ReqT) -> typing.Optional[ResT]:
if self._next_handler:
return await self._next_handler.handle(request)
@@ -56,11 +74,11 @@ def events(self) -> typing.Sequence[IEvent]:
return ()
@abc.abstractmethod
- async def handle(self, request: ReqT) -> ResT | None:
+ async def handle(self, request: ReqT) -> typing.Optional[ResT]:
raise NotImplementedError
-CORRequestHandlerT: typing.TypeAlias = CORRequestHandler
+CORRequestHandlerT = CORRequestHandler
def build_chain(handlers: typing.List[CORRequestHandlerT]) -> CORRequestHandlerT:
diff --git a/src/cqrs/requests/fallback.py b/src/cqrs/requests/fallback.py
index 93f4677..e846d9a 100644
--- a/src/cqrs/requests/fallback.py
+++ b/src/cqrs/requests/fallback.py
@@ -7,7 +7,7 @@
from cqrs.generic_utils import get_generic_args_for_origin
from cqrs.requests.request_handler import RequestHandler, StreamingRequestHandler
-RequestHandlerT = type[RequestHandler] | type[StreamingRequestHandler]
+RequestHandlerT = typing.Union[type[RequestHandler], type[StreamingRequestHandler]]
_REQUEST_HANDLER_ORIGINS: tuple[type, ...] = (RequestHandler, StreamingRequestHandler)
@@ -47,7 +47,7 @@ class RequestHandlerFallback:
primary: RequestHandlerT
fallback: RequestHandlerT
failure_exceptions: tuple[type[Exception], ...] = ()
- circuit_breaker: ICircuitBreaker | None = None
+ circuit_breaker: typing.Optional[ICircuitBreaker] = None
def __post_init__(self) -> None:
if not isinstance(self.primary, type) or not isinstance(self.fallback, type):
diff --git a/src/cqrs/requests/map.py b/src/cqrs/requests/map.py
index e4eabcd..d32837b 100644
--- a/src/cqrs/requests/map.py
+++ b/src/cqrs/requests/map.py
@@ -13,11 +13,12 @@
_KT = typing.TypeVar("_KT", bound=typing.Type[IRequest])
# Type alias for handler types that can be bound to requests
-HandlerType = (
- typing.Type[RequestHandler | StreamingRequestHandler]
- | typing.List[typing.Type[CORRequestHandler]]
- | RequestHandlerFallback
-)
+HandlerType = typing.Union[
+ typing.Type[RequestHandler],
+ typing.Type[StreamingRequestHandler],
+ typing.List[typing.Type[CORRequestHandler]],
+ RequestHandlerFallback,
+]
class RequestMap(typing.Dict[_KT, HandlerType]):
diff --git a/src/cqrs/requests/mermaid.py b/src/cqrs/requests/mermaid.py
index eb87a90..9f453c2 100644
--- a/src/cqrs/requests/mermaid.py
+++ b/src/cqrs/requests/mermaid.py
@@ -144,13 +144,19 @@ def class_diagram(self) -> str:
# Collect all types
request_types: set[type] = set()
response_types: set[type] = set()
- handler_info: list[tuple[str, type | None, type | None]] = []
+ handler_info: list[
+ tuple[
+ str,
+ typing.Optional[type],
+ typing.Optional[type],
+ ]
+ ] = []
# Extract type information from each handler
for handler_type in handlers:
handler_name = handler_type.__name__
- request_type: type | None = None
- response_type: type | None = None
+ request_type: typing.Optional[type] = None
+ response_type: typing.Optional[type] = None
# Extract generic type parameters from __orig_bases__
orig_bases = getattr(handler_type, "__orig_bases__", ())
@@ -187,8 +193,8 @@ def class_diagram(self) -> str:
# Add CORRequestHandler base class
lines.append(" class CORRequestHandler {")
lines.append(" <>")
- lines.append(" +handle(request) Response | None")
- lines.append(" +next(request) Response | None")
+ lines.append(" +handle(request) typing.Optional[Response ]")
+ lines.append(" +next(request) typing.Optional[Response ]")
lines.append(" +set_next(handler) CORRequestHandler")
lines.append(" +events: List[Event]")
lines.append(" }")
@@ -197,8 +203,8 @@ def class_diagram(self) -> str:
# Add handler classes
for handler_name, request_type, response_type in handler_info:
lines.append(f" class {handler_name} {{")
- lines.append(" +handle(request) Response | None")
- lines.append(" +next(request) Response | None")
+ lines.append(" +handle(request) typing.Optional[Response ]")
+ lines.append(" +next(request) typing.Optional[Response ]")
lines.append(" +events: List[Event]")
lines.append(" }")
lines.append("")
diff --git a/src/cqrs/requests/request.py b/src/cqrs/requests/request.py
index 2a6d70f..df29fba 100644
--- a/src/cqrs/requests/request.py
+++ b/src/cqrs/requests/request.py
@@ -54,7 +54,7 @@ def from_dict(cls, **kwargs) -> Self:
# Type variables for request/response (defined here to avoid circular import with
# cqrs.types <-> cqrs.requests.request_handler). Re-exported from cqrs.types for compatibility.
ReqT = typing.TypeVar("ReqT", bound=IRequest, contravariant=True)
-ResT = typing.TypeVar("ResT", bound=IResponse | None, covariant=True)
+ResT = typing.TypeVar("ResT", bound=typing.Optional[IResponse], covariant=True)
@dataclasses.dataclass
diff --git a/src/cqrs/saga/bootstrap.py b/src/cqrs/saga/bootstrap.py
index f772da7..fc42380 100644
--- a/src/cqrs/saga/bootstrap.py
+++ b/src/cqrs/saga/bootstrap.py
@@ -21,11 +21,11 @@ def setup_saga_mediator(
event_emitter: events.EventEmitter,
container: di_container_impl.DIContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- sagas_mapper: typing.Callable[[SagaMap], None] | None = None,
- event_map: events.EventMap | None = None,
+ sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None,
+ event_map: typing.Optional[events.EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
- saga_storage: ISagaStorage | None = None,
+ saga_storage: typing.Optional[ISagaStorage] = None,
) -> cqrs.SagaMediator: ...
@@ -34,23 +34,23 @@ def setup_saga_mediator(
event_emitter: events.EventEmitter,
container: CQRSContainer,
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- sagas_mapper: typing.Callable[[SagaMap], None] | None = None,
- event_map: events.EventMap | None = None,
+ sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None,
+ event_map: typing.Optional[events.EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
- saga_storage: ISagaStorage | None = None,
+ saga_storage: typing.Optional[ISagaStorage] = None,
) -> cqrs.SagaMediator: ...
def setup_saga_mediator(
event_emitter: events.EventEmitter,
- container: di_container_impl.DIContainer | CQRSContainer,
+ container: typing.Union[di_container_impl.DIContainer, CQRSContainer],
middlewares: typing.Iterable[mediator_middlewares.Middleware],
- sagas_mapper: typing.Callable[[SagaMap], None] | None = None,
- event_map: events.EventMap | None = None,
+ sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None,
+ event_map: typing.Optional[events.EventMap] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
- saga_storage: ISagaStorage | None = None,
+ saga_storage: typing.Optional[ISagaStorage] = None,
) -> cqrs.SagaMediator:
"""
Setup SagaMediator with configured saga map and dependencies.
@@ -101,36 +101,36 @@ def setup_saga_mediator(
@overload
def bootstrap(
di_container: di.Container,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- sagas_mapper: typing.Callable[[SagaMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
- saga_storage: ISagaStorage | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
+ saga_storage: typing.Optional[ISagaStorage] = None,
) -> cqrs.SagaMediator: ...
@overload
def bootstrap(
di_container: CQRSContainer,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- sagas_mapper: typing.Callable[[SagaMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
- saga_storage: ISagaStorage | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
+ saga_storage: typing.Optional[ISagaStorage] = None,
) -> cqrs.SagaMediator: ...
@overload
def bootstrap(
di_container: di.Container,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- sagas_mapper: typing.Callable[[SagaMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
- saga_storage: ISagaStorage | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
+ saga_storage: typing.Optional[ISagaStorage] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
) -> cqrs.SagaMediator: ...
@@ -139,25 +139,25 @@ def bootstrap(
@overload
def bootstrap(
di_container: CQRSContainer,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- sagas_mapper: typing.Callable[[SagaMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
- saga_storage: ISagaStorage | None = None,
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
+ saga_storage: typing.Optional[ISagaStorage] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
) -> cqrs.SagaMediator: ...
def bootstrap(
- di_container: di.Container | CQRSContainer,
- message_broker: protocol.MessageBroker | None = None,
- middlewares: typing.Sequence[mediator_middlewares.Middleware] | None = None,
- sagas_mapper: typing.Callable[[SagaMap], None] | None = None,
- domain_events_mapper: typing.Callable[[events.EventMap], None] | None = None,
- on_startup: typing.List[typing.Callable[[], None]] | None = None,
- saga_storage: ISagaStorage | None = None,
+ di_container: typing.Union[di.Container, CQRSContainer],
+ message_broker: typing.Optional[protocol.MessageBroker] = None,
+ middlewares: typing.Optional[typing.Sequence[mediator_middlewares.Middleware]] = None,
+ sagas_mapper: typing.Optional[typing.Callable[[SagaMap], None]] = None,
+ domain_events_mapper: typing.Optional[typing.Callable[[events.EventMap], None]] = None,
+ on_startup: typing.Optional[typing.List[typing.Callable[[], None]]] = None,
+ saga_storage: typing.Optional[ISagaStorage] = None,
max_concurrent_event_handlers: int = 1,
concurrent_event_handle_enable: bool = True,
) -> cqrs.SagaMediator:
diff --git a/src/cqrs/saga/compensation.py b/src/cqrs/saga/compensation.py
index 5ab3eb5..3d74b86 100644
--- a/src/cqrs/saga/compensation.py
+++ b/src/cqrs/saga/compensation.py
@@ -19,11 +19,11 @@ def __init__(
self,
saga_id: typing.Any,
context: ContextT,
- storage: ISagaStorage | SagaStorageRun,
+ storage: typing.Union[ISagaStorage, SagaStorageRun],
retry_count: int = 3,
retry_delay: float = 1.0,
retry_backoff: float = 2.0,
- on_after_compensate_step: typing.Callable[[], typing.Awaitable[None]] | None = None,
+ on_after_compensate_step: typing.Optional[typing.Callable[[], typing.Awaitable[None]]] = None,
) -> None:
"""
Create a SagaCompensator configured to perform compensation of completed saga steps with retry and optional post-step callback.
@@ -159,7 +159,7 @@ async def _compensate_step_with_retry(
"""
step_name = step.__class__.__name__
- last_exception: Exception | None = None
+ last_exception: typing.Optional[Exception] = None
for attempt in range(1, self._retry_count + 1):
try:
await step.compensate(self._context)
diff --git a/src/cqrs/saga/execution.py b/src/cqrs/saga/execution.py
index 1c0b611..2c7b560 100644
--- a/src/cqrs/saga/execution.py
+++ b/src/cqrs/saga/execution.py
@@ -21,7 +21,7 @@ class SagaStateManager:
def __init__(
self,
saga_id: typing.Any,
- storage: ISagaStorage | SagaStorageRun,
+ storage: typing.Union[ISagaStorage, SagaStorageRun],
) -> None:
"""
Create a SagaStateManager bound to a specific saga identifier and storage backend.
@@ -61,7 +61,7 @@ async def log_step(
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
- error: str | None = None,
+ error: typing.Optional[str] = None,
) -> None:
"""Log step execution."""
await self._storage.log_step(
@@ -79,9 +79,9 @@ class SagaRecoveryManager:
def __init__(
self,
saga_id: typing.Any,
- storage: ISagaStorage | SagaStorageRun,
+ storage: typing.Union[ISagaStorage, SagaStorageRun],
container: Container,
- saga_steps: list[type[SagaStepHandler] | Fallback],
+ saga_steps: list[typing.Union[type[SagaStepHandler], Fallback]],
) -> None:
"""
Construct a SagaRecoveryManager that holds the identifiers, storage, DI container, and configured saga steps required to reconstruct a saga's execution state.
@@ -229,7 +229,12 @@ async def execute_fallback_step(
self,
fallback_wrapper: Fallback,
completed_step_names: set[str],
- ) -> tuple[SagaStepResult[ContextT, typing.Any] | None, SagaStepHandler | None]:
+ ) -> typing.Optional[
+ tuple[
+ SagaStepResult[ContextT, typing.Any],
+ SagaStepHandler,
+ ]
+ ]:
"""
Execute a Fallback step with context snapshot/restore mechanism.
@@ -251,13 +256,13 @@ async def execute_fallback_step(
logger.debug(
f"Skipping already completed Fallback primary step: {primary_step_name}",
)
- return None, None
+ return None
if fallback_step_name in completed_step_names:
logger.debug(
f"Skipping already completed Fallback fallback step: {fallback_step_name}",
)
- return None, None
+ return None
# Resolve step handlers
primary_step = await self._container.resolve(fallback_wrapper.step)
diff --git a/src/cqrs/saga/fallback.py b/src/cqrs/saga/fallback.py
index 03ab78d..d51909d 100644
--- a/src/cqrs/saga/fallback.py
+++ b/src/cqrs/saga/fallback.py
@@ -1,6 +1,7 @@
"""Fallback wrapper for Saga steps to handle failures gracefully."""
import dataclasses
+import typing
from cqrs.circuit_breaker import ICircuitBreaker
from cqrs.saga.step import SagaStepHandler
@@ -32,4 +33,4 @@ class Fallback:
step: type[SagaStepHandler]
fallback: type[SagaStepHandler]
failure_exceptions: tuple[type[Exception], ...] = ()
- circuit_breaker: ICircuitBreaker | None = None
+ circuit_breaker: typing.Optional[ICircuitBreaker] = None
diff --git a/src/cqrs/saga/mermaid.py b/src/cqrs/saga/mermaid.py
index a2e745a..419355e 100644
--- a/src/cqrs/saga/mermaid.py
+++ b/src/cqrs/saga/mermaid.py
@@ -217,7 +217,14 @@ def class_diagram(self) -> str:
context_types: set[type] = set()
response_types: set[type] = set()
event_types: set[type] = set()
- step_info: list[tuple[str, type | None, type | None, list[type]]] = []
+ step_info: list[
+ tuple[
+ str,
+ typing.Optional[type],
+ typing.Optional[type],
+ list[type],
+ ]
+ ] = []
# Extract type information from each step
for step_item in steps:
@@ -228,8 +235,8 @@ def class_diagram(self) -> str:
# Process primary step
primary_name = primary_step.__name__
- primary_context_type: type | None = None
- primary_response_type: type | None = None
+ primary_context_type: typing.Optional[type] = None
+ primary_response_type: typing.Optional[type] = None
primary_events: list[type] = []
# Extract generic type parameters from primary step
@@ -250,8 +257,8 @@ def class_diagram(self) -> str:
# Process fallback step
fallback_name = fallback_step.__name__
- fallback_context_type: type | None = None
- fallback_response_type: type | None = None
+ fallback_context_type: typing.Optional[type] = None
+ fallback_response_type: typing.Optional[type] = None
fallback_events: list[type] = []
# Extract generic type parameters from fallback step
@@ -290,8 +297,8 @@ def class_diagram(self) -> str:
else:
# Regular step
step_name = step_item.__name__
- context_type: type | None = None
- response_type: type | None = None
+ context_type: typing.Optional[type] = None
+ response_type: typing.Optional[type] = None
step_events: list[type] = []
# Extract generic type parameters from __orig_bases__
diff --git a/src/cqrs/saga/saga.py b/src/cqrs/saga/saga.py
index db5a3b1..82b0ddb 100644
--- a/src/cqrs/saga/saga.py
+++ b/src/cqrs/saga/saga.py
@@ -74,7 +74,7 @@ def __init__(
context: ContextT,
container: Container,
storage: ISagaStorage,
- saga_id: uuid.UUID | None = None,
+ saga_id: typing.Optional[uuid.UUID] = None,
compensation_retry_count: int = 3,
compensation_retry_delay: float = 1.0,
compensation_retry_backoff: float = 2.0,
@@ -84,7 +84,7 @@ def __init__(
self._container = container
self._storage = storage
self._completed_steps: list[SagaStepHandler[ContextT, typing.Any]] = []
- self._error: BaseException | None = None
+ self._error: typing.Optional[BaseException] = None
self._compensated: bool = False
self._comp_retry_count = compensation_retry_count
self._comp_retry_delay = compensation_retry_delay
@@ -140,9 +140,9 @@ async def __aenter__(self) -> "SagaTransaction[ContextT]":
async def __aexit__(
self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: types.TracebackType | None,
+ exc_type: typing.Optional[type[BaseException]],
+ exc_val: typing.Optional[BaseException],
+ exc_tb: typing.Optional[types.TracebackType],
) -> bool:
# If an exception occurred, compensate all completed steps.
# Do not compensate on GeneratorExit: consumer stopped iteration intentionally
@@ -222,13 +222,13 @@ def _build_run_scoped_components(
async def _execute(
self,
- run: SagaStorageRun | None,
+ run: typing.Optional[SagaStorageRun],
) -> typing.AsyncIterator[SagaStepResult[ContextT, typing.Any]]:
"""
Execute the saga's configured steps, using the provided storage run for checkpointed operations when available, and perform recovery and compensation as required.
Parameters:
- run (SagaStorageRun | None): Optional per-saga storage run. When provided, the run is used for loading saga state, creating run-scoped managers/executors, and committing at checkpoint boundaries. When None, the transaction's internal managers and executors are used.
+ run (typing.Optional[SagaStorageRun ]): Optional per-saga storage run. When provided, the run is used for loading saga state, creating run-scoped managers/executors, and committing at checkpoint boundaries. When None, the transaction's internal managers and executors are used.
Returns:
Async iterator that yields SagaStepResult values for each step that completes; each yielded result will include the transaction's saga_id.
@@ -326,14 +326,12 @@ async def _execute(
try:
for step_item in self._saga.steps:
if isinstance(step_item, Fallback):
- (
- step_result,
- executed_step,
- ) = await fallback_executor.execute_fallback_step(
+ fallback_result = await fallback_executor.execute_fallback_step(
step_item,
completed_step_names,
)
- if step_result is not None and executed_step is not None:
+ if fallback_result is not None:
+ step_result, executed_step = fallback_result
self._completed_steps.append(executed_step)
if run is not None:
await run.commit()
@@ -341,7 +339,7 @@ async def _execute(
step_result,
saga_id=self._saga_id,
)
- elif executed_step is None:
+ else:
primary_name = step_item.step.__name__
fallback_name = step_item.fallback.__name__
if primary_name in completed_step_names:
@@ -439,7 +437,7 @@ class OrderSaga(Saga[OrderContext]):
they handle the correct context type.
"""
- steps: typing.ClassVar[list[type[SagaStepHandler] | Fallback]] = []
+ steps: typing.ClassVar[list[typing.Union[type[SagaStepHandler], Fallback]]] = []
def __init_subclass__(cls, **kwargs: typing.Any) -> None:
"""Validate steps when subclass is created."""
@@ -481,7 +479,7 @@ def transaction(
context: ContextT,
container: Container,
storage: ISagaStorage,
- saga_id: uuid.UUID | None = None,
+ saga_id: typing.Optional[uuid.UUID] = None,
compensation_retry_count: int = 3,
compensation_retry_delay: float = 1.0,
compensation_retry_backoff: float = 2.0,
diff --git a/src/cqrs/saga/step.py b/src/cqrs/saga/step.py
index 2495d1d..5a6c21e 100644
--- a/src/cqrs/saga/step.py
+++ b/src/cqrs/saga/step.py
@@ -9,7 +9,7 @@
from cqrs.response import IResponse
from cqrs.saga.models import ContextT
-Resp = typing.TypeVar("Resp", bound=IResponse | None, covariant=True)
+Resp = typing.TypeVar("Resp", bound=typing.Optional[IResponse], covariant=True)
@dataclasses.dataclass(frozen=True)
@@ -43,10 +43,10 @@ class SagaStepResult(typing.Generic[ContextT, Resp]):
response: Resp
step_type: type[SagaStepHandler[ContextT, Resp]]
with_error: bool = False
- error_message: str | None = None
- error_traceback: list[str] | None = None
- error_type: typing.Type[Exception] | None = None
- saga_id: uuid.UUID | None = None
+ error_message: typing.Optional[str] = None
+ error_traceback: typing.Optional[list[str]] = None
+ error_type: typing.Optional[typing.Type[Exception]] = None
+ saga_id: typing.Optional[uuid.UUID] = None
class SagaStepHandler(abc.ABC, typing.Generic[ContextT, Resp]):
@@ -112,11 +112,11 @@ async def compensate(self, context: OrderContext) -> None:
def _generate_step_result(
self,
- response: IResponse | None,
+ response: typing.Optional[IResponse],
with_error: bool = False,
- error_message: str | None = None,
- error_traceback: list[str] | None = None,
- error_type: typing.Type[Exception] | None = None,
+ error_message: typing.Optional[str] = None,
+ error_traceback: typing.Optional[list[str]] = None,
+ error_type: typing.Optional[typing.Type[Exception]] = None,
) -> SagaStepResult[ContextT, Resp]:
"""
Generate a SagaStepResult with proper typing from the class.
diff --git a/src/cqrs/saga/storage/memory.py b/src/cqrs/saga/storage/memory.py
index 504f4a6..6773af7 100644
--- a/src/cqrs/saga/storage/memory.py
+++ b/src/cqrs/saga/storage/memory.py
@@ -47,7 +47,7 @@ async def update_context(
self,
saga_id: uuid.UUID,
context: dict[str, typing.Any],
- current_version: int | None = None,
+ current_version: typing.Optional[int] = None,
) -> None:
"""
Update the stored context for the given saga.
@@ -55,7 +55,7 @@ async def update_context(
Parameters:
saga_id (uuid.UUID): Identifier of the saga whose context will be updated.
context (dict[str, typing.Any]): New context to store for the saga.
- current_version (int | None): If provided, require the stored saga version to match this value (optimistic locking).
+ current_version (typing.Optional[int ]): If provided, require the stored saga version to match this value (optimistic locking).
Raises:
ValueError: If the saga_id does not exist.
@@ -86,7 +86,7 @@ async def log_step(
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
- details: str | None = None,
+ details: typing.Optional[str] = None,
) -> None:
"""
Log a step entry for the given saga into the underlying storage.
@@ -96,7 +96,7 @@ async def log_step(
step_name (str): Name of the saga step.
action (Literal["act", "compensate"]): Whether the step is a forward action ("act") or a compensation ("compensate").
status (SagaStepStatus): Outcome status of the step.
- details (str | None): Optional free-form details or metadata about the step.
+ details (typing.Optional[str ]): Optional free-form details or metadata about the step.
"""
await self._storage.log_step(
saga_id,
@@ -225,7 +225,7 @@ async def update_context(
self,
saga_id: uuid.UUID,
context: dict[str, typing.Any],
- current_version: int | None = None,
+ current_version: typing.Optional[int] = None,
) -> None:
if saga_id not in self._sagas:
raise ValueError(f"Saga {saga_id} not found")
@@ -260,7 +260,7 @@ async def log_step(
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
- details: str | None = None,
+ details: typing.Optional[str] = None,
) -> None:
if saga_id not in self._sagas:
raise ValueError(f"Saga {saga_id} not found")
@@ -301,8 +301,8 @@ async def get_sagas_for_recovery(
self,
limit: int,
max_recovery_attempts: int = 5,
- stale_after_seconds: int | None = None,
- saga_name: str | None = None,
+ stale_after_seconds: typing.Optional[int] = None,
+ saga_name: typing.Optional[str] = None,
) -> list[uuid.UUID]:
"""
Selects saga IDs eligible for recovery based on status, recovery attempts, staleness, and an optional name filter.
@@ -310,8 +310,8 @@ async def get_sagas_for_recovery(
Parameters:
limit (int): Maximum number of saga IDs to return.
max_recovery_attempts (int): Upper bound (exclusive) on recovery attempts; only sagas with fewer attempts are considered.
- stale_after_seconds (int | None): If provided, only sagas last updated earlier than this many seconds before now are considered; if None, staleness is ignored.
- saga_name (str | None): If provided, only sagas with this name are considered; if None, name is not filtered.
+ stale_after_seconds (typing.Optional[int ]): If provided, only sagas last updated earlier than this many seconds before now are considered; if None, staleness is ignored.
+ saga_name (typing.Optional[str ]): If provided, only sagas with this name are considered; if None, name is not filtered.
Returns:
list[uuid.UUID]: Up to `limit` saga IDs sorted by oldest `updated_at` first that match the recovery criteria.
@@ -333,7 +333,7 @@ async def get_sagas_for_recovery(
async def increment_recovery_attempts(
self,
saga_id: uuid.UUID,
- new_status: SagaStatus | None = None,
+ new_status: typing.Optional[SagaStatus] = None,
) -> None:
if saga_id not in self._sagas:
raise ValueError(f"Saga {saga_id} not found")
diff --git a/src/cqrs/saga/storage/models.py b/src/cqrs/saga/storage/models.py
index 6e6baf8..b27f5d6 100644
--- a/src/cqrs/saga/storage/models.py
+++ b/src/cqrs/saga/storage/models.py
@@ -18,7 +18,7 @@ def __init__(
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
timestamp: datetime.datetime,
- details: str | None = None,
+ details: typing.Optional[str] = None,
) -> None:
self.saga_id = saga_id
self.step_name = step_name
diff --git a/src/cqrs/saga/storage/protocol.py b/src/cqrs/saga/storage/protocol.py
index 27fcd88..c4d4300 100644
--- a/src/cqrs/saga/storage/protocol.py
+++ b/src/cqrs/saga/storage/protocol.py
@@ -33,7 +33,7 @@ async def update_context(
self,
saga_id: uuid.UUID,
context: dict[str, typing.Any],
- current_version: int | None = None,
+ current_version: typing.Optional[int] = None,
) -> None:
"""
Persist a snapshot of the saga's execution context, optionally using optimistic locking.
@@ -41,7 +41,7 @@ async def update_context(
Parameters:
saga_id (uuid.UUID): Identifier of the saga to update.
context (dict[str, Any]): JSON-serializable context object to store as the new snapshot.
- current_version (int | None): If provided, perform an optimistic-locking update that succeeds only
+ current_version (typing.Optional[int ]): If provided, perform an optimistic-locking update that succeeds only
if the stored version matches this value; on success the stored version is incremented.
Raises:
@@ -70,7 +70,7 @@ async def log_step(
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
- details: str | None = None,
+ details: typing.Optional[str] = None,
) -> None:
"""
Append a step transition to the saga's execution log.
@@ -80,7 +80,7 @@ async def log_step(
step_name (str): Logical name of the step (used for diagnostics and replay).
action (Literal["act", "compensate"]): Whether this entry records the primary action ("act") or its compensating action ("compensate").
status (SagaStepStatus): The step transition status to record (e.g., started, completed, failed, compensated).
- details (str | None): Optional human-readable details or diagnostics about the transition.
+ details (typing.Optional[str ]): Optional human-readable details or diagnostics about the transition.
"""
async def load_saga_state(
@@ -164,7 +164,7 @@ async def update_context(
self,
saga_id: uuid.UUID,
context: dict[str, typing.Any],
- current_version: int | None = None,
+ current_version: typing.Optional[int] = None,
) -> None:
"""Save saga context snapshot (e.g. after a step completes).
@@ -209,7 +209,7 @@ async def log_step(
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
- details: str | None = None,
+ details: typing.Optional[str] = None,
) -> None:
"""Append a step transition to the saga log.
@@ -271,8 +271,8 @@ async def get_sagas_for_recovery(
self,
limit: int,
max_recovery_attempts: int = 5,
- stale_after_seconds: int | None = None,
- saga_name: str | None = None,
+ stale_after_seconds: typing.Optional[int] = None,
+ saga_name: typing.Optional[str] = None,
) -> list[uuid.UUID]:
"""Return saga IDs that are candidates for recovery.
@@ -307,7 +307,7 @@ async def get_sagas_for_recovery(
async def increment_recovery_attempts(
self,
saga_id: uuid.UUID,
- new_status: SagaStatus | None = None,
+ new_status: typing.Optional[SagaStatus] = None,
) -> None:
"""Increment recovery attempt counter after a failed recovery run.
diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py
index e8128bd..584fc89 100644
--- a/src/cqrs/saga/storage/sqlalchemy.py
+++ b/src/cqrs/saga/storage/sqlalchemy.py
@@ -182,7 +182,7 @@ async def update_context(
self,
saga_id: uuid.UUID,
context: dict[str, typing.Any],
- current_version: int | None = None,
+ current_version: typing.Optional[int] = None,
) -> None:
"""
Update the stored context for a saga and increment its version, optionally enforcing an optimistic version check.
@@ -190,7 +190,7 @@ async def update_context(
Parameters:
saga_id (uuid.UUID): Identifier of the saga to update.
context (dict[str, typing.Any]): New serialized saga context to persist.
- current_version (int | None): If provided, require the saga's current version to match this value before updating.
+ current_version (typing.Optional[int ]): If provided, require the saga's current version to match this value before updating.
Raises:
SagaConcurrencyError: If an optimistic version check fails (indicating a concurrent modification) or if the saga does not exist when a version was supplied.
@@ -256,7 +256,7 @@ async def log_step(
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
- details: str | None = None,
+ details: typing.Optional[str] = None,
) -> None:
"""
Record a saga step event by creating and staging a log entry in the active session.
@@ -266,7 +266,7 @@ async def log_step(
step_name (str): Name of the step being recorded.
action (Literal["act", "compensate"]): The performed action: "act" for normal action or "compensate" for compensation.
status (SagaStepStatus): The step's outcome status.
- details (str | None): Optional free-form details or error message associated with the step.
+ details (typing.Optional[str ]): Optional free-form details or error message associated with the step.
"""
log_entry = SagaLogModel(
saga_id=saga_id,
@@ -343,7 +343,7 @@ async def get_step_history(
if row.created_at.tzinfo is None
else row.created_at,
),
- details=typing.cast(str | None, row.details),
+ details=typing.cast(typing.Optional[str], row.details),
)
for row in rows
]
@@ -438,7 +438,7 @@ async def update_context(
self,
saga_id: uuid.UUID,
context: dict[str, typing.Any],
- current_version: int | None = None,
+ current_version: typing.Optional[int] = None,
) -> None:
async with self.session_factory() as session:
try:
@@ -508,7 +508,7 @@ async def log_step(
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
- details: str | None = None,
+ details: typing.Optional[str] = None,
) -> None:
async with self.session_factory() as session:
try:
@@ -576,7 +576,7 @@ async def get_step_history(
if row.created_at.tzinfo is None
else row.created_at,
),
- details=typing.cast(str | None, row.details),
+ details=typing.cast(typing.Optional[str], row.details),
)
for row in rows
]
@@ -585,8 +585,8 @@ async def get_sagas_for_recovery(
self,
limit: int,
max_recovery_attempts: int = 5,
- stale_after_seconds: int | None = None,
- saga_name: str | None = None,
+ stale_after_seconds: typing.Optional[int] = None,
+ saga_name: typing.Optional[str] = None,
) -> list[uuid.UUID]:
recoverable = (
SagaStatus.RUNNING,
@@ -615,14 +615,14 @@ async def get_sagas_for_recovery(
async def increment_recovery_attempts(
self,
saga_id: uuid.UUID,
- new_status: SagaStatus | None = None,
+ new_status: typing.Optional[SagaStatus] = None,
) -> None:
"""
Increment the recovery attempts counter for the given saga execution and optionally update its status.
Parameters:
saga_id (uuid.UUID): Identifier of the saga execution to update.
- new_status (SagaStatus | None): If provided, set the saga's status to this value.
+ new_status (typing.Optional[SagaStatus ]): If provided, set the saga's status to this value.
Raises:
ValueError: If no saga execution exists with the given `saga_id`.
diff --git a/src/cqrs/saga/validation.py b/src/cqrs/saga/validation.py
index 688af2f..d00b5ff 100644
--- a/src/cqrs/saga/validation.py
+++ b/src/cqrs/saga/validation.py
@@ -14,7 +14,7 @@ class SagaContextTypeExtractor:
"""Extracts context type from Generic type parameters."""
@staticmethod
- def extract_from_class(klass: type, saga_base_class: type) -> type | None:
+ def extract_from_class(klass: type, saga_base_class: type) -> typing.Optional[type]:
"""
Extract context type from a class that inherits from a Generic base.
@@ -48,7 +48,7 @@ def extract_from_class(klass: type, saga_base_class: type) -> type | None:
return None
@staticmethod
- def extract_from_step(step_type: type) -> type | None:
+ def extract_from_step(step_type: type) -> typing.Optional[type]:
"""
Extract context type from a SagaStepHandler class.
@@ -162,7 +162,7 @@ class SagaStepValidator:
def __init__(
self,
saga_name: str,
- context_type: type | None = None,
+ context_type: typing.Optional[type] = None,
) -> None:
"""
Initialize validator.
@@ -178,7 +178,7 @@ def __init__(
def validate_steps(
self,
- steps: list[type[SagaStepHandler] | Fallback],
+ steps: list[typing.Union[type[SagaStepHandler], Fallback]],
) -> None:
"""
Validate saga steps.
diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py
index 603269c..331bf60 100644
--- a/tests/benchmarks/conftest.py
+++ b/tests/benchmarks/conftest.py
@@ -5,6 +5,7 @@
import asyncio
import contextlib
import os
+import typing
import pytest
from sqlalchemy.ext.asyncio import create_async_engine
@@ -35,13 +36,13 @@ def create_run(
@pytest.fixture(scope="session")
-def database_dsn() -> str | None:
+def database_dsn() -> typing.Optional[str]:
"""DATABASE_DSN from environment (set in CI by pytest-config.ini)."""
return os.environ.get("DATABASE_DSN") or None
@pytest.fixture(scope="session")
-def saga_benchmark_loop_and_engine(database_dsn: str | None):
+def saga_benchmark_loop_and_engine(database_dsn: typing.Optional[str]):
"""
One event loop and one async engine for the whole benchmark session.
Used by saga SQLAlchemy benchmarks so connection setup/teardown is not measured.
diff --git a/tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py b/tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py
index 82dc2af..6959dc8 100644
--- a/tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py
+++ b/tests/benchmarks/dataclasses/test_benchmark_cor_request_handler.py
@@ -24,12 +24,12 @@ class TResult(cqrs.DCResponse):
message: str = ""
-class HandlerA(CORRequestHandler[TRequest, TResult | None]):
+class HandlerA(CORRequestHandler[TRequest, TResult]):
@property
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
if request.method == "method_a":
return TResult(
success=True,
@@ -39,12 +39,12 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class HandlerB(CORRequestHandler[TRequest, TResult | None]):
+class HandlerB(CORRequestHandler[TRequest, TResult]):
@property
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
if request.method == "method_b":
return TResult(
success=True,
@@ -54,12 +54,12 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class HandlerC(CORRequestHandler[TRequest, TResult | None]):
+class HandlerC(CORRequestHandler[TRequest, TResult]):
@property
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
if request.method == "method_c":
return TResult(
success=True,
@@ -69,12 +69,12 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class DefaultHandler(CORRequestHandler[TRequest, TResult | None]):
+class DefaultHandler(CORRequestHandler[TRequest, TResult]):
@property
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
return TResult(
success=False,
handler_name="DefaultHandler",
diff --git a/tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py b/tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py
index feb6dbc..ab5bcec 100644
--- a/tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py
+++ b/tests/benchmarks/dataclasses/test_benchmark_event_handler_chain.py
@@ -2,6 +2,7 @@
import asyncio
import dataclasses
+import typing
import pytest
@@ -63,7 +64,7 @@ async def handle(self, event: _EventL3) -> None:
class _ChainContainer(Container[object]):
def __init__(self) -> None:
- self._external: object | None = None
+ self._external: typing.Optional[object] = None
@property
def external_container(self) -> object:
diff --git a/tests/benchmarks/default/test_benchmark_cor_request_handler.py b/tests/benchmarks/default/test_benchmark_cor_request_handler.py
index 08963be..1ee56fc 100644
--- a/tests/benchmarks/default/test_benchmark_cor_request_handler.py
+++ b/tests/benchmarks/default/test_benchmark_cor_request_handler.py
@@ -21,12 +21,12 @@ class TResult(cqrs.Response):
message: str = ""
-class HandlerA(CORRequestHandler[TRequest, TResult | None]):
+class HandlerA(CORRequestHandler[TRequest, TResult]):
@property
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
if request.method == "method_a":
return TResult(
success=True,
@@ -36,12 +36,12 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class HandlerB(CORRequestHandler[TRequest, TResult | None]):
+class HandlerB(CORRequestHandler[TRequest, TResult]):
@property
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
if request.method == "method_b":
return TResult(
success=True,
@@ -51,12 +51,12 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class HandlerC(CORRequestHandler[TRequest, TResult | None]):
+class HandlerC(CORRequestHandler[TRequest, TResult]):
@property
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
if request.method == "method_c":
return TResult(
success=True,
@@ -66,12 +66,12 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class DefaultHandler(CORRequestHandler[TRequest, TResult | None]):
+class DefaultHandler(CORRequestHandler[TRequest, TResult]):
@property
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
return TResult(
success=False,
handler_name="DefaultHandler",
diff --git a/tests/benchmarks/default/test_benchmark_event_handler_chain.py b/tests/benchmarks/default/test_benchmark_event_handler_chain.py
index a335f54..924fdb7 100644
--- a/tests/benchmarks/default/test_benchmark_event_handler_chain.py
+++ b/tests/benchmarks/default/test_benchmark_event_handler_chain.py
@@ -1,6 +1,7 @@
"""Benchmarks: 3-level event chain, volume >> semaphore (parallel follow-ups)."""
import asyncio
+import typing
import pydantic
import pytest
@@ -66,7 +67,7 @@ def __init__(self) -> None:
self._h1 = _HandlerL1()
self._h2 = _HandlerL2()
self._h3 = _HandlerL3()
- self._external: object | None = None
+ self._external: typing.Optional[object] = None
@property
def external_container(self) -> object:
diff --git a/tests/integration/fixtures.py b/tests/integration/fixtures.py
index cbb1f5b..067f976 100644
--- a/tests/integration/fixtures.py
+++ b/tests/integration/fixtures.py
@@ -1,4 +1,3 @@
-import contextlib
import functools
import os
@@ -39,9 +38,11 @@ async def session(init_orm):
DATABASE_DSN,
isolation_level="REPEATABLE READ",
)
- session = async_sessionmaker(engine_factory())()
- async with contextlib.aclosing(session):
- yield session
+ db_session = async_sessionmaker(engine_factory())()
+ try:
+ yield db_session
+ finally:
+ await db_session.close()
# --- Saga storage: MySQL (отдельные фикстуры, поднимают схему и всё необходимое) ---
diff --git a/tests/integration/test_decompression.py b/tests/integration/test_decompression.py
index 3501253..0da11fd 100644
--- a/tests/integration/test_decompression.py
+++ b/tests/integration/test_decompression.py
@@ -18,7 +18,7 @@ async def test_decompression_positive(session):
repository.add(event)
await session.commit()
- read_event: outbox_repository.OutboxedEvent | None = next(
+ read_event: typing.Optional[outbox_repository.OutboxedEvent] = next(
iter(
await repository.get_many(
batch_size=1,
diff --git a/tests/integration/test_event_outbox.py b/tests/integration/test_event_outbox.py
index 40b11e7..f9f6047 100644
--- a/tests/integration/test_event_outbox.py
+++ b/tests/integration/test_event_outbox.py
@@ -128,7 +128,7 @@ async def test_get_new_event_positive(self, session):
await OutboxRequestHandler(repository).handle(request)
[event_over_get_all_events_method] = await repository.get_many(1)
- event: outbox_repository.OutboxedEvent | None = next(
+ event: typing.Optional[outbox_repository.OutboxedEvent] = next(
iter(
await repository.get_many(
batch_size=1,
diff --git a/tests/integration/test_kafka_producer.py b/tests/integration/test_kafka_producer.py
index b4e9d19..fd5dd1f 100644
--- a/tests/integration/test_kafka_producer.py
+++ b/tests/integration/test_kafka_producer.py
@@ -71,7 +71,7 @@ async def test_produce_some_event(
mediator: cqrs.RequestMediator,
kafka_producer,
) -> None:
- handler: CloseMeetingRoomCommandHandler | None = await MockContainer().resolve(
+ handler: typing.Optional[CloseMeetingRoomCommandHandler] = await MockContainer().resolve(
CloseMeetingRoomCommandHandler,
) # noqa
command = CloseMeetingRoomCommand(meeting_room_id=uuid.uuid4())
diff --git a/tests/integration/test_saga_mediator_memory.py b/tests/integration/test_saga_mediator_memory.py
index b64139d..d3b1773 100644
--- a/tests/integration/test_saga_mediator_memory.py
+++ b/tests/integration/test_saga_mediator_memory.py
@@ -27,9 +27,9 @@ class OrderContext(SagaContext):
order_id: str
user_id: str
amount: float
- inventory_id: str | None = None
- payment_id: str | None = None
- shipment_id: str | None = None
+ inventory_id: typing.Optional[str] = None
+ payment_id: typing.Optional[str] = None
+ shipment_id: typing.Optional[str] = None
class ReserveInventoryResponse(Response):
diff --git a/tests/integration/test_streaming_mediator.py b/tests/integration/test_streaming_mediator.py
index 66ecb6f..bdd2532 100644
--- a/tests/integration/test_streaming_mediator.py
+++ b/tests/integration/test_streaming_mediator.py
@@ -100,7 +100,7 @@ async def test_streaming_mediator_integration(
kafka_producer,
) -> None:
mediator, container = streaming_mediator
- handler: ProcessItemsCommandHandler | None = await container.resolve(
+ handler: typing.Optional[ProcessItemsCommandHandler] = await container.resolve(
ProcessItemsCommandHandler,
)
@@ -134,7 +134,7 @@ async def test_streaming_mediator_events_emitted_after_each_yield(
kafka_producer,
) -> None:
mediator, container = streaming_mediator
- _: ProcessItemsCommandHandler | None = await container.resolve(
+ _: typing.Optional[ProcessItemsCommandHandler] = await container.resolve(
ProcessItemsCommandHandler,
)
@@ -162,7 +162,7 @@ async def test_streaming_mediator_empty_items_list(
kafka_producer,
) -> None:
mediator, container = streaming_mediator
- handler: ProcessItemsCommandHandler | None = await container.resolve(
+ handler: typing.Optional[ProcessItemsCommandHandler] = await container.resolve(
ProcessItemsCommandHandler,
)
@@ -183,7 +183,7 @@ async def test_streaming_mediator_single_item(
kafka_producer,
) -> None:
mediator, container = streaming_mediator
- handler: ProcessItemsCommandHandler | None = await container.resolve(
+ handler: typing.Optional[ProcessItemsCommandHandler] = await container.resolve(
ProcessItemsCommandHandler,
)
diff --git a/tests/unit/test_cor_mermaid.py b/tests/unit/test_cor_mermaid.py
index 2906629..27befe5 100644
--- a/tests/unit/test_cor_mermaid.py
+++ b/tests/unit/test_cor_mermaid.py
@@ -15,7 +15,7 @@ class ProcessPaymentCommand(cqrs.Request):
class PaymentResult(cqrs.Response):
success: bool
- transaction_id: str | None = None
+ transaction_id: typing.Optional[str] = None
message: str = ""
@@ -24,7 +24,7 @@ class CreditCardHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResult])
def events(self) -> typing.List:
return []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
if request.payment_method == "credit_card":
return PaymentResult(
success=True,
@@ -39,7 +39,7 @@ class PayPalHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResult]):
def events(self) -> typing.List:
return []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
if request.payment_method == "paypal":
return PaymentResult(
success=True,
@@ -54,7 +54,7 @@ class BankTransferHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResult
def events(self) -> typing.List:
return []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
if request.payment_method == "bank_transfer":
return PaymentResult(
success=True,
@@ -69,7 +69,7 @@ class DefaultPaymentHandler(CORRequestHandler[ProcessPaymentCommand, PaymentResu
def events(self) -> typing.List:
return []
- async def handle(self, request: ProcessPaymentCommand) -> PaymentResult | None:
+ async def handle(self, request: ProcessPaymentCommand) -> typing.Optional[PaymentResult]:
return PaymentResult(
success=False,
message=f"Unsupported payment method: {request.payment_method}",
@@ -179,8 +179,8 @@ def test_class_diagram_basic_structure() -> None:
assert "classDiagram" in diagram
assert "class CORRequestHandler" in diagram
assert "<>" in diagram
- assert "+handle(request) Response | None" in diagram
- assert "+next(request) Response | None" in diagram
+ assert "+handle(request) typing.Optional[Response ]" in diagram
+ assert "+next(request) typing.Optional[Response ]" in diagram
assert "+set_next(handler) CORRequestHandler" in diagram
# Check handler classes
@@ -189,8 +189,8 @@ def test_class_diagram_basic_structure() -> None:
assert "class BankTransferHandler" in diagram
# Check handler methods
- assert "+handle(request) Response | None" in diagram
- assert "+next(request) Response | None" in diagram
+ assert "+handle(request) typing.Optional[Response ]" in diagram
+ assert "+next(request) typing.Optional[Response ]" in diagram
assert "+events: List[Event]" in diagram
# Check types are included
diff --git a/tests/unit/test_cor_request_handler.py b/tests/unit/test_cor_request_handler.py
index da180fa..abd1c95 100644
--- a/tests/unit/test_cor_request_handler.py
+++ b/tests/unit/test_cor_request_handler.py
@@ -18,7 +18,7 @@ class TResult(cqrs.Response):
message: str = ""
-class TestHandlerA(CORRequestHandler[TRequest, TResult | None]):
+class TestHandlerA(CORRequestHandler[TRequest, TResult]):
"""Test handler that processes method_a and tracks calls."""
call_count: int = 0
@@ -27,7 +27,7 @@ class TestHandlerA(CORRequestHandler[TRequest, TResult | None]):
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
TestHandlerA.call_count += 1
if request.method == "method_a":
@@ -40,7 +40,7 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class TestHandlerB(CORRequestHandler[TRequest, TResult | None]):
+class TestHandlerB(CORRequestHandler[TRequest, TResult]):
"""Test handler that processes method_b and tracks calls."""
call_count: int = 0
@@ -49,7 +49,7 @@ class TestHandlerB(CORRequestHandler[TRequest, TResult | None]):
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
TestHandlerB.call_count += 1
if request.method == "method_b":
@@ -62,7 +62,7 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class TestHandlerC(CORRequestHandler[TRequest, TResult | None]):
+class TestHandlerC(CORRequestHandler[TRequest, TResult]):
"""Test handler that processes method_c and tracks calls."""
call_count: int = 0
@@ -71,7 +71,7 @@ class TestHandlerC(CORRequestHandler[TRequest, TResult | None]):
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
TestHandlerC.call_count += 1
if request.method == "method_c":
@@ -84,7 +84,7 @@ async def handle(self, request: TRequest) -> TResult | None:
return await self.next(request)
-class DefaultTestHandler(CORRequestHandler[TRequest, TResult | None]):
+class DefaultTestHandler(CORRequestHandler[TRequest, TResult]):
"""Default handler that always handles the request (end of chain)."""
call_count: int = 0
@@ -93,7 +93,7 @@ class DefaultTestHandler(CORRequestHandler[TRequest, TResult | None]):
def events(self) -> typing.Sequence[cqrs.IEvent]:
return []
- async def handle(self, request: TRequest) -> TResult | None:
+ async def handle(self, request: TRequest) -> typing.Optional[TResult]:
DefaultTestHandler.call_count += 1
return TResult(
success=False,
diff --git a/tests/unit/test_dispatcher.py b/tests/unit/test_dispatcher.py
index 084c484..abe7ca4 100644
--- a/tests/unit/test_dispatcher.py
+++ b/tests/unit/test_dispatcher.py
@@ -1,4 +1,5 @@
from uuid import UUID, uuid4
+import typing
import pydantic
@@ -13,12 +14,12 @@
class ReadMeetingDetailsQuery(Request):
meeting_room_id: UUID = pydantic.Field()
- status: str | None = pydantic.Field(default=None)
+ status: typing.Optional[str] = pydantic.Field(default=None)
class ReadMeetingDetailsQueryResult(Response):
meeting_room_id: UUID = pydantic.Field()
- status: str | None = pydantic.Field(default=None)
+ status: typing.Optional[str] = pydantic.Field(default=None)
class ReadMeetingDetailsQueryHandler(
diff --git a/tests/unit/test_event_processor.py b/tests/unit/test_event_processor.py
index dc54117..197bfeb 100644
--- a/tests/unit/test_event_processor.py
+++ b/tests/unit/test_event_processor.py
@@ -1,4 +1,5 @@
import asyncio
+import typing
from unittest import mock
import pydantic
@@ -279,7 +280,7 @@ def events(self) -> tuple[_ChainedEvent, ...]:
return (_ChainedEvent(level=self._last.level + 1, seq=self._last.seq),)
def __init__(self) -> None:
- self._last: _ChainedEvent | None = None
+ self._last: typing.Optional[_ChainedEvent] = None
async def handle(self, event: _ChainedEvent) -> None:
self._last = event
diff --git a/tests/unit/test_multi_level_events_parameterized.py b/tests/unit/test_multi_level_events_parameterized.py
index 85fc31a..028fbf8 100644
--- a/tests/unit/test_multi_level_events_parameterized.py
+++ b/tests/unit/test_multi_level_events_parameterized.py
@@ -51,7 +51,7 @@ async def handle(self, event: _FanEvent) -> None:
class _FanContainer(Container[object]):
def __init__(self, handler: _FanHandler) -> None:
self._handler = handler
- self._external: object | None = None
+ self._external: typing.Optional[object] = None
@property
def external_container(self) -> object:
@@ -153,7 +153,7 @@ def __init__(self) -> None:
self._h1 = _HandlerL1()
self._h2 = _HandlerL2()
self._h3 = _HandlerL3()
- self._external: object | None = None
+ self._external: typing.Optional[object] = None
@property
def external_container(self) -> object:
diff --git a/tests/unit/test_request_response.py b/tests/unit/test_request_response.py
index 6ae3a65..1fb4351 100644
--- a/tests/unit/test_request_response.py
+++ b/tests/unit/test_request_response.py
@@ -1,4 +1,5 @@
from uuid import UUID, uuid4
+import typing
import pydantic
import pytest
@@ -62,7 +63,7 @@ class TestContainer:
async def resolve(
self,
type_,
- ) -> CloseMeetingRoomCommandHandler | ReadMeetingDetailsQueryHandler:
+ ) -> typing.Union[CloseMeetingRoomCommandHandler, ReadMeetingDetailsQueryHandler]:
if type_ is CloseMeetingRoomCommandHandler:
return self.close_meeting_room_command_handler
elif type_ is ReadMeetingDetailsQueryHandler:
diff --git a/tests/unit/test_saga/conftest.py b/tests/unit/test_saga/conftest.py
index 0bc055d..1d2e827 100644
--- a/tests/unit/test_saga/conftest.py
+++ b/tests/unit/test_saga/conftest.py
@@ -42,7 +42,7 @@ def __init__(self) -> None:
self._events: list[Event] = []
self.act_called = False
self.compensate_called = False
- self._inventory_id: str | None = None
+ self._inventory_id: typing.Optional[str] = None
@property
def events(self) -> list[Event]:
@@ -70,7 +70,7 @@ def __init__(self) -> None:
self._events: list[Event] = []
self.act_called = False
self.compensate_called = False
- self._payment_id: str | None = None
+ self._payment_id: typing.Optional[str] = None
@property
def events(self) -> list[Event]:
@@ -95,7 +95,7 @@ def __init__(self) -> None:
self._events: list[Event] = []
self.act_called = False
self.compensate_called = False
- self._shipment_id: str | None = None
+ self._shipment_id: typing.Optional[str] = None
@property
def events(self) -> list[Event]:
diff --git a/tests/unit/test_saga/test_fallback.py b/tests/unit/test_saga/test_fallback.py
index 81c1075..e46307a 100644
--- a/tests/unit/test_saga/test_fallback.py
+++ b/tests/unit/test_saga/test_fallback.py
@@ -53,7 +53,7 @@ def __init__(self) -> None:
self._events: list[Event] = []
self.act_called = False
self.compensate_called = False
- self._inventory_id: str | None = None
+ self._inventory_id: typing.Optional[str] = None
@property
def events(self) -> list[Event]:
@@ -491,7 +491,7 @@ class SuccessfulStep(SagaStepHandler[OrderContext, ReserveInventoryResponse]):
def __init__(self) -> None:
self._events: list[Event] = []
self.act_called = False
- self._inventory_id: str | None = None
+ self._inventory_id: typing.Optional[str] = None
@property
def events(self) -> list[Event]:
@@ -585,7 +585,7 @@ class ContextVerifyingFallbackStep(
def __init__(self) -> None:
self._events: list[Event] = []
self.act_called = False
- self._inventory_id: str | None = None
+ self._inventory_id: typing.Optional[str] = None
@property
def events(self) -> list[Event]:
@@ -647,7 +647,7 @@ def __init__(self) -> None:
self._events: list[Event] = []
self.act_called = False
self.compensate_called = False
- self._inventory_id: str | None = None
+ self._inventory_id: typing.Optional[str] = None
@property
def events(self) -> list[Event]:
diff --git a/tests/unit/test_saga/test_saga_storage_run.py b/tests/unit/test_saga/test_saga_storage_run.py
index d25ebda..dcc922b 100644
--- a/tests/unit/test_saga/test_saga_storage_run.py
+++ b/tests/unit/test_saga/test_saga_storage_run.py
@@ -42,7 +42,7 @@ async def update_context(
self,
saga_id: uuid.UUID,
context: dict,
- current_version: int | None = None,
+ current_version: typing.Optional[int] = None,
) -> None:
"""
Update the stored context for a saga, optionally validating the expected current version.
@@ -50,7 +50,7 @@ async def update_context(
Parameters:
saga_id (uuid.UUID): Identifier of the saga whose context will be updated.
context (dict): New context data to persist for the saga.
- current_version (int | None): If provided, the update will only proceed when the stored version equals this value; pass None to skip version validation.
+ current_version (typing.Optional[int ]): If provided, the update will only proceed when the stored version equals this value; pass None to skip version validation.
"""
await self._inner.update_context(saga_id, context, current_version)
@@ -70,7 +70,7 @@ async def log_step(
step_name: str,
action: typing.Literal["act", "compensate"],
status: SagaStepStatus,
- details: str | None = None,
+ details: typing.Optional[str] = None,
) -> None:
"""
Record the execution or compensation outcome of a saga step.
@@ -80,7 +80,7 @@ async def log_step(
step_name (str): Name of the step being logged.
action (Literal["act", "compensate"]): Whether this log entry is for the step's normal action ("act") or its compensation ("compensate").
status (SagaStepStatus): Resulting status of the step.
- details (str | None): Optional human-readable details or metadata about the step event.
+ details (typing.Optional[str ]): Optional human-readable details or metadata about the step event.
"""
await self._inner.log_step(saga_id, step_name, action, status, details)
@@ -121,8 +121,8 @@ async def get_sagas_for_recovery(
self,
limit: int,
max_recovery_attempts: int = 5,
- stale_after_seconds: int | None = None,
- saga_name: str | None = None,
+ stale_after_seconds: typing.Optional[int] = None,
+ saga_name: typing.Optional[str] = None,
) -> list[uuid.UUID]:
"""
Selects saga IDs that are eligible for recovery.
@@ -130,8 +130,8 @@ async def get_sagas_for_recovery(
Parameters:
limit (int): Maximum number of saga IDs to return.
max_recovery_attempts (int): Only include sagas with fewer than this many recovery attempts.
- stale_after_seconds (int | None): If provided, only include sagas last updated more than this many seconds ago; if None, do not filter by staleness.
- saga_name (str | None): If provided, restrict results to sagas with this name.
+ stale_after_seconds (typing.Optional[int ]): If provided, only include sagas last updated more than this many seconds ago; if None, do not filter by staleness.
+ saga_name (typing.Optional[str ]): If provided, restrict results to sagas with this name.
Returns:
list[uuid.UUID]: Saga UUIDs that match the recovery criteria, up to `limit`.
@@ -146,14 +146,14 @@ async def get_sagas_for_recovery(
async def increment_recovery_attempts(
self,
saga_id: uuid.UUID,
- new_status: SagaStatus | None = None,
+ new_status: typing.Optional[SagaStatus] = None,
) -> None:
"""
Increment the recovery-attempts counter for a saga and optionally update its status.
Parameters:
saga_id (uuid.UUID): Identifier of the saga whose recovery attempts should be incremented.
- new_status (SagaStatus | None): If provided, update the saga's status to this value after incrementing attempts; otherwise leave status unchanged.
+ new_status (typing.Optional[SagaStatus ]): If provided, update the saga's status to this value after incrementing attempts; otherwise leave status unchanged.
"""
await self._inner.increment_recovery_attempts(saga_id, new_status)
@@ -270,7 +270,7 @@ class SagaWithFailure(Saga[OrderContext]):
storage = MemorySagaStorage()
saga = SagaWithFailure()
context = OrderContext(order_id="o3", user_id="u3", amount=70.0)
- saga_id: uuid.UUID | None = None
+ saga_id: typing.Optional[uuid.UUID] = None
with pytest.raises(ValueError, match="Step failed"):
async with saga.transaction(
context=context,
diff --git a/tests/unit/test_saga/test_saga_to_mermaid.py b/tests/unit/test_saga/test_saga_to_mermaid.py
index d8f8bab..8e17e0c 100644
--- a/tests/unit/test_saga/test_saga_to_mermaid.py
+++ b/tests/unit/test_saga/test_saga_to_mermaid.py
@@ -22,7 +22,7 @@ def test_to_mermaid_empty_steps(saga_container: SagaContainer) -> None:
"""Test that Mermaid handles empty steps list correctly."""
class EmptySaga(Saga[OrderContext]):
- steps: typing.ClassVar[list[type[SagaStepHandler] | Fallback]] = []
+ steps: typing.ClassVar[list[typing.Union[type[SagaStepHandler], Fallback]]] = []
saga = EmptySaga()
generator = SagaMermaid(saga)
@@ -239,7 +239,7 @@ def test_class_diagram_empty_steps(saga_container: SagaContainer) -> None:
"""Test that class_diagram() handles empty steps list correctly."""
class EmptySaga(Saga[OrderContext]):
- steps: typing.ClassVar[list[type[SagaStepHandler] | Fallback]] = []
+ steps: typing.ClassVar[list[typing.Union[type[SagaStepHandler], Fallback]]] = []
saga = EmptySaga()
generator = SagaMermaid(saga)
diff --git a/tests/unit/test_streaming_outbox_background_processing.py b/tests/unit/test_streaming_outbox_background_processing.py
index 1b07977..65b8689 100644
--- a/tests/unit/test_streaming_outbox_background_processing.py
+++ b/tests/unit/test_streaming_outbox_background_processing.py
@@ -105,8 +105,8 @@ async def handle(
class MockContainer:
def __init__(
self,
- handler: ProcessServiceStreamingHandler | None = None,
- event_handler: ServiceChangedEventHandler | None = None,
+ handler: typing.Optional[ProcessServiceStreamingHandler] = None,
+ event_handler: typing.Optional[ServiceChangedEventHandler] = None,
) -> None:
self._handler = handler
self._event_handler = event_handler