Skip to content

noxius-studium/eventflowsys

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

7 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

eventflowsys

eventflowsys is a modern, extensible Python package for building robust event-driven systems. It provides both threaded and async event bus implementations, advanced subscription and messaging features, and a flexible logger injection base class. Designed for professional use, it follows SOLID principles and is suitable for a wide range of applications—from microservices to desktop apps.


Key Features

  • Threaded & Async Event Buses Choose between thread-safe (synchronous) and asyncio-based (asynchronous) event bus implementations.
  • Group-based Subscriptions Organize services into named groups and deliver messages to targeted audiences.
  • Priority & TTL Messaging Control message delivery order and expiration with priority and time-to-live support.
  • Broadcast Support Instantly send messages to all groups.
  • Event Hooks Register custom hooks for subscribe, unsubscribe, message delivery, and error events.
  • Metrics & Tracking Monitor delivered, failed, pending, and expired messages. Query which services have not yet read a message.
  • Custom Error Handling Built-in error classes for robust, granular error management.
  • Logger Injection Abstract base class for dependency-injected loggers using loguru, supporting custom or default logging.
  • SOLID Principles Clean, extensible, and maintainable architecture.



Documentation


Installation

pip install eventflowsys


Modules & Classes

LoggerInjectable (logger_Injectable.py)

  • Purpose: Abstract base class for logger injection, using loguru by default.
  • Usage:
    from eventflowsys import LoggerInjectable
    class MyService(LoggerInjectable):
        def perform_action(self):
            self.logger.info("Action performed!")

ThreadedServiceBus (bus/thread_bus.py)

  • Purpose: Thread-safe event bus for synchronous applications.
  • Key Methods:
    • subscribe(group, service_name, callback)
    • unsubscribe(group, service_name)
    • publish(group, data, priority=0, ttl=None, broadcast=False)
    • pending_count()
    • get_unread_services(msg_id)
    • set_on_subscribe/hook (event hooks)
    • get_metrics()
  • Features:
    • Group-based subscriptions
    • Message priority & TTL
    • Broadcast support
    • Metrics & unread tracking
    • Event hooks for extensibility

AsyncServiceBus (bus/async_bus.py)

  • Purpose: Asyncio-based event bus for coroutine-based applications.
  • Key Methods:
    • subscribe(group, service_name, callback) (async)
    • unsubscribe(group, service_name) (async)
    • publish(group, data, priority=0, ttl=None, broadcast=False) (async)
    • pending_count() (async)
    • get_unread_services(msg_id) (async)
    • set_on_subscribe/hook (event hooks)
    • get_metrics()
  • Features:
    • Async group-based subscriptions
    • Message priority & TTL
    • Broadcast support
    • Metrics & unread tracking
    • Event hooks for extensibility

Base Interfaces & Errors (bus/base_bus.py)

  • IServiceBus: Abstract base class for event bus implementations.
  • Message: Data class for messages (priority, group, data, expiration).
  • Custom Errors:
    • ServiceBusError, SubscriptionError, MessageNotFoundError, GroupNotFoundError

Quick Start Examples

Threaded Bus

from eventflowsys import ThreadedServiceBus
bus = ThreadedServiceBus()
def callback(msg_id, data):
    print(f"Received: {data}")
bus.subscribe("group1", "serviceA", callback)
bus.publish("group1", "hello world")

Async Bus

from eventflowsys import AsyncServiceBus
import asyncio
async def main():
    bus = AsyncServiceBus()
    async def callback(msg_id, data):
        print(f"Received: {data}")
    await bus.subscribe("group1", "serviceA", callback)
    await bus.publish("group1", "hello async world")
asyncio.run(main())

LoggerInjectable

from eventflowsys import LoggerInjectable
class MyService(LoggerInjectable):
    def perform_action(self):
        self.logger.info("Action performed!")
service = MyService()
service.perform_action()

Advanced Usage

Custom Event Hooks

You can register hooks to observe or extend bus behavior:

def on_subscribe(group, service_name):
    print(f"Service {service_name} subscribed to {group}")
def on_message(msg_id, group, data):
    print(f"Message {msg_id} delivered to {group}: {data}")
bus = ThreadedServiceBus()
bus.set_on_subscribe(on_subscribe)
bus.set_on_message(on_message)
bus.subscribe("group1", "svc", lambda i, d: None)
bus.publish("group1", "test")

Custom Error Handling

Handle errors gracefully by registering an error hook:

def on_error(error):
    print(f"Error occurred: {error}")
bus.set_on_error(on_error)
try:
    bus.subscribe("group1", "svc", None)  # Not callable, triggers error
except Exception:
    pass

Integration: LoggerInjectable with Event Bus

Inject a logger into a service that subscribes to the bus:

class LoggingService(LoggerInjectable):
    def perform_action(self, msg_id, data):
        self.logger.info(f"Received: {data}")
service = LoggingService()
bus.subscribe("group1", "svc", service.perform_action)
bus.publish("group1", "integrated logging!")

Architecture & Flow Diagrams

Event Flow (Threaded/Async Bus)

flowchart TD
    P[Publisher] -- Publish --> B(Event Bus)
    B -- Deliver --> S1[Subscriber 1]
    B -- Deliver --> S2[Subscriber 2]
    B -- Metrics/Error/Hook --> H[Hooks]
Loading

Bus Architecture Overview

classDiagram
    class IServiceBus {
        +subscribe()
        +unsubscribe()
        +publish()
        +pending_count()
        +get_unread_services()
        +set_on_subscribe()
        +set_on_unsubscribe()
        +set_on_message()
        +set_on_error()
        +get_metrics()
    }
    IServiceBus <|-- ThreadedServiceBus
    IServiceBus <|-- AsyncServiceBus
    ThreadedServiceBus : threading
    AsyncServiceBus : asyncio
    ThreadedServiceBus : event hooks
    AsyncServiceBus : event hooks
    ThreadedServiceBus : metrics
    AsyncServiceBus : metrics
Loading

Logger Injection Flow

flowchart TD
    S["Service (LoggerInjectable)"] -->|"uses"| L["Logger (loguru)"]
    S -->|"logs to"| F["File/Console"]
Loading

API Reference Tables

ThreadedServiceBus & AsyncServiceBus

Method Parameters Return Type Description
subscribe group: str, service_name: str, callback: Callable None Subscribe a service to a group.
unsubscribe group: str, service_name: str None Unsubscribe a service from a group.
publish group: str, data: Any, priority: int = 0, ttl: Optional[float] = None, broadcast: bool = False int or list[int] Publish a message to a group or all groups.
pending_count int Return the number of pending messages.
get_unread_services msg_id: int set[str] or None Get services that have not read a message.
set_on_subscribe hook: Callable None Register a hook for subscribe events.
set_on_unsubscribe hook: Callable None Register a hook for unsubscribe events.
set_on_message hook: Callable None Register a hook for message delivery events.
set_on_error hook: Callable None Register a hook for error events.
get_metrics dict Get bus metrics (delivered, failed, pending, expired).

LoggerInjectable

Method Parameters Return Type Description
init logger: Optional, log_path: Optional None Initialize with optional custom logger or log path.
perform_action None Abstract method to be implemented by subclasses.

Message (bus/base_bus.py)

Attribute Type Description
priority int Message priority (lower = higher priority)
msg_id int Unique message identifier
group str Target group
data Any Message payload
expiration float or None Expiration timestamp

Custom Errors

Error Class Description
ServiceBusError Base exception for bus errors
SubscriptionError Raised for subscription problems
MessageNotFoundError Raised if a message ID is not found
GroupNotFoundError Raised if a group does not exist


Testing

Tests are provided for all major features in the tests/ directory. Use pytest to run the test suite:

pytest

Planned Features

The following features are planned and expected to be added in future releases:

  • LoggerInjectable: Multi-Logger Support
    • Official support for additional loggers, including:
      • Python standard library logging.Logger
      • Sentry (sentry_sdk)
      • structlog
      • Logbook
      • Graylog (e.g., graypy)
      • Custom logger implementations
  • Other planned features will be announced as the project evolves.

Unplanned & Possible Future Features

This section lists possible, uncommitted ideas for the future of eventflowsys. These are not planned or promised features—just a collection of creative directions the project could take. Community feedback and real-world needs will shape what (if any) get built.

Unplanned Ideas & Possibilities

  • Event Persistence & Replay: Persist messages to disk or database and replay them for new subscribers or debugging.
  • Dead Letter Queue: Automatically move undeliverable messages to a dead letter queue for inspection or reprocessing.
  • Message Filtering & Transformation: Allow subscribers to filter or transform messages before handling.
  • Wildcard & Pattern Subscriptions: Subscribe to multiple groups/topics using wildcards or regex patterns.
  • Message Acknowledgement & Retry: Require explicit ack from handlers, with configurable retry logic.
  • Event Tracing & Correlation: Add correlation IDs and tracing hooks for distributed debugging.
  • Plugin System: Dynamically load/unload event handler plugins at runtime.
  • Event Schema Validation: Register and enforce schemas for message types.
  • Batching & Scheduling: Support for message batching and scheduled (delayed) delivery.
  • Multi-tenancy & Isolation: Isolate groups of subscribers for multi-tenant scenarios.
  • Cloud Broker Integration: Optional adapters for RabbitMQ, Kafka, Azure Service Bus, etc.
  • Security & Rate Limiting: Message encryption, signing, and per-group/subscriber rate limiting.
  • Custom Priority Strategies: Allow pluggable strategies for message prioritization.
  • Lifecycle Event Hooks: Hooks for bus startup, shutdown, and other lifecycle events.
  • Cloud Broker Adapters: Integrate with RabbitMQ, Kafka, Azure Service Bus, and more.
  • Automated Testing Utilities: Tools for simulating, testing, and validating event flows.
  • Jupyter/Notebook Integration: Interactive event bus usage and visualization in notebooks.
  • Advanced Monitoring/Observability: More built-in metrics, tracing, and logging for all bus operations (no dashboard/CLI planned).
  • Distributed Event Bus: Multi-process or multi-host event bus for distributed systems.
  • WebSocket/REST API Integration: Allow event bus to send/receive over network protocols.
  • Graph-based Routing: Support for complex event routing and transformation graphs.
  • Event Sourcing Patterns: Support for event sourcing and CQRS architectures.
  • AI/ML Event Handlers: Integrate with AI/ML models for smart event processing.
  • Mobile/IoT Adapters: Support for lightweight event bus usage on mobile or IoT devices.
  • Visual Debugging Tools: Export event flow for visualization in external tools.
  • Zero-Config Mode: Auto-discover and wire up services with minimal setup.

This is a living list of ideas, not a roadmap. Suggestions welcome!


Contributing

Contributions are welcome! Please open issues or pull requests for bug fixes, new features, or documentation improvements.


License

MIT License

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages