Overview • Architecture • Quick Start • Use Cases • API Reference
The EventBus is an optional in-process domain event dispatcher that provides an abstraction layer over the transactional outbox pattern.
Most services can use the direct outbox approach (simpler, recommended):
```python
# Direct approach (PRIMARY pattern)
await outbox_repo.add_event(event, session=session)
await session.commit()
```

EventBus is useful when you need:
- ✅ Multiple side effects per event - One event triggers many handlers
- ✅ Lifecycle hooks - Observe/log all event dispatches
- ✅ Testing isolation - Enable/disable toggle for unit tests
- ✅ Decorator syntax - Clean `@bus.subscriber(EventType)` registration
- ✅ Decoupled architecture - Service layer doesn't know about outbox
If you only need to persist events to the outbox, skip EventBus and call `outbox_repo.add_event()` directly.
Direct outbox flow (primary):

```
Service Layer
    ↓
await outbox_repo.add_event(event, session=session)   ← DIRECT
    ↓
Outbox Table (transactional)
    ↓
Outbox Worker (background)
    ↓
Kafka Publisher → External broker
```
With EventBus as an optional dispatch layer:

```
Service Layer
    ↓
await event_bus.dispatch(event)   ← OPTIONAL
    ↓
EventBus (in-process dispatcher)
    ├→ OutboxEventHandler → outbox_repo.add_event()
    ├→ AuditLogHandler    → audit_service.log()
    ├→ MetricsHandler     → prometheus.increment()
    └→ CacheInvalidator   → redis.delete()
    ↓
Outbox Worker (background)
    ↓
Kafka Publisher → External broker
```
Key Insight: EventBus doesn't replace the outbox - it's a dispatcher that can route to multiple handlers, including the outbox.
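To make the insight concrete, here is a minimal, self-contained sketch of that routing (not the messagekit implementation — `ToyBus` and the handlers are invented for illustration): one dispatch fans out to several handlers, one of which plays the role of the outbox writer.

```python
import asyncio
from collections import defaultdict

class ToyBus:
    """Illustrative in-process dispatcher: one event type → many handlers."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def register(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def dispatch(self, event):
        for handler in self._handlers[type(event)]:
            await handler(event)

class UserCreated:
    def __init__(self, user_id):
        self.user_id = user_id

outbox_rows, audit_log = [], []

async def outbox_handler(event):
    # Stand-in for OutboxEventHandler → outbox_repo.add_event()
    outbox_rows.append(("user.created", event.user_id))

async def audit_handler(event):
    # Stand-in for AuditLogHandler → audit_service.log()
    audit_log.append(f"user {event.user_id} created")

bus = ToyBus()
bus.register(UserCreated, outbox_handler)
bus.register(UserCreated, audit_handler)

asyncio.run(bus.dispatch(UserCreated(user_id=7)))
print(outbox_rows)  # [('user.created', 7)]
print(audit_log)    # ['user 7 created']
```

Removing a side effect is then a one-line change (drop a `register` call) rather than an edit to the service layer.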
```python
from fastapi import Depends
from messagekit.core import BaseEvent
from messagekit.infrastructure import SqlAlchemyOutboxRepository

class UserCreated(BaseEvent):
    event_type: str = "user.created"
    aggregate_id: str
    user_id: int

@router.post("/users")
async def create_user(
    data: CreateUserRequest,
    session = Depends(get_session),
    outbox_repo: SqlAlchemyOutboxRepository = Depends(get_outbox_repo),
):
    # Business logic
    user_id = await users_service.create_user(data, session)

    # Persist event to outbox (same transaction)
    await outbox_repo.add_event(
        UserCreated(aggregate_id=f"user-{user_id}", user_id=user_id),
        session=session,
    )
    await session.commit()  # Atomic: user + event
    return {"user_id": user_id}
```

```python
from fastapi import Request
from messagekit.infrastructure import OutboxEventHandler

@router.post("/users")
async def create_user(data: CreateUserRequest, request: Request):
    event_bus = request.app.state.event_bus
    outbox_repo = request.app.state.outbox_repository

    # Register multiple handlers
    # (shown inline for brevity - register once at startup in real code)
    event_bus.register(UserCreated, OutboxEventHandler(outbox_repo))
    event_bus.register(UserCreated, AuditLogHandler())
    event_bus.register(UserCreated, MetricsHandler())

    # Business logic
    user_id = await users_service.create_user(data)

    # Dispatch → all handlers execute
    await event_bus.dispatch(UserCreated(
        aggregate_id=f"user-{user_id}",
        user_id=user_id,
    ))
    return {"user_id": user_id}
```

When to use which:
- Direct outbox: Single destination (just Kafka), simple
- EventBus: Multiple side effects, hooks, testing isolation
Problem: Ensure domain events are reliably published to Kafka, even if the service crashes.
Solution: EventBus dispatches to OutboxEventHandler, which persists events in the same database transaction as business data.
```python
from messagekit.infrastructure import OutboxEventHandler

# In service layer
async def create_order(data, session):
    # 1. Business logic
    order = Order(**data)
    session.add(order)
    await session.flush()  # assign order.id before building the event

    # 2. Register outbox handler
    event_bus.register(OrderPlaced, OutboxEventHandler(outbox_repository))

    # 3. Dispatch event (persisted in outbox table)
    await event_bus.dispatch(OrderPlaced(
        aggregate_id=f"order-{order.id}",
        order_id=order.id,
        customer_id=order.customer_id,
    ))

    # 4. Commit both order + outbox event atomically
    await session.commit()
    # 5. Outbox worker publishes to Kafka asynchronously
```

Result: Atomic writes, guaranteed delivery, no lost events.
Problem: One business action triggers multiple side effects (audit, metrics, cache, notifications).
Solution: Register multiple handlers for the same event type.
```python
# Multiple handlers for OrderPlaced event
event_bus.register(OrderPlaced, OutboxEventHandler(outbox_repo))      # Kafka
event_bus.register(OrderPlaced, AuditLogHandler(audit_service))       # Audit
event_bus.register(OrderPlaced, MetricsHandler(prometheus_registry))  # Metrics
event_bus.register(OrderPlaced, CacheInvalidator(redis_client))       # Cache

# One dispatch → all handlers execute
await event_bus.dispatch(OrderPlaced(...))
```

Result: Decoupled side effects, clean service layer code, easy to add/remove handlers.
Problem: Need visibility into all domain event flows for debugging and monitoring.
Solution: Use lifecycle hooks to log/trace all dispatches.
```python
from messagekit.core import DispatchHooks

def log_dispatch(trace):
    logger.info(
        "Event dispatched",
        event_type=trace.event.event_type,
        handler=trace.handler_name,
        backend=trace.backend_name,
    )

def log_failure(trace):
    logger.error(
        "Handler failed",
        event_type=trace.event.event_type,
        handler=trace.handler_name,
        error=str(trace.error),
    )

def update_metrics(trace):
    metrics.increment(
        "event_bus.handler.success",
        tags={"event_type": trace.event.event_type},
    )

# Wire hooks at startup
event_bus = build_event_bus(
    [],
    hooks=DispatchHooks(
        on_dispatch=log_dispatch,
        on_success=update_metrics,
        on_failure=log_failure,
    ),
)
```

Result: Full observability of event flows, structured logs, Prometheus metrics.
Problem: Unit tests trigger real side effects (database writes, external calls).
Solution: Disable EventBus in tests to isolate business logic.
```python
# Production: events dispatched normally
event_bus = EventBus(settings=DispatchSettings(enabled=True))

# Unit tests: disable side effects
event_bus = EventBus(settings=DispatchSettings(enabled=False))

# Test service logic without triggering handlers
await service.create_user(data)  # EventBus disabled, no DB writes
```

Result: Fast, isolated unit tests without mocking.
Problem: Need parallel execution of independent handlers for performance.
Solution: Implement custom DispatchBackend.
```python
import asyncio

from messagekit.core import DispatchBackend

class ParallelDispatchBackend(DispatchBackend):
    name = "parallel"

    async def invoke(self, event, handlers, invoke_one):
        # Execute all handlers concurrently
        await asyncio.gather(*[
            invoke_one(handler) for handler in handlers
        ])

# Use custom backend
event_bus = EventBus(backend=ParallelDispatchBackend())
```

Result: Concurrent handler execution for performance-critical paths.
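The payoff is easy to demonstrate in isolation. This self-contained sketch (using `asyncio.sleep` as a stand-in for I/O-bound handler work) compares a sequential loop against `asyncio.gather`:

```python
import asyncio
import time

async def slow_handler(_event):
    await asyncio.sleep(0.1)  # stand-in for I/O-bound handler work

async def sequential(event, handlers):
    # One after another: total time is the sum of handler times
    for handler in handlers:
        await handler(event)

async def parallel(event, handlers):
    # Concurrent: total time is roughly the slowest handler
    await asyncio.gather(*[handler(event) for handler in handlers])

handlers = [slow_handler] * 5

start = time.perf_counter()
asyncio.run(sequential("evt", handlers))
seq = time.perf_counter() - start  # ~0.5s

start = time.perf_counter()
asyncio.run(parallel("evt", handlers))
par = time.perf_counter() - start  # ~0.1s

print(f"sequential={seq:.2f}s parallel={par:.2f}s")
```

One trade-off to keep in mind: with concurrent execution a failing handler no longer prevents later handlers from running, and `asyncio.gather` raises the first exception by default — consider `return_exceptions=True` if you need to collect all failures.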
Problem: Coordinate multi-step business processes across aggregates.
Solution: Use EventBus to trigger saga steps based on domain events.
```python
# Saga coordinator listens to domain events
@event_bus.subscriber(OrderPlaced)
async def start_order_saga(event: OrderPlaced):
    await saga_service.reserve_inventory(event.order_id)

@event_bus.subscriber(InventoryReserved)
async def process_payment(event: InventoryReserved):
    await payment_service.charge(event.order_id)

@event_bus.subscriber(PaymentSucceeded)
async def fulfill_order(event: PaymentSucceeded):
    await fulfillment_service.ship(event.order_id)
```

Result: Decoupled saga orchestration without tight coupling between services.
```python
class EventBus:
    def __init__(
        self,
        *,
        backend: DispatchBackend | None = None,    # Default: SequentialDispatchBackend()
        hooks: DispatchHooks | None = None,        # Optional lifecycle hooks
        settings: DispatchSettings | None = None,  # Optional config (enabled, debug)
    ) -> None: ...

    def register(
        self,
        event_type: type[BaseEvent],
        handler: IDomainEventHandler | Callable,  # Handler instance or async callback
        *,
        handler_name: str | None = None,  # Optional name for tracing
    ) -> None:
        """Register a handler for an event type."""

    def subscriber(
        self,
        event_type: type[BaseEvent],
        *,
        handler_name: str | None = None,
    ) -> Callable:
        """Decorator for registering async callbacks."""

    async def dispatch(self, event: BaseEvent) -> None:
        """Dispatch event to all registered handlers."""
```

```python
@dataclass(frozen=True)
class DispatchHooks:
    on_dispatch: Callable[[DispatchTrace], None] | None = None  # Before dispatch
    on_success: Callable[[DispatchTrace], None] | None = None   # After success
    on_failure: Callable[[DispatchTrace], None] | None = None   # After failure
    on_disabled: Callable[[DispatchTrace], None] | None = None  # When disabled
    on_debug: Callable[[DispatchTrace], None] | None = None     # Debug tracing
```

```python
@dataclass(frozen=True)
class DispatchTrace:
    stage: str                       # "dispatch", "success", "failure", "disabled"
    event: BaseEvent                 # The dispatched event
    backend_name: str                # Backend identifier ("sequential", "parallel", etc.)
    handler_name: str | None = None  # Handler being invoked
    error: Exception | None = None   # Exception if failure
```

```python
@dataclass(frozen=True)
class DispatchSettings:
    enabled: bool = True   # If False, dispatch is skipped
    debug: bool = False    # If True, on_debug hook is emitted
```

❌ Bad - Registers handler on every request:
```python
@router.post("/users")
async def create_user(request: Request):
    event_bus = request.app.state.event_bus
    event_bus.register(UserCreated, OutboxEventHandler(repo))  # Re-registers every time!
    await event_bus.dispatch(UserCreated(...))
```

✅ Good - Register once at startup or module-level:

```python
# In startup lifespan or module init
event_bus.register(UserCreated, OutboxEventHandler(outbox_repository))

# In handlers, just dispatch
@router.post("/users")
async def create_user(request: Request):
    await request.app.state.event_bus.dispatch(UserCreated(...))
```

❌ Bad - Generic "something changed" events:
```python
await event_bus.dispatch(GenericEvent(data={"user_id": 123}))
```

✅ Good - Explicit, typed domain events:

```python
class UserCreated(BaseEvent):
    event_type: str = "user.created"
    aggregate_id: str
    user_id: int
    email: str

await event_bus.dispatch(UserCreated(
    aggregate_id=f"user-{user_id}",
    user_id=user_id,
    email=email,
))
```

Handlers may be retried, so ensure they're safe to execute multiple times:
```python
class AuditLogHandler(IDomainEventHandler[OrderPlaced]):
    async def handle(self, event: OrderPlaced):
        # Use event ID to prevent duplicate logs
        if not await audit_repo.exists(event.id):
            await audit_repo.create(event)
```

Don't pollute handlers with logging/metrics - use hooks instead:
❌ Bad:

```python
async def handle(self, event: UserCreated):
    logger.info(f"Handling {event.event_type}")  # Repeated in every handler
    await do_work(event)
    logger.info("Success")
```

✅ Good:

```python
# Hooks configured once at startup
event_bus = EventBus(
    hooks=DispatchHooks(
        on_dispatch=lambda trace: logger.info(f"Handling {trace.event.event_type}"),
        on_success=lambda trace: logger.info(f"Success: {trace.handler_name}"),
    )
)

# Handlers stay clean
async def handle(self, event: UserCreated):
    await do_work(event)  # No logging noise
```

- Domain events (EventBus): Internal to the service, in-process
- Integration events (Kafka): Cross-service, asynchronous
```python
# Domain event (EventBus)
class OrderPlaced(BaseEvent):
    event_type: str = "order.placed"
    order_id: int
    customer_id: int

# Integration event (Kafka - published from outbox)
# Same schema, but goes through outbox → Kafka
```

EventBus is not for:
- ❌ Cross-service communication - Use Kafka/RabbitMQ
- ❌ External API calls - Use direct HTTP clients
- ❌ Long-running background jobs - Use task queue (Celery, etc.)
- ❌ Request/response patterns - Use CQRS mediator pattern
EventBus is for:
- ✅ In-process side effects - Multiple handlers per event
- ✅ Domain event dispatch - Transactional outbox pattern
- ✅ Decoupled business logic - Service → EventBus → Handlers
- ✅ Testing isolation - Enable/disable toggle
EventBus is optional - the direct outbox pattern is often simpler and clearer.
```python
async def create_user(data, session, outbox_repo):
    user = User(**data)
    session.add(user)

    # Direct, simple, clear
    await outbox_repo.add_event(
        UserCreated(user_id=user.id),
        session=session,
    )
    await session.commit()
```

Use when:
- ✅ Single destination (just outbox → Kafka)
- ✅ Simple, straightforward flow
- ✅ No need for multiple side effects
```python
async def create_user(data, session, event_bus):
    user = User(**data)
    session.add(user)

    # Decoupled, flexible, multiple handlers
    await event_bus.dispatch(UserCreated(user_id=user.id))
    await session.commit()
```

Use when:
- ✅ Multiple side effects per event (outbox + audit + metrics + cache)
- ✅ Need lifecycle hooks for observability
- ✅ Testing isolation (enable/disable toggle)
- ✅ Decorator-based handler registration preferred
EventBus Benefits:
- Multiple handlers per event
- Service layer doesn't import infrastructure
- Easy to add/remove handlers
- Testable without mocking
EventBus Drawbacks:
- Extra abstraction layer (more complexity)
- Must wire up EventBus and register handlers
- Overkill for simple event persistence
If an event is not being dispatched:
- Check registration: `event_bus._handlers` should contain your event type
- Verify dispatch is called: add an `on_dispatch` hook
- Check the `enabled` setting: `DispatchSettings(enabled=True)`

If events are dispatched but never reach Kafka:
- Verify `OutboxEventHandler` is registered
- Check that the session commit happens after dispatch
- Verify the outbox worker is running (`settings.outbox_worker_enabled=True`)

If dispatch is slow:
- Use `ParallelDispatchBackend` for independent handlers
- Profile handlers with `on_success` hook timing
- Make sure async handlers `await` their I/O instead of blocking
See `tests/unit/core/test_event_bus.py` for comprehensive examples of all features.