|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import enum |
| 4 | +from typing import TYPE_CHECKING, cast |
| 5 | + |
| 6 | +from bec_lib import messages |
| 7 | +from bec_lib.connector import MessageObject |
| 8 | +from bec_lib.endpoints import MessageEndpoints |
| 9 | +from bec_lib.logger import bec_logger |
| 10 | +from bec_lib.messaging_services import ( |
| 11 | + SciLogMessagingService, |
| 12 | + SignalMessagingService, |
| 13 | + TeamsMessagingService, |
| 14 | +) |
| 15 | + |
| 16 | +if TYPE_CHECKING: |
| 17 | + from bec_lib.redis_connector import RedisConnector |
| 18 | + |
| 19 | +logger = bec_logger.logger |
| 20 | + |
| 21 | + |
| 22 | +class MessagingEvent(str, enum.Enum): |
| 23 | + """ |
| 24 | + Enumeration of messaging events that can trigger configured hooks. |
| 25 | + """ |
| 26 | + |
| 27 | + SCAN = "new_scan" |
| 28 | + SCAN_COMPLETED = "scan_completed" |
| 29 | + ALARM = "alarm" |
| 30 | + SCAN_INTERLOCK = "scan_interlock" |
| 31 | + |
| 32 | + |
| 33 | +class MessagingManager: |
| 34 | + """ |
| 35 | + Manage notification routing from internal events to concrete messaging |
| 36 | + services. |
| 37 | + """ |
| 38 | + |
| 39 | + def __init__(self, connector: RedisConnector): |
| 40 | + self.connector = connector |
| 41 | + self.config: dict[str, list[messages.NotificationServiceTarget]] = {} |
| 42 | + signal = SignalMessagingService(self.connector) |
| 43 | + scilog = SciLogMessagingService(self.connector) |
| 44 | + teams = TeamsMessagingService(self.connector) |
| 45 | + self._service_by_name = {"signal": signal, "scilog": scilog, "teams": teams} |
| 46 | + |
| 47 | + self.connector.register( |
| 48 | + patterns=MessageEndpoints.notification("*"), cb=self._handle_notification |
| 49 | + ) |
| 50 | + self.connector.register( |
| 51 | + topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config |
| 52 | + ) |
| 53 | + |
| 54 | + config_msg = self.connector.get(MessageEndpoints.notification_config()) |
| 55 | + if config_msg is not None: |
| 56 | + self.on_notification_config(config_msg) |
| 57 | + |
| 58 | + def _handle_notification(self, msg_obj: MessageObject[messages.NotificationMessage], **_kwargs): |
| 59 | + prefix = MessageEndpoints.notification("").endpoint |
| 60 | + event_type_str = msg_obj.topic.removeprefix(prefix) |
| 61 | + self.on_notification(event_type_str, cast(messages.NotificationMessage, msg_obj.value)) |
| 62 | + |
| 63 | + def _handle_notification_config( |
| 64 | + self, msg_obj: MessageObject[messages.NotificationConfigMessage], **_kwargs |
| 65 | + ): |
| 66 | + self.on_notification_config(cast(messages.NotificationConfigMessage, msg_obj.value)) |
| 67 | + |
| 68 | + def on_notification(self, event_type: str, message: messages.NotificationMessage) -> None: |
| 69 | + """ |
| 70 | + Handle a notification event by routing it to the configured messaging services. |
| 71 | +
|
| 72 | + Args: |
| 73 | + event_type(str): The type of the event that triggered the notification. |
| 74 | + message(messages.NotificationMessage): The notification message containing the content to be sent. |
| 75 | + """ |
| 76 | + routes = self.config.get(event_type, []) |
| 77 | + for route in routes: |
| 78 | + service = self._service_by_name.get(route.service_name) |
| 79 | + if service is None: |
| 80 | + logger.warning(f"Unknown messaging service: {route.service_name}") |
| 81 | + continue |
| 82 | + try: |
| 83 | + logger.info( |
| 84 | + f"Routing notification for {event_type} to {route.service_name}: {message.message}" |
| 85 | + ) |
| 86 | + service.new().add_text(message.message).send(scope=route.scope) |
| 87 | + except RuntimeError as exc: |
| 88 | + logger.warning( |
| 89 | + f"Failed to send notification for {event_type} via {route.service_name}: {exc}" |
| 90 | + ) |
| 91 | + |
| 92 | + def on_notification_config(self, message: messages.NotificationConfigMessage) -> None: |
| 93 | + """ |
| 94 | + Update the notification routing configuration based on the received message. |
| 95 | +
|
| 96 | + Args: |
| 97 | + message(NotificationConfigMessage): The message containing the new routing configuration. |
| 98 | + """ |
| 99 | + config: dict[str, list[messages.NotificationServiceTarget]] = {} |
| 100 | + for event_name, targets in message.routes.items(): |
| 101 | + config[event_name] = targets |
| 102 | + self.config = config |
| 103 | + |
| 104 | + def shutdown(self) -> None: |
| 105 | + """ |
| 106 | + Shutdown the messaging manager by unregistering all notification handlers. |
| 107 | + """ |
| 108 | + |
| 109 | + self.connector.unregister( |
| 110 | + patterns=MessageEndpoints.notification("*"), cb=self._handle_notification |
| 111 | + ) |
| 112 | + self.connector.unregister( |
| 113 | + topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config |
| 114 | + ) |
0 commit comments