eventflowsys is a modern, extensible Python package for building robust event-driven systems. It provides both threaded and async event bus implementations, advanced subscription and messaging features, and a flexible logger injection base class. Designed for professional use, it follows SOLID principles and is suitable for a wide range of applications—from microservices to desktop apps.
- Threaded & Async Event Buses Choose between thread-safe (synchronous) and asyncio-based (asynchronous) event bus implementations.
- Group-based Subscriptions Organize services into named groups and deliver messages to targeted audiences.
- Priority & TTL Messaging Control message delivery order and expiration with priority and time-to-live support.
- Broadcast Support Instantly send messages to all groups.
- Event Hooks Register custom hooks for subscribe, unsubscribe, message delivery, and error events.
- Metrics & Tracking Monitor delivered, failed, pending, and expired messages. Query which services have not yet read a message.
- Custom Error Handling Built-in error classes for robust, granular error management.
- Logger Injection Abstract base class for dependency-injected loggers using loguru, supporting custom or default logging.
- SOLID Principles Clean, extensible, and maintainable architecture.
pip install eventflowsys- Purpose: Abstract base class for logger injection, using loguru by default.
- Usage:
from eventflowsys import LoggerInjectable class MyService(LoggerInjectable): def perform_action(self): self.logger.info("Action performed!")
- Purpose: Thread-safe event bus for synchronous applications.
- Key Methods:
subscribe(group, service_name, callback)unsubscribe(group, service_name)publish(group, data, priority=0, ttl=None, broadcast=False)pending_count()get_unread_services(msg_id)set_on_subscribe/hook(event hooks)get_metrics()
- Features:
- Group-based subscriptions
- Message priority & TTL
- Broadcast support
- Metrics & unread tracking
- Event hooks for extensibility
- Purpose: Asyncio-based event bus for coroutine-based applications.
- Key Methods:
subscribe(group, service_name, callback)(async)unsubscribe(group, service_name)(async)publish(group, data, priority=0, ttl=None, broadcast=False)(async)pending_count()(async)get_unread_services(msg_id)(async)set_on_subscribe/hook(event hooks)get_metrics()
- Features:
- Async group-based subscriptions
- Message priority & TTL
- Broadcast support
- Metrics & unread tracking
- Event hooks for extensibility
- IServiceBus: Abstract base class for event bus implementations.
- Message: Data class for messages (priority, group, data, expiration).
- Custom Errors:
ServiceBusError,SubscriptionError,MessageNotFoundError,GroupNotFoundError
from eventflowsys import ThreadedServiceBus
bus = ThreadedServiceBus()
def callback(msg_id, data):
print(f"Received: {data}")
bus.subscribe("group1", "serviceA", callback)
bus.publish("group1", "hello world")from eventflowsys import AsyncServiceBus
import asyncio
async def main():
bus = AsyncServiceBus()
async def callback(msg_id, data):
print(f"Received: {data}")
await bus.subscribe("group1", "serviceA", callback)
await bus.publish("group1", "hello async world")
asyncio.run(main())from eventflowsys import LoggerInjectable
class MyService(LoggerInjectable):
def perform_action(self):
self.logger.info("Action performed!")
service = MyService()
service.perform_action()You can register hooks to observe or extend bus behavior:
def on_subscribe(group, service_name):
print(f"Service {service_name} subscribed to {group}")
def on_message(msg_id, group, data):
print(f"Message {msg_id} delivered to {group}: {data}")
bus = ThreadedServiceBus()
bus.set_on_subscribe(on_subscribe)
bus.set_on_message(on_message)
bus.subscribe("group1", "svc", lambda i, d: None)
bus.publish("group1", "test")Handle errors gracefully by registering an error hook:
def on_error(error):
print(f"Error occurred: {error}")
bus.set_on_error(on_error)
try:
bus.subscribe("group1", "svc", None) # Not callable, triggers error
except Exception:
passInject a logger into a service that subscribes to the bus:
class LoggingService(LoggerInjectable):
def perform_action(self, msg_id, data):
self.logger.info(f"Received: {data}")
service = LoggingService()
bus.subscribe("group1", "svc", service.perform_action)
bus.publish("group1", "integrated logging!")flowchart TD
P[Publisher] -- Publish --> B(Event Bus)
B -- Deliver --> S1[Subscriber 1]
B -- Deliver --> S2[Subscriber 2]
B -- Metrics/Error/Hook --> H[Hooks]
classDiagram
class IServiceBus {
+subscribe()
+unsubscribe()
+publish()
+pending_count()
+get_unread_services()
+set_on_subscribe()
+set_on_unsubscribe()
+set_on_message()
+set_on_error()
+get_metrics()
}
IServiceBus <|-- ThreadedServiceBus
IServiceBus <|-- AsyncServiceBus
ThreadedServiceBus : threading
AsyncServiceBus : asyncio
ThreadedServiceBus : event hooks
AsyncServiceBus : event hooks
ThreadedServiceBus : metrics
AsyncServiceBus : metrics
flowchart TD
S["Service (LoggerInjectable)"] -->|"uses"| L["Logger (loguru)"]
S -->|"logs to"| F["File/Console"]
| Method | Parameters | Return Type | Description |
|---|---|---|---|
| subscribe | group: str, service_name: str, callback: Callable | None | Subscribe a service to a group. |
| unsubscribe | group: str, service_name: str | None | Unsubscribe a service from a group. |
| publish | group: str, data: Any, priority: int = 0, ttl: Optional[float] = None, broadcast: bool = False | int or list[int] | Publish a message to a group or all groups. |
| pending_count | int | Return the number of pending messages. | |
| get_unread_services | msg_id: int | set[str] or None | Get services that have not read a message. |
| set_on_subscribe | hook: Callable | None | Register a hook for subscribe events. |
| set_on_unsubscribe | hook: Callable | None | Register a hook for unsubscribe events. |
| set_on_message | hook: Callable | None | Register a hook for message delivery events. |
| set_on_error | hook: Callable | None | Register a hook for error events. |
| get_metrics | dict | Get bus metrics (delivered, failed, pending, expired). |
| Method | Parameters | Return Type | Description |
|---|---|---|---|
| init | logger: Optional, log_path: Optional | None | Initialize with optional custom logger or log path. |
| perform_action | None | Abstract method to be implemented by subclasses. |
| Attribute | Type | Description |
|---|---|---|
| priority | int | Message priority (lower = higher priority) |
| msg_id | int | Unique message identifier |
| group | str | Target group |
| data | Any | Message payload |
| expiration | float or None | Expiration timestamp |
| Error Class | Description |
|---|---|
| ServiceBusError | Base exception for bus errors |
| SubscriptionError | Raised for subscription problems |
| MessageNotFoundError | Raised if a message ID is not found |
| GroupNotFoundError | Raised if a group does not exist |
Tests are provided for all major features in the tests/ directory. Use pytest to run the test suite:
pytest
The following features are planned and expected to be added in future releases:
- LoggerInjectable: Multi-Logger Support
- Official support for additional loggers, including:
- Python standard library
logging.Logger - Sentry (
sentry_sdk) - structlog
- Logbook
- Graylog (e.g.,
graypy) - Custom logger implementations
- Python standard library
- Official support for additional loggers, including:
- Other planned features will be announced as the project evolves.
This section lists possible, uncommitted ideas for the future of eventflowsys. These are not planned or promised features—just a collection of creative directions the project could take. Community feedback and real-world needs will shape what (if any) get built.
- Event Persistence & Replay: Persist messages to disk or database and replay them for new subscribers or debugging.
- Dead Letter Queue: Automatically move undeliverable messages to a dead letter queue for inspection or reprocessing.
- Message Filtering & Transformation: Allow subscribers to filter or transform messages before handling.
- Wildcard & Pattern Subscriptions: Subscribe to multiple groups/topics using wildcards or regex patterns.
- Message Acknowledgement & Retry: Require explicit ack from handlers, with configurable retry logic.
- Event Tracing & Correlation: Add correlation IDs and tracing hooks for distributed debugging.
- Plugin System: Dynamically load/unload event handler plugins at runtime.
- Event Schema Validation: Register and enforce schemas for message types.
- Batching & Scheduling: Support for message batching and scheduled (delayed) delivery.
- Multi-tenancy & Isolation: Isolate groups of subscribers for multi-tenant scenarios.
- Cloud Broker Integration: Optional adapters for RabbitMQ, Kafka, Azure Service Bus, etc.
- Security & Rate Limiting: Message encryption, signing, and per-group/subscriber rate limiting.
- Custom Priority Strategies: Allow pluggable strategies for message prioritization.
- Lifecycle Event Hooks: Hooks for bus startup, shutdown, and other lifecycle events.
- Cloud Broker Adapters: Integrate with RabbitMQ, Kafka, Azure Service Bus, and more.
- Automated Testing Utilities: Tools for simulating, testing, and validating event flows.
- Jupyter/Notebook Integration: Interactive event bus usage and visualization in notebooks.
- Advanced Monitoring/Observability: More built-in metrics, tracing, and logging for all bus operations (no dashboard/CLI planned).
- Distributed Event Bus: Multi-process or multi-host event bus for distributed systems.
- WebSocket/REST API Integration: Allow event bus to send/receive over network protocols.
- Graph-based Routing: Support for complex event routing and transformation graphs.
- Event Sourcing Patterns: Support for event sourcing and CQRS architectures.
- AI/ML Event Handlers: Integrate with AI/ML models for smart event processing.
- Mobile/IoT Adapters: Support for lightweight event bus usage on mobile or IoT devices.
- Visual Debugging Tools: Export event flow for visualization in external tools.
- Zero-Config Mode: Auto-discover and wire up services with minimal setup.
This is a living list of ideas, not a roadmap. Suggestions welcome!
Contributions are welcome! Please open issues or pull requests for bug fixes, new features, or documentation improvements.
MIT License