Skip to content

Commit fccf7ac

Browse files
authored
Add streaming handlers supporting (#22)
🚀 New Features Streaming Support & SSE StreamingRequestMediator: Added a new mediator designed for handling streaming requests that yield results incrementally. StreamingRequestHandler: Introduced a new handler type for processing large batches or long-running operations with real-time progress updates. FastAPI SSE Integration: Added native support and examples for using StreamingRequestMediator with Server-Sent Events (SSE) in FastAPI applications. Parallel Event Processing Concurrent Event Handling: Both RequestMediator and StreamingRequestMediator now support processing domain events in parallel. Concurrency Control: Added max_concurrent_event_handlers parameter to limit the number of simultaneously running event handlers. Configuration: Added concurrent_event_handle_enable flag to toggle between sequential and parallel execution. Dependency Injection dependency-injector Support: Added explicit support and documentation for the dependency-injector library using the DependencyInjectorCQRSContainer adapter. 📚 Documentation Updated README with comprehensive examples for: Streaming Request Handlers. Parallel Event Processing configuration. FastAPI SSE implementation. DI container setup (both di and dependency-injector).
1 parent e205176 commit fccf7ac

34 files changed

Lines changed: 3687 additions & 51 deletions

.pre-commit-config.yaml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ repos:
33
- id: check-toml
44
- id: check-docstring-first
55
- id: check-ast
6-
- exclude: (^tests/mock/|^tests/integration/fixtures)
6+
- exclude: (^tests/mock/|^tests/integration/|^tests/fixtures)
77
id: trailing-whitespace
88
- id: end-of-file-fixer
99
- id: check-yaml
1010
- id: check-toml
1111
- id: check-added-large-files
1212
- args:
1313
- --pytest-test-first
14-
exclude: (^tests/mock/|^tests/integration/fixtures)
14+
exclude: (^tests/mock/|^tests/integration/|^tests/fixtures)
1515
id: name-tests-test
1616
- id: check-merge-conflict
1717
- id: check-json
@@ -22,7 +22,6 @@ repos:
2222
repo: https://github.com/asottile/add-trailing-comma
2323
rev: v3.1.0
2424
- hooks:
25-
- id: pretty-format-ini
2625
- args:
2726
- --autofix
2827
- --indent
@@ -58,7 +57,7 @@ repos:
5857
repo: https://github.com/astral-sh/ruff-pre-commit
5958
rev: v0.6.1
6059
- repo: https://github.com/RobertCraigie/pyright-python
61-
rev: v1.1.387
60+
rev: v1.1.380
6261
hooks:
6362
- id: pyright
6463
types: [python]

README.md

Lines changed: 148 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ project ([documentation](https://akhundmurad.github.io/diator/)) with several en
1818
that `Notification` and `ECST` events are sent to the broker;
1919
7. FastAPI supporting;
2020
8. FastStream supporting;
21-
9. [Protobuf](https://protobuf.dev/) events supporting.
21+
9. [Protobuf](https://protobuf.dev/) events supporting;
22+
10. `StreamingRequestMediator` and `StreamingRequestHandler` for handling streaming requests with real-time progress updates;
23+
11. Parallel event processing with configurable concurrency limits.
2224

2325
## Request Handlers
2426

@@ -98,6 +100,40 @@ class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryR
98100
A complete example can be found in
99101
the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/request_handler.py)
100102

103+
### Streaming Request Handler
104+
105+
Streaming Request Handler processes requests incrementally and yields results as they become available.
106+
This is particularly useful for processing large batches of items, file uploads, or any operation that benefits from
107+
real-time progress updates.
108+
109+
`StreamingRequestHandler` works with `StreamingRequestMediator` that streams results to clients in real-time.
110+
111+
```python
112+
import typing
113+
from cqrs.requests.request_handler import StreamingRequestHandler
114+
from cqrs.events.event import Event
115+
116+
class ProcessFilesCommandHandler(StreamingRequestHandler[ProcessFilesCommand, FileProcessedResult]):
117+
def __init__(self):
118+
self._events: list[Event] = []
119+
120+
@property
121+
def events(self) -> list[Event]:
122+
return self._events.copy()
123+
124+
async def handle(self, request: ProcessFilesCommand) -> typing.AsyncIterator[FileProcessedResult]:
125+
for file_id in request.file_ids:
126+
# Process file
127+
result = FileProcessedResult(file_id=file_id, status="completed", ...)
128+
# Emit events
129+
self._events.append(FileProcessedEvent(file_id=file_id, ...))
130+
yield result
131+
self._events.clear()
132+
```
133+
134+
A complete example can be found in
135+
the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/streaming_handler_parallel_events.py)
136+
101137
## Event Handlers
102138

103139
Event handlers are designed to process `Notification` and `ECST` events that are consumed from the broker.
@@ -129,6 +165,36 @@ class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
129165
A complete example can be found in
130166
the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/domain_event_handler.py)
131167

168+
### Parallel Event Processing
169+
170+
Both `RequestMediator` and `StreamingRequestMediator` support parallel processing of domain events. You can control
171+
the number of event handlers that run simultaneously using the `max_concurrent_event_handlers` parameter.
172+
173+
This feature is especially useful when:
174+
- Multiple event handlers need to process events independently
175+
- You want to improve performance by processing events concurrently
176+
- You need to limit resource consumption by controlling concurrency
177+
178+
**Configuration:**
179+
180+
```python
181+
from cqrs.requests import bootstrap
182+
183+
mediator = bootstrap.bootstrap_streaming(
184+
di_container=container,
185+
commands_mapper=commands_mapper,
186+
domain_events_mapper=domain_events_mapper,
187+
message_broker=broker,
188+
max_concurrent_event_handlers=3, # Process up to 3 events in parallel
189+
concurrent_event_handle_enable=True, # Enable parallel processing
190+
)
191+
```
192+
193+
> [!TIP]
194+
> - Set `max_concurrent_event_handlers` to limit the number of simultaneously running event handlers
195+
> - Set `concurrent_event_handle_enable=False` to disable parallel processing and process events sequentially
196+
> - The default value for `max_concurrent_event_handlers` is `10` for `StreamingRequestMediator` and `1` for `RequestMediator`
197+
132198
## Producing Notification Events
133199

134200
During the handling of a command, `cqrs.NotificationEvent` events may be generated and then sent to the broker.
@@ -317,6 +383,10 @@ The current version of the python-cqrs package does not support the implementati
317383
Use the following example to set up dependency injection in your command, query and event handlers. This will make
318384
dependency management simpler.
319385

386+
The package supports two DI container libraries:
387+
388+
### di library
389+
320390
```python
321391
import di
322392
...
@@ -342,7 +412,35 @@ def setup_di() -> di.Container:
342412
```
343413

344414
A complete example can be found in
345-
the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/dependency_injection.py)
415+
the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injection.py)
416+
417+
### dependency-injector library
418+
419+
The package also supports [dependency-injector](https://github.com/ets-labs/python-dependency-injector) library.
420+
You can use `DependencyInjectorCQRSContainer` adapter to integrate dependency-injector containers with python-cqrs.
421+
422+
```python
423+
from dependency_injector import containers, providers
424+
from cqrs.container.dependency_injector import DependencyInjectorCQRSContainer
425+
426+
class ApplicationContainer(containers.DeclarativeContainer):
427+
# Define your providers
428+
service = providers.Factory(ServiceImplementation)
429+
430+
# Create CQRS container adapter
431+
cqrs_container = DependencyInjectorCQRSContainer(ApplicationContainer())
432+
433+
# Use with bootstrap
434+
mediator = bootstrap.bootstrap(
435+
di_container=cqrs_container,
436+
commands_mapper=commands_mapper,
437+
...
438+
)
439+
```
440+
441+
Complete examples can be found in:
442+
- [Simple example](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_simple_example.py)
443+
- [Practical example with FastAPI](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_practical_example.py)
346444

347445
## Mapping
348446

@@ -485,6 +583,54 @@ async def hello_world_event_handler(
485583
A complete example can be found in
486584
the [documentation](https://github.com/vadikko2/python-cqrs/blob/master/examples/kafka_event_consuming.py)
487585

586+
### FastAPI SSE Streaming
587+
588+
`StreamingRequestMediator` is ready and designed for use with Server-Sent Events (SSE) in FastAPI applications.
589+
This allows you to stream results to clients in real-time as they are processed.
590+
591+
**Example FastAPI endpoint with SSE:**
592+
593+
```python
594+
import fastapi
595+
import json
596+
from cqrs.requests import bootstrap
597+
598+
def streaming_mediator_factory() -> cqrs.StreamingRequestMediator:
599+
return bootstrap.bootstrap_streaming(
600+
di_container=container,
601+
commands_mapper=commands_mapper,
602+
domain_events_mapper=domain_events_mapper,
603+
message_broker=broker,
604+
max_concurrent_event_handlers=3,
605+
concurrent_event_handle_enable=True,
606+
)
607+
608+
@app.post("/process-files")
609+
async def process_files_stream(
610+
command: ProcessFilesCommand,
611+
mediator: cqrs.StreamingRequestMediator = fastapi.Depends(streaming_mediator_factory),
612+
) -> fastapi.responses.StreamingResponse:
613+
async def generate_sse():
614+
yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\n\n"
615+
616+
async for result in mediator.stream(command):
617+
sse_data = {
618+
"type": "progress",
619+
"data": result.model_dump(),
620+
}
621+
yield f"data: {json.dumps(sse_data)}\n\n"
622+
623+
yield f"data: {json.dumps({'type': 'complete'})}\n\n"
624+
625+
return fastapi.responses.StreamingResponse(
626+
generate_sse(),
627+
media_type="text/event-stream",
628+
)
629+
```
630+
631+
A complete example can be found in
632+
the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastapi_sse_streaming.py)
633+
488634
## Protobuf messaging
489635

490636
The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\

examples/dependency_injection.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,56 @@
1+
"""
2+
Example: Dependency Injection with CQRS Request Handlers
3+
4+
This example demonstrates how to use dependency injection (DI) with CQRS request handlers.
5+
The system shows how to integrate DI containers with CQRS mediators, allowing handlers
6+
to receive dependencies through constructor injection.
7+
8+
Use case: Decoupling business logic from infrastructure dependencies. Handlers can
9+
declare their dependencies in constructors, and the DI container automatically resolves
10+
and injects them when handlers are instantiated.
11+
12+
================================================================================
13+
HOW TO RUN THIS EXAMPLE
14+
================================================================================
15+
16+
Run the example:
17+
python examples/dependency_injection.py
18+
19+
The example will:
20+
- Set up a DI container with dependency bindings
21+
- Create a command handler that requires a dependency
22+
- Execute the command and verify the dependency was injected correctly
23+
24+
================================================================================
25+
WHAT THIS EXAMPLE DEMONSTRATES
26+
================================================================================
27+
28+
1. Dependency Injection Setup:
29+
- Define abstract dependencies (AbstractDependency) and concrete implementations
30+
- Configure DI container with type bindings
31+
- Use scope="request" to create new instances per request
32+
33+
2. Constructor Injection:
34+
- Command handlers receive dependencies through constructor parameters
35+
- Dependencies are automatically resolved by the DI container
36+
- No need for manual instantiation or service locator pattern
37+
38+
3. DI Container Integration:
39+
- Pass DI container to bootstrap.bootstrap()
40+
- Mediator uses the container to resolve handler dependencies
41+
- Handlers are created with all required dependencies injected
42+
43+
================================================================================
44+
REQUIREMENTS
45+
================================================================================
46+
47+
Make sure you have installed:
48+
- cqrs (this package)
49+
- di (dependency injection library)
50+
51+
================================================================================
52+
"""
53+
154
import abc
255
import asyncio
356
import logging

examples/domain_event_handler.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,63 @@
1+
"""
2+
Example: Domain Event Handling
3+
4+
This example demonstrates the basic pattern of domain event handling in CQRS.
5+
The system shows how command handlers emit domain events that are automatically
6+
dispatched to their corresponding event handlers.
7+
8+
Use case: Separating command execution from side effects. When a command is executed,
9+
it emits domain events that represent what happened. These events are then processed
10+
by event handlers that perform side effects like sending notifications, updating
11+
read models, or triggering other workflows.
12+
13+
================================================================================
14+
HOW TO RUN THIS EXAMPLE
15+
================================================================================
16+
17+
Run the example:
18+
python examples/domain_event_handler.py
19+
20+
The example will:
21+
- Execute multiple JoinMeetingCommand commands
22+
- Emit UserJoined domain events for each command
23+
- Process events through UserJoinedEventHandler
24+
- Display the number of users in the meeting and events handled
25+
26+
================================================================================
27+
WHAT THIS EXAMPLE DEMONSTRATES
28+
================================================================================
29+
30+
1. Domain Event Definition:
31+
- Create domain events as frozen dataclasses (UserJoined)
32+
- Events represent something that happened in the domain
33+
- Events are immutable and contain all relevant data
34+
35+
2. Event Emission from Command Handlers:
36+
- Command handlers collect events in the events property
37+
- Events are emitted after command execution succeeds
38+
- Multiple events can be emitted from a single command
39+
40+
3. Event Handler Registration:
41+
- Register event handlers using domain_events_mapper
42+
- Map event types to their handlers
43+
- Mediator automatically dispatches events to registered handlers
44+
45+
4. Automatic Event Dispatching:
46+
- Mediator collects events from command handlers
47+
- Events are automatically sent to their registered handlers
48+
- Event handlers process events asynchronously
49+
50+
================================================================================
51+
REQUIREMENTS
52+
================================================================================
53+
54+
Make sure you have installed:
55+
- cqrs (this package)
56+
- di (dependency injection)
57+
58+
================================================================================
59+
"""
60+
161
import asyncio
262
import logging
363
import typing

0 commit comments

Comments
 (0)