-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy patheventbus.py
More file actions
31 lines (26 loc) · 764 Bytes
/
Copy patheventbus.py
File metadata and controls
31 lines (26 loc) · 764 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# =============================================================================
# Copyright (c) 2025 Botts Innovative Research Inc.
# Date: 2025/10/6
# Author: Ian Patterson
# Contact Email: ian@botts-inc.com
# =============================================================================
import collections
from typing import Any
from uuid import UUID
from abc import ABC
class Event(ABC):
    """
    A base class for events in the event bus system.

    Attributes:
        id: UUID stored on the event as given to the constructor.
        topic: Topic string stored on the event as given to the constructor.
        payload: Arbitrary object carried by the event; it is stored as-is
            and never inspected or copied by this class.
    """

    id: UUID
    topic: str
    payload: Any

    def __init__(self, id: UUID, topic: str, payload: Any):
        """
        Store the identifier, topic and payload on the new event.

        :param id: UUID to store as ``self.id``.
        :param topic: Topic string to store as ``self.topic``.
        :param payload: Object to store as ``self.payload``.
        """
        # NOTE: the parameter name ``id`` shadows the builtin, but it is part
        # of the public constructor signature (callers may pass ``id=...``),
        # so it is intentionally left unchanged.
        self.id = id
        self.topic = topic
        self.payload = payload

    def __repr__(self) -> str:
        """Return a debug representation showing all three fields."""
        # type(self).__name__ keeps the output accurate for subclasses.
        return (
            f"{type(self).__name__}("
            f"id={self.id!r}, topic={self.topic!r}, payload={self.payload!r})"
        )
class EventBus(ABC):
    """
    A base class for an event bus system.

    Attributes:
        _deque: Per-instance ``collections.deque`` created empty by
            ``__init__``. This class defines no operations on it.
            NOTE(review): presumably intended to hold pending events;
            confirm against the subclasses that use it.
    """

    _deque: collections.deque

    def __init__(self) -> None:
        """
        Create the bus with its own empty deque.

        Subclasses that override ``__init__`` should call
        ``super().__init__()`` (or assign ``self._deque`` themselves);
        otherwise the attribute is only annotated, never created.
        """
        super().__init__()
        # A bare class-level annotation does not create the attribute, so
        # without this assignment ``self._deque`` raises AttributeError.
        # Assigning per instance also keeps buses from sharing one deque.
        self._deque = collections.deque()