Skip to content

Commit 30e118c

Browse files
author
Вадим Козыревский
committed
Fix event processing documentation
1 parent 59cfe7e commit 30e118c

3 files changed

Lines changed: 131 additions & 90 deletions

File tree

docs/event_handler/event_flow.md

Lines changed: 70 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,31 @@ sequenceDiagram
2525
participant Mediator
2626
participant Handler as Command Handler
2727
participant Events as Events Collection
28-
participant Dispatcher as Event Dispatcher
29-
participant Handlers as Event Handlers
28+
participant Processor as Event Processor
3029
participant Emitter as Event Emitter
30+
participant Handlers as Event Handlers
31+
participant Broker as Message Broker
3132
3233
Client->>Mediator: 1. Send Command
3334
Mediator->>Handler: 2. Execute Handler
3435
Handler->>Handler: 3. Business Logic
3536
Handler->>Events: 4. Collect Events
3637
Handler-->>Mediator: 5. Return Response
3738
38-
Mediator->>Dispatcher: 6. Process Events
39-
Dispatcher->>Handlers: 7. Execute Handlers
40-
Handlers-->>Dispatcher: 8. Complete
39+
Mediator->>Processor: 6. Emit Events
40+
Processor->>Emitter: 7. Emit Each Event
4141
42-
Mediator->>Emitter: 9. Emit Events
43-
Emitter->>Emitter: 10. Send to Broker/Handlers
42+
alt DomainEvent
43+
Emitter->>Handlers: 8. Execute Event Handlers
44+
Handlers-->>Emitter: 9. Complete
45+
else NotificationEvent
46+
Emitter->>Broker: 8. Send to Message Broker
47+
Broker-->>Emitter: 9. Complete
48+
end
4449
45-
Mediator-->>Client: 11. Return Response
50+
Emitter-->>Processor: 10. Complete
51+
Processor-->>Mediator: 11. Complete
52+
Mediator-->>Client: 12. Return Response
4653
```
4754

4855
### Detailed Event Processing Flow
@@ -54,27 +61,34 @@ graph TD
5461
C -->|Has Events?| D{Events Exist?}
5562
5663
D -->|No| E[Return Response]
57-
D -->|Yes| F[Process Events Parallel]
64+
D -->|Yes| F[EventProcessor.emit_events]
65+
66+
F -->|For Each Event| G{Parallel Enabled?}
67+
68+
G -->|No| H[Sequential: EventEmitter.emit]
69+
G -->|Yes| I[Parallel: Create Task with Semaphore]
70+
I --> J[EventEmitter.emit]
5871
59-
F -->|For Each Event| G[EventDispatcher]
60-
G -->|Find Handlers| H[EventMap Lookup]
61-
H -->|Resolve Handler| I[DI Container]
62-
I -->|Execute| J[Event Handler]
63-
J -->|Complete| K{More Events?}
72+
H --> K{Event Type?}
73+
J --> K
6474
65-
K -->|Yes| F
66-
K -->|No| L[Emit Events]
75+
K -->|DomainEvent| L[EventEmitter: Find Handlers]
76+
K -->|NotificationEvent| M[EventEmitter: Send to Broker]
6777
68-
L -->|DomainEvent| M[Process via Handlers]
69-
L -->|NotificationEvent| N[Send to Broker]
78+
L --> N[EventMap Lookup]
79+
N --> O[Resolve Handler from DI]
80+
O --> P[Execute Event Handler]
81+
P --> Q{More Events?}
7082
71-
M --> E
72-
N --> E
83+
M --> Q
84+
Q -->|Yes| F
85+
Q -->|No| E
7386
7487
style A fill:#e1f5ff
7588
style B fill:#fff3e0
76-
style G fill:#c8e6c9
77-
style J fill:#c8e6c9
89+
style F fill:#c8e6c9
90+
style L fill:#c8e6c9
91+
style P fill:#c8e6c9
7892
style E fill:#f3e5f5
7993
```
8094

@@ -102,42 +116,54 @@ class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):
102116
)
103117
```
104118

105-
### 2. Event Dispatch
119+
### 2. Event Emission
106120

107-
After the command handler completes, the mediator collects events and dispatches them:
121+
After the command handler completes, the mediator collects events and emits them through EventProcessor:
108122

109123
```python
110124
dispatch_result = await self._dispatcher.dispatch(request)
111125

112-
if dispatch_result.events:
113-
# Process events (parallel or sequential)
114-
await self._process_events_parallel(dispatch_result.events.copy())
115-
# Emit events to broker or handlers
116-
await self._send_events(dispatch_result.events.copy())
126+
# Events are emitted through EventProcessor
127+
# EventProcessor uses EventEmitter which handles:
128+
# - DomainEvent: processes via event handlers (in-process)
129+
# - NotificationEvent: sends to message broker
130+
await self._event_processor.emit_events(dispatch_result.events)
117131
```
118132

119-
### 3. Event Processing
133+
The `EventProcessor` handles parallel or sequential processing based on configuration, and `EventEmitter` routes events to appropriate handlers or message brokers.
120134

121-
Events are processed through `EventDispatcher`, which finds registered handlers and executes them:
135+
### 3. Event Processing via EventEmitter
136+
137+
Events are processed through `EventEmitter`, which routes them based on event type:
122138

123139
```mermaid
124140
graph TD
125-
A[EventDispatcher.dispatch] -->|1. Get Event Type| B[EventMap.get]
126-
B -->|2. Find Handlers| C{Handlers Found?}
127-
C -->|No| D[Log Warning]
128-
C -->|Yes| E[Loop Through Handlers]
129-
E -->|3. Resolve Handler| F[DI Container]
130-
F -->|4. Execute Handler| G[Handler.handle]
131-
G -->|5. Process Side Effects| H[Complete]
141+
A[EventEmitter.emit] -->|1. Get Event Type| B{Event Type?}
142+
143+
B -->|DomainEvent| C[EventMap.get]
144+
C -->|2. Find Handlers| D{Handlers Found?}
145+
D -->|No| E[Log Warning]
146+
D -->|Yes| F[Loop Through Handlers]
147+
F -->|3. Resolve Handler| G[DI Container]
148+
G -->|4. Execute Handler| H[Handler.handle]
149+
H -->|5. Process Side Effects| I[Complete]
150+
151+
B -->|NotificationEvent| J{Message Broker?}
152+
J -->|No| K[Raise RuntimeError]
153+
J -->|Yes| L[Send to Message Broker]
154+
L --> I
132155
133156
style A fill:#e1f5ff
134-
style G fill:#c8e6c9
135-
style H fill:#fff3e0
157+
style H fill:#c8e6c9
158+
style I fill:#fff3e0
136159
```
137160

138-
### 4. Event Emission
161+
### 4. Event Routing
162+
163+
`EventEmitter` automatically routes events based on their type:
139164

140-
After processing, events are emitted through `EventEmitter`:
165+
- **DomainEvent** — Processed by event handlers registered in EventMap (in-process, synchronous)
166+
- **NotificationEvent** — Sent to message broker (Kafka, RabbitMQ, etc.) for asynchronous processing
141167

142-
- **DomainEvent** — Processed by event handlers (in-process)
143-
- **NotificationEvent** — Sent to message broker (Kafka, RabbitMQ, etc.)
168+
!!! important "Single Processing"
169+
Events are processed **only once** through EventEmitter. There is no duplicate processing - DomainEvents are handled by event handlers, and NotificationEvents are sent to message brokers.

docs/event_handler/parallel_processing.md

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,32 @@ Events can be processed in parallel to improve performance. This is controlled b
2424

2525
```mermaid
2626
graph TD
27-
Start[Process Events] --> CheckEnable{Parallel Enabled?}
27+
Start[EventProcessor.emit_events] --> CheckEnable{Parallel Enabled?}
2828
2929
CheckEnable -->|No| Sequential[Sequential Processing]
3030
Sequential --> LoopSeq[For Each Event]
31-
LoopSeq --> ProcessSeq[Process Event]
32-
ProcessSeq --> NextSeq{More Events?}
31+
LoopSeq --> EmitSeq[EventEmitter.emit]
32+
EmitSeq --> RouteSeq{Route Event}
33+
RouteSeq -->|DomainEvent| HandlerSeq[Execute Handlers]
34+
RouteSeq -->|NotificationEvent| BrokerSeq[Send to Broker]
35+
HandlerSeq --> NextSeq{More Events?}
36+
BrokerSeq --> NextSeq
3337
NextSeq -->|Yes| LoopSeq
3438
NextSeq -->|No| End1[End]
3539
3640
CheckEnable -->|Yes| Parallel[Parallel Processing]
37-
Parallel --> CreateTasks[Create Tasks for Each Event]
38-
CreateTasks --> Semaphore[Acquire Semaphore]
39-
Semaphore --> ProcessPar[Process Event]
40-
ProcessPar --> ReleaseSem[Release Semaphore]
41-
ReleaseSem --> Gather[asyncio.gather All Tasks]
42-
Gather --> End2[End]
41+
Parallel --> LoopPar[For Each Event]
42+
LoopPar --> CreateTask[Create Task]
43+
CreateTask --> Semaphore[Acquire Semaphore]
44+
Semaphore --> EmitPar[EventEmitter.emit]
45+
EmitPar --> RoutePar{Route Event}
46+
RoutePar -->|DomainEvent| HandlerPar[Execute Handlers]
47+
RoutePar -->|NotificationEvent| BrokerPar[Send to Broker]
48+
HandlerPar --> ReleaseSem[Release Semaphore]
49+
BrokerPar --> ReleaseSem
50+
ReleaseSem --> NextPar{More Events?}
51+
NextPar -->|Yes| LoopPar
52+
NextPar -->|No| End2[End]
4353
4454
style Start fill:#e1f5ff
4555
style Sequential fill:#fff3e0
@@ -49,43 +59,44 @@ graph TD
4959

5060
### Implementation
5161

62+
The `EventProcessor` handles parallel or sequential event emission:
63+
5264
```python
53-
class RequestMediator:
65+
class EventProcessor:
5466
def __init__(
5567
self,
68+
event_map: EventMap,
69+
event_emitter: EventEmitter | None = None,
5670
max_concurrent_event_handlers: int = 1,
5771
concurrent_event_handle_enable: bool = True,
5872
):
59-
# Create semaphore to limit concurrency
60-
self._event_semaphore = asyncio.Semaphore(max_concurrent_event_handlers)
73+
self._event_emitter = event_emitter
74+
self._max_concurrent_event_handlers = max_concurrent_event_handlers
6175
self._concurrent_event_handle_enable = concurrent_event_handle_enable
76+
self._event_semaphore = asyncio.Semaphore(max_concurrent_event_handlers)
6277

63-
async def _process_event_with_semaphore(self, event: Event) -> None:
64-
"""Process a single event with semaphore limit."""
65-
async with self._event_semaphore:
66-
await self._event_dispatcher.dispatch(event)
67-
68-
async def _process_events_parallel(
69-
self,
70-
events: List[Event],
71-
) -> None:
72-
"""Process events in parallel with semaphore limit or sequentially."""
73-
if not events:
78+
async def emit_events(self, events: List[Event]) -> None:
79+
"""Emit events via event emitter (parallel or sequential)."""
80+
if not events or not self._event_emitter:
7481
return
7582

7683
if not self._concurrent_event_handle_enable:
7784
# Sequential processing
7885
for event in events:
79-
await self._event_dispatcher.dispatch(event)
86+
await self._event_emitter.emit(event)
8087
else:
81-
# Parallel processing with semaphore limit
82-
tasks = [
83-
self._process_event_with_semaphore(event)
84-
for event in events
85-
]
86-
await asyncio.gather(*tasks)
88+
# Parallel processing with semaphore limit (fire-and-forget)
89+
for event in events:
90+
asyncio.create_task(self._emit_event_with_semaphore(event))
91+
92+
async def _emit_event_with_semaphore(self, event: Event) -> None:
93+
"""Emit a single event with semaphore limit."""
94+
async with self._event_semaphore:
95+
await self._event_emitter.emit(event)
8796
```
8897

98+
The `EventEmitter` then routes events to handlers or message brokers based on event type.
99+
89100
### Configuration
90101

91102
```python
@@ -129,7 +140,9 @@ class ProcessOrderCommandHandler(RequestHandler[ProcessOrderCommand, None]):
129140
self._events.append(EmailNotificationEvent(...))
130141

131142
# With max_concurrent_event_handlers=3:
132-
# - Events 1-3 process in parallel
133-
# - Event 4 waits for a slot
134-
# - All events complete before response is returned
143+
# - Events 1-3 emit in parallel (fire-and-forget tasks)
144+
# - Event 4 waits for a semaphore slot
145+
# - Each event is routed by EventEmitter:
146+
# - DomainEvents → processed by handlers
147+
# - NotificationEvents → sent to message broker
135148
```

docs/event_handler/runtime_processing.md

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,19 @@ await mediator.send(JoinMeetingCommand(user_id="123", meeting_id="456"))
4040
# What happens:
4141
# 1. Command handler executes (synchronously)
4242
# 2. Events are collected from handler.events
43-
# 3. Events are processed by handlers (synchronously, in parallel if enabled)
44-
# 4. Events are emitted (synchronously)
45-
# 5. Response is returned
43+
# 3. Events are emitted through EventProcessor (synchronously, in parallel if enabled)
44+
# - EventProcessor uses EventEmitter which routes events:
45+
# - DomainEvent → processed by event handlers (in-process)
46+
# - NotificationEvent → sent to message broker
47+
# 4. Response is returned
4648
```
4749

4850

49-
The `EventDispatcher` is responsible for routing events to their handlers:
51+
The `EventEmitter` routes events to their handlers or message brokers. For DomainEvents, it uses EventDispatcher logic internally:
5052

5153
```mermaid
5254
graph TD
53-
Start[EventDispatcher.dispatch] --> GetType[Get Event Type]
55+
Start[EventEmitter.emit DomainEvent] --> GetType[Get Event Type]
5456
GetType --> Lookup[Lookup in EventMap]
5557
Lookup --> Found{Handlers Found?}
5658
@@ -71,29 +73,29 @@ graph TD
7173
style Execute fill:#c8e6c9
7274
```
7375

74-
### Dispatcher Implementation
76+
### EventEmitter Implementation for DomainEvents
7577

7678
```python
77-
class EventDispatcher:
78-
async def dispatch(self, event: Event) -> None:
79+
class EventEmitter:
80+
@emit.register
81+
async def _(self, event: DomainEvent) -> None:
7982
# 1. Find handlers for event type
80-
handler_types = self._event_map.get(type(event), [])
83+
handlers_types = self._event_map.get(type(event), [])
8184

82-
if not handler_types:
85+
if not handlers_types:
8386
logger.warning(f"Handlers for event {type(event).__name__} not found")
8487
return
8588

8689
# 2. Process each handler
87-
for handler_type in handler_types:
90+
for handler_type in handlers_types:
8891
# 3. Resolve handler from DI container
8992
handler = await self._container.resolve(handler_type)
9093

9194
# 4. Execute handler
9295
await handler.handle(event)
9396
```
9497

95-
96-
The `EventEmitter` is responsible for emitting events after processing:
98+
The `EventEmitter` is responsible for emitting events and routing them based on type:
9799

98100
```mermaid
99101
graph TD

0 commit comments

Comments
 (0)