177 changes: 83 additions & 94 deletions README.md

## Overview

An event-driven framework for building distributed systems in Python. It centers on CQRS (Command Query Responsibility Segregation) and extends into messaging, sagas, and reliable event delivery: you can separate read and write flows, react to events from the bus, run distributed transactions with compensation, and publish events via the Transaction Outbox pattern. The result is clearer structure, better scalability, and easier evolution of the application.

This package is a fork of the [diator](https://github.com/akhundMurad/diator)
project ([documentation](https://akhundmurad.github.io/diator/)) with several enhancements, ordered by importance:

**Core framework**

1. Redesigned the event and request mapping mechanism to handlers;
2. `EventMediator` for handling `Notification` and `ECST` events coming from the bus;
3. `bootstrap` for easy setup;
4. **Transaction Outbox**, ensuring that `Notification` and `ECST` events are sent to the broker;
5. **Orchestrated Saga** pattern for distributed transactions with automatic compensation and recovery;
6. `StreamingRequestMediator` and `StreamingRequestHandler` for streaming requests with real-time progress updates;
7. **Chain of Responsibility** with `CORRequestHandler` for processing requests through multiple handlers in sequence;
8. **Parallel event processing** with configurable concurrency limits.

**Also**

- **Typing:** Pydantic [v2.*](https://docs.pydantic.dev/2.8/) and the `IRequest`/`IResponse` interfaces, so you can use Pydantic-based, dataclass-based, or custom Request/Response implementations.
- **Broker:** Kafka via [aiokafka](https://github.com/aio-libs/aiokafka).
- **Integration:** Ready for integration with FastAPI and FastStream.
- **Documentation:** Built-in Mermaid diagram generation (Sequence and Class diagrams).
- **Protobuf:** Interface-level support for converting Notification events to Protobuf and back.
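Item 8, parallel event processing with a concurrency limit, can be sketched in plain Python without the package's API; `handle_event` below is an illustrative stand-in, not a real handler:

```python
import asyncio


async def handle_event(event: str) -> str:
    # Stand-in for a real event handler.
    await asyncio.sleep(0.01)
    return f"handled:{event}"


async def process_events(events: list[str], max_concurrency: int = 2) -> list[str]:
    # A semaphore caps how many handlers run at the same time.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(event: str) -> str:
        async with semaphore:
            return await handle_event(event)

    # gather preserves input order regardless of completion order.
    return await asyncio.gather(*(guarded(e) for e in events))


results = asyncio.run(process_events(["a", "b", "c", "d"]))
print(results)  # ['handled:a', 'handled:b', 'handled:c', 'handled:d']
```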

## Request Handlers


A complete example can be found in [request_response_types.py](https://github.com/vadikko2/cqrs/blob/master/examples/request_response_types.py)

## Mapping

To bind commands, queries and events with specific handlers, you can use the registries `EventMap` and `RequestMap`.

```python
from cqrs import requests, events

from app import commands, command_handlers
from app import queries, query_handlers
from app import events as event_models, event_handlers


def init_commands(mapper: requests.RequestMap) -> None:
    mapper.bind(commands.JoinMeetingCommand, command_handlers.JoinMeetingCommandHandler)


def init_queries(mapper: requests.RequestMap) -> None:
    mapper.bind(queries.ReadMeetingQuery, query_handlers.ReadMeetingQueryHandler)


def init_events(mapper: events.EventMap) -> None:
    mapper.bind(events.NotificationEvent[event_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
    mapper.bind(events.ECSTEvent[event_models.ECSTMeetingRoomClosed], event_handlers.UpdateMeetingRoomReadModelHandler)
```
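Conceptually, a registry of this kind is just a mapping from request type to handler type. The sketch below models the idea in plain Python; it is not the package's actual implementation, and a real setup would build handlers through the DI container:

```python
class JoinMeetingCommand:
    def __init__(self, user_id: str) -> None:
        self.user_id = user_id


class JoinMeetingCommandHandler:
    def handle(self, request: JoinMeetingCommand) -> str:
        return f"joined:{request.user_id}"


class RequestMap:
    """Minimal registry: request type -> handler type."""

    def __init__(self) -> None:
        self._bindings: dict[type, type] = {}

    def bind(self, request_type: type, handler_type: type) -> None:
        if request_type in self._bindings:
            raise KeyError(f"{request_type.__name__} is already bound")
        self._bindings[request_type] = handler_type

    def resolve(self, request: object) -> object:
        handler_type = self._bindings[type(request)]
        return handler_type()  # a DI container would construct this instead


mapper = RequestMap()
mapper.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
handler = mapper.resolve(JoinMeetingCommand(user_id="u1"))
print(handler.handle(JoinMeetingCommand(user_id="u1")))  # joined:u1
```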

## Bootstrap

The `python-cqrs` package implements a set of bootstrap utilities designed to simplify the initial configuration of an
application.

```python
import functools

from cqrs.events import bootstrap as event_bootstrap
from cqrs.requests import bootstrap as request_bootstrap

from app import dependencies, mapping, orm


@functools.lru_cache
def mediator_factory():
    return request_bootstrap.bootstrap(
        di_container=dependencies.setup_di(),
        commands_mapper=mapping.init_commands,
        queries_mapper=mapping.init_queries,
        domain_events_mapper=mapping.init_events,
        on_startup=[orm.init_store_event_mapper],
    )


@functools.lru_cache
def event_mediator_factory():
    return event_bootstrap.bootstrap(
        di_container=dependencies.setup_di(),
        events_mapper=mapping.init_events,
        on_startup=[orm.init_store_event_mapper],
    )
```
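Note the `@functools.lru_cache` on the factories: each mediator is built once per process on first call and then reused as a singleton. A self-contained sketch of this design choice, with a stub standing in for the real `bootstrap` call:

```python
import functools


def bootstrap() -> object:
    # Stub standing in for request_bootstrap.bootstrap(...).
    return object()


@functools.lru_cache
def mediator_factory() -> object:
    return bootstrap()


# Every call returns the same cached mediator instance.
assert mediator_factory() is mediator_factory()
```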

## Saga Pattern

The package implements the Orchestrated Saga pattern for managing distributed transactions across multiple services or operations.
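Conceptually, an orchestrated saga runs its steps in order and, when one fails, compensates the already-completed steps in reverse. A minimal plain-Python sketch of that control flow; the `act`/`compensate` names mirror the pattern as used in the package's examples, but the classes here are illustrative, not the package's API:

```python
class Step:
    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail

    def act(self, log: list[str]) -> None:
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        log.append(f"act:{self.name}")

    def compensate(self, log: list[str]) -> None:
        log.append(f"compensate:{self.name}")


def run_saga(steps: list[Step]) -> list[str]:
    log: list[str] = []
    done: list[Step] = []
    try:
        for step in steps:
            step.act(log)
            done.append(step)
    except RuntimeError:
        # Undo the completed steps in reverse order.
        for step in reversed(done):
            step.compensate(log)
    return log


log = run_saga([Step("reserve"), Step("pay"), Step("ship", fail=True)])
print(log)
# ['act:reserve', 'act:pay', 'compensate:pay', 'compensate:reserve']
```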
A complete example can be found in
the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/kafka_outboxed_event_producing.py)
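The outbox idea itself is simple: events are stored locally in the same transaction as the state change, and a separate loop polls the store and publishes pending records to the broker. A minimal in-memory sketch; `produce` is a stub where the real package would use a Kafka producer:

```python
import dataclasses


@dataclasses.dataclass
class OutboxRecord:
    event: str
    published: bool = False


class Outbox:
    def __init__(self) -> None:
        self.records: list[OutboxRecord] = []

    def add(self, event: str) -> None:
        # In a real system this happens inside the business transaction.
        self.records.append(OutboxRecord(event))

    def poll_pending(self, limit: int = 10) -> list[OutboxRecord]:
        return [r for r in self.records if not r.published][:limit]


published: list[str] = []


def produce(event: str) -> None:
    # Stub for a broker client, e.g. an aiokafka producer.
    published.append(event)


outbox = Outbox()
outbox.add("MeetingRoomClosed")
outbox.add("UserJoined")

# One polling iteration: publish pending records, then mark them sent.
for record in outbox.poll_pending():
    produce(record.event)
    record.published = True

print(published)  # ['MeetingRoomClosed', 'UserJoined']
```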

## Transaction log tailing

If the Outbox polling strategy does not suit your needs, consider the [Transaction Log Tailing](https://microservices.io/patterns/data/transaction-log-tailing.html) pattern. The package does not implement it, but you can use [Debezium + Kafka Connect](https://debezium.io/documentation/reference/stable/architecture.html) to tail the Outbox storage and produce newly created events directly to the corresponding topic in Kafka (or any other broker).

## DI container

Complete examples can be found in:
- [Simple example](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_simple_example.py)
- [Practical example with FastAPI](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_practical_example.py)


## Integration with presentation layers

The framework is ready for integration with **FastAPI** and **FastStream**.

> [!TIP]
> I recommend reading the useful
> paper [Onion Architecture Used in Software Development](https://www.researchgate.net/publication/371006360_Onion_Architecture_Used_in_Software_Development).

## Protobuf messaging

The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).
There is interface-level support for converting `Notification` events to Protobuf and back.

Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data: think XML, but smaller, faster, and simpler. You define how your data is structured once, then use generated source code to write and read that structured data across a variety of data streams and languages.
11 changes: 7 additions & 4 deletions examples/saga.py

4. Saga Storage and Logging:
- SagaStorage persists saga state and execution history
- MemorySagaStorage and SqlAlchemySagaStorage support create_run(): execution
uses one session per saga and checkpoint commits (fewer commits, better performance)
- Each step execution is logged (act/compensate, status, timestamp)
- Storage enables recovery of interrupted sagas
- Use storage.get_step_history() to view execution log

self._shipments[shipment_id] = tracking_number
print(
f" ✓ Created shipment {shipment_id} for order {order_id} " f"(tracking: {tracking_number})",
)
return shipment_id, tracking_number

payment_service = PaymentService()
shipping_service = ShippingService()

# Create saga storage for persistence.
# MemorySagaStorage (and SqlAlchemySagaStorage) support create_run():
# execution uses one session per saga and checkpoint commits for better performance.
# In production, use SqlAlchemySagaStorage or another persistent storage.
storage = MemorySagaStorage()

# Setup DI container
5 changes: 2 additions & 3 deletions examples/saga_fallback.py
"""Primary step that always raises an error."""
self._call_count += 1
logger.info(
f" [PrimaryStep] Executing act() for order {context.order_id} " f"(call #{self._call_count})...",
)
raise RuntimeError("Primary step failed - service unavailable")

),
)

# Create saga storage (supports create_run(): one session per saga, checkpoint commits)
storage = MemorySagaStorage()
di_container.bind(
di.bind_by_type(
4 changes: 3 additions & 1 deletion examples/saga_fastapi_sse.py
- Saga state and execution history are persisted to SagaStorage

3. Saga Storage and Logging:
- MemorySagaStorage/SqlAlchemySagaStorage support create_run(): one session per saga,
checkpoint commits (fewer commits, better performance)
- SagaStorage persists saga state and execution history
- Each step execution is logged (act/compensate, status, timestamp)
- Storage enables recovery of interrupted sagas
# DI Container Setup
# ============================================================================

# Shared storage (MemorySagaStorage uses create_run(): scoped run, checkpoint commits)
saga_storage = MemorySagaStorage()

# Setup DI container
21 changes: 9 additions & 12 deletions examples/saga_recovery.py
================================================================================

1. Saga State Persistence:
- MemorySagaStorage/SqlAlchemySagaStorage use create_run(): one session per saga,
checkpoint commits after each step (fewer commits, better performance)
- Saga state is saved to storage after each step
- Storage tracks which steps completed successfully
- Context data is persisted for recovery

self._shipments[shipment_id] = tracking_number
logger.info(
f" ✓ Created shipment {shipment_id} for order {order_id} " f"(tracking: {tracking_number})",
)
return shipment_id, tracking_number

print("=" * 70)
print("\nSimulating server crash after first step...")

# Setup services and storage (MemorySagaStorage uses create_run(): scoped run, checkpoint commits)
inventory_service = InventoryService()
payment_service = PaymentService()
shipping_service = ShippingService()
print("\n Execution log (SagaLog) before recovery:")
for entry in history:
print(
f" [{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}: {entry.status.value}",
)

print("\n⚠️ Problem: Order is incomplete!")
print("\n Execution log (SagaLog):")
for entry in history:
print(
f" [{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}: {entry.status.value}",
)

print("\n✅ System is now in consistent state!")
print("=" * 70)
print("\nSimulating failure during compensation...")

# Setup services and storage (scoped run via create_run() when supported)
inventory_service = InventoryService()
payment_service = PaymentService()
shipping_service = ShippingService()
print("\n Execution log (SagaLog) before recovery:")
for entry in history:
print(
f" [{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}: {entry.status.value}",
)

print("\n⚠️ Problem: Compensation incomplete!")
print("\n Execution log (SagaLog):")
for entry in history:
print(
f" [{entry.timestamp.strftime('%H:%M:%S')}] " f"{entry.step_name}.{entry.action}: {entry.status.value}",
)

print("\n✅ System is now in consistent state!")
4 changes: 3 additions & 1 deletion examples/saga_recovery_scheduler.py
- while True with asyncio.sleep(interval_seconds)
- get_sagas_for_recovery(limit, max_recovery_attempts, stale_after_seconds)
- Per-saga recover_saga() only; increment_recovery_attempts is done inside recover_saga on failure
- When storage supports create_run() (e.g. MemorySagaStorage, SqlAlchemySagaStorage),
saga execution uses one session per saga and checkpoint commits

2. Staleness filter (stale_after_seconds):
- Only sagas not updated recently are considered (avoids recovering
" 3. recover_saga() per saga (increment_recovery_attempts on failure is internal)",
)

storage = MemorySagaStorage() # supports create_run(): scoped run when executing sagas

saga_id = await create_interrupted_saga(storage)
storage._sagas[saga_id]["updated_at"] = datetime.datetime.now(