Skip to content

Commit f04b8bb

Browse files
committed
feat(notifications): add support for notifications and notification hooks
1 parent afb1ad5 commit f04b8bb

17 files changed

Lines changed: 826 additions & 3 deletions

bec_lib/bec_lib/endpoints.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,6 +1995,38 @@ def available_messaging_services():
19951995
message_op=MessageOp.STREAM,
19961996
)
19971997

1998+
@staticmethod
1999+
def notification(event_type: str):
2000+
"""
2001+
Endpoint for transient notification events that SciHub can route to
2002+
configured messaging services.
2003+
2004+
Args:
2005+
event_type (str): Notification event name such as ``new_scan``.
2006+
2007+
Returns:
2008+
EndpointInfo: Endpoint for notification events.
2009+
"""
2010+
endpoint = f"{EndpointType.INTERNAL.value}/messaging_services/notification/{event_type}"
2011+
return EndpointInfo(
2012+
endpoint=endpoint, message_type=messages.NotificationMessage, message_op=MessageOp.SEND
2013+
)
2014+
2015+
@staticmethod
2016+
def notification_config():
2017+
"""
2018+
Endpoint for persisted notification routing configuration.
2019+
2020+
Returns:
2021+
EndpointInfo: Endpoint for notification routing config.
2022+
"""
2023+
endpoint = f"{EndpointType.USER.value}/messaging_services/notification_config"
2024+
return EndpointInfo(
2025+
endpoint=endpoint,
2026+
message_type=messages.NotificationConfigMessage,
2027+
message_op=MessageOp.SET_PUBLISH,
2028+
)
2029+
19982030
@staticmethod
19992031
def message_service_ingest(deployment_name: str):
20002032
"""

bec_lib/bec_lib/messages.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1728,6 +1728,39 @@ class MessagingConfig(BaseModel):
17281728
scilog: MessagingServiceScopeConfig
17291729

17301730

1731+
class NotificationServiceTarget(BaseModel):
1732+
service_name: Literal["signal", "teams", "scilog"]
1733+
scope: str | list[str] | None = None
1734+
1735+
1736+
class NotificationMessage(BECMessage):
1737+
"""
1738+
Message for notification events that should be routed through configured
1739+
messaging services.
1740+
1741+
Args:
1742+
message (str): Notification body text.
1743+
"""
1744+
1745+
msg_type: ClassVar[str] = "notification_message"
1746+
message: str
1747+
1748+
1749+
class NotificationConfigMessage(BECMessage):
1750+
"""
1751+
Routing configuration for notification events.
1752+
1753+
Args:
1754+
routes: Mapping of event name to messaging service targets.
1755+
"""
1756+
1757+
msg_type: ClassVar[str] = "notification_config_message"
1758+
routes: dict[
1759+
Literal["new_scan", "scan_completed", "alarm", "scan_interlock"] | str,
1760+
list[NotificationServiceTarget],
1761+
] = Field(default_factory=dict)
1762+
1763+
17311764
AvailableMessagingServices = Annotated[
17321765
Union[SignalServiceInfo, SciLogServiceInfo, TeamsServiceInfo],
17331766
Field(discriminator="service_type"),

bec_lib/bec_lib/messaging_hooks.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
from __future__ import annotations
2+
3+
import enum
4+
from typing import TYPE_CHECKING, cast
5+
6+
from bec_lib import messages
7+
from bec_lib.connector import MessageObject
8+
from bec_lib.endpoints import MessageEndpoints
9+
from bec_lib.logger import bec_logger
10+
from bec_lib.messaging_services import (
11+
SciLogMessagingService,
12+
SignalMessagingService,
13+
TeamsMessagingService,
14+
)
15+
16+
if TYPE_CHECKING:
17+
from bec_lib.redis_connector import RedisConnector
18+
19+
logger = bec_logger.logger
20+
21+
22+
class MessagingEvent(str, enum.Enum):
23+
"""
24+
Enumeration of messaging events that can trigger configured hooks.
25+
"""
26+
27+
SCAN = "new_scan"
28+
SCAN_COMPLETED = "scan_completed"
29+
ALARM = "alarm"
30+
SCAN_INTERLOCK = "scan_interlock"
31+
32+
33+
class MessagingManager:
34+
"""
35+
Manage notification routing from internal events to concrete messaging
36+
services.
37+
"""
38+
39+
def __init__(self, connector: RedisConnector):
40+
self.connector = connector
41+
self.config: dict[str, list[messages.NotificationServiceTarget]] = {}
42+
signal = SignalMessagingService(self.connector)
43+
scilog = SciLogMessagingService(self.connector)
44+
teams = TeamsMessagingService(self.connector)
45+
self._service_by_name = {"signal": signal, "scilog": scilog, "teams": teams}
46+
47+
self.connector.register(
48+
patterns=MessageEndpoints.notification("*"), cb=self._handle_notification
49+
)
50+
self.connector.register(
51+
topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config
52+
)
53+
54+
config_msg = self.connector.get(MessageEndpoints.notification_config())
55+
if config_msg is not None:
56+
self.on_notification_config(config_msg)
57+
58+
def _handle_notification(self, msg_obj: MessageObject[messages.NotificationMessage], **_kwargs):
59+
prefix = MessageEndpoints.notification("").endpoint
60+
event_type_str = msg_obj.topic.removeprefix(prefix)
61+
self.on_notification(event_type_str, cast(messages.NotificationMessage, msg_obj.value))
62+
63+
def _handle_notification_config(
64+
self, msg_obj: MessageObject[messages.NotificationConfigMessage], **_kwargs
65+
):
66+
self.on_notification_config(cast(messages.NotificationConfigMessage, msg_obj.value))
67+
68+
def on_notification(self, event_type: str, message: messages.NotificationMessage) -> None:
69+
"""
70+
Handle a notification event by routing it to the configured messaging services.
71+
72+
Args:
73+
event_type(str): The type of the event that triggered the notification.
74+
message(messages.NotificationMessage): The notification message containing the content to be sent.
75+
"""
76+
routes = self.config.get(event_type, [])
77+
for route in routes:
78+
service = self._service_by_name.get(route.service_name)
79+
if service is None:
80+
logger.warning(f"Unknown messaging service: {route.service_name}")
81+
continue
82+
try:
83+
logger.info(
84+
f"Routing notification for {event_type} to {route.service_name}: {message.message}"
85+
)
86+
service.new().add_text(message.message).send(scope=route.scope)
87+
except RuntimeError as exc:
88+
logger.warning(
89+
f"Failed to send notification for {event_type} via {route.service_name}: {exc}"
90+
)
91+
92+
def on_notification_config(self, message: messages.NotificationConfigMessage) -> None:
93+
"""
94+
Update the notification routing configuration based on the received message.
95+
96+
Args:
97+
message(NotificationConfigMessage): The message containing the new routing configuration.
98+
"""
99+
config: dict[str, list[messages.NotificationServiceTarget]] = {}
100+
for event_name, targets in message.routes.items():
101+
config[event_name] = targets
102+
self.config = config
103+
104+
def shutdown(self) -> None:
105+
"""
106+
Shutdown the messaging manager by unregistering all notification handlers.
107+
"""
108+
109+
self.connector.unregister(
110+
patterns=MessageEndpoints.notification("*"), cb=self._handle_notification
111+
)
112+
self.connector.unregister(
113+
topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config
114+
)

bec_lib/bec_lib/messaging_services.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import enum
34
import mimetypes
45
import os
56
from abc import ABC
@@ -9,6 +10,7 @@
910
from bec_lib.endpoints import MessageEndpoints
1011

1112
if TYPE_CHECKING:
13+
from bec_lib.connector import MessageObject
1214
from bec_lib.redis_connector import RedisConnector
1315

1416
# Type variable for the message object class
@@ -126,6 +128,7 @@ class MessagingService(ABC, Generic[MessageObjectT]):
126128
def __init__(self, redis_connector: RedisConnector) -> None:
127129
self._redis_connector = redis_connector
128130
self._scopes: set[str] = set()
131+
self._auto_notifications: dict[str, list[str]] = {}
129132
self._enabled = False
130133
self._default_scope: str | list[str] | None = None
131134
self._service_config: messages.AvailableMessagingServicesMessage | None = None
@@ -134,6 +137,12 @@ def __init__(self, redis_connector: RedisConnector) -> None:
134137
cb=self._on_new_scope_change_msg,
135138
from_start=True,
136139
)
140+
self._redis_connector.register(
141+
MessageEndpoints.notification_config(), cb=self._on_notification_config_change_msg
142+
)
143+
config_msg = self._redis_connector.get(MessageEndpoints.notification_config())
144+
if config_msg is not None:
145+
self._update_auto_notifications(config_msg)
137146

138147
def set_default_scope(self, scope: str | list[str] | None) -> None:
139148
"""
@@ -146,6 +155,133 @@ def set_default_scope(self, scope: str | list[str] | None) -> None:
146155
raise ValueError(f"Scope '{scope}' is not available for this messaging service.")
147156
self._default_scope = scope
148157

158+
def set_auto_notifications(
159+
self,
160+
event_type: Literal["new_scan", "scan_completed", "alarm", "scan_interlock"] | str,
161+
enabled: bool,
162+
scopes: list[str] | str | None = None,
163+
) -> None:
164+
"""
165+
Set automatic notifications for a specific event type.
166+
167+
Args:
168+
event_type (Literal["new_scan", "scan_completed", "alarm", "scan_interlock"] | str): The type of event to set notifications for.
169+
enabled (bool): Whether to enable or disable notifications for the event.
170+
scopes (list[str] | str | None): The scopes to apply the notifications to.
171+
"""
172+
event_name = event_type.value if isinstance(event_type, enum.Enum) else event_type
173+
scopes_list: list[str] = []
174+
if scopes is not None:
175+
if isinstance(scopes, str):
176+
scopes_list = [scopes]
177+
else:
178+
scopes_list = scopes
179+
180+
for scope in scopes_list:
181+
if scope not in self._scopes:
182+
raise ValueError(
183+
f"Scope '{scope}' is not available for this messaging service."
184+
)
185+
else:
186+
if self._default_scope is not None:
187+
scopes_list = (
188+
[self._default_scope]
189+
if isinstance(self._default_scope, str)
190+
else self._default_scope
191+
)
192+
elif not self._SUPPORTS_EMPTY_SCOPES:
193+
raise ValueError(
194+
"Scopes must be provided when there is no default scope and empty scopes are not supported."
195+
)
196+
197+
if enabled:
198+
# merge with existing scopes if already enabled for this event type
199+
existing_scopes = set(self._auto_notifications.get(event_name, []))
200+
existing_scopes.update(scopes_list)
201+
self._auto_notifications[event_name] = list(existing_scopes)
202+
else:
203+
# if disabling, remove the scopes for this event type, or the entire event type if no scopes are provided
204+
if scopes_list:
205+
existing_scopes = set(self._auto_notifications.get(event_name, []))
206+
existing_scopes.difference_update(scopes_list)
207+
if existing_scopes:
208+
self._auto_notifications[event_name] = list(existing_scopes)
209+
else:
210+
self._auto_notifications.pop(event_name, None)
211+
else:
212+
self._auto_notifications.pop(event_name, None)
213+
214+
self._sync_auto_notifications_config(event_name, enabled=enabled, scopes=scopes_list)
215+
216+
def _sync_auto_notifications_config(
217+
self, event_name: str, enabled: bool, scopes: list[str]
218+
) -> None:
219+
config_msg = self._redis_connector.get(MessageEndpoints.notification_config())
220+
if config_msg is None:
221+
config_msg = messages.NotificationConfigMessage()
222+
223+
routes = {name: list(targets) for name, targets in config_msg.routes.items()}
224+
event_routes = list(routes.get(event_name, []))
225+
226+
if enabled:
227+
scopes_to_add = scopes or [None]
228+
for scope in scopes_to_add:
229+
target = messages.NotificationServiceTarget(
230+
service_name=self._SERVICE_NAME, scope=scope
231+
)
232+
if not any(existing == target for existing in event_routes):
233+
event_routes.append(target)
234+
else:
235+
if scopes:
236+
event_routes = [
237+
target
238+
for target in event_routes
239+
if not (target.service_name == self._SERVICE_NAME and target.scope in scopes)
240+
]
241+
else:
242+
event_routes = [
243+
target for target in event_routes if target.service_name != self._SERVICE_NAME
244+
]
245+
246+
if event_routes:
247+
routes[event_name] = event_routes
248+
else:
249+
routes.pop(event_name, None)
250+
251+
self._redis_connector.set_and_publish(
252+
MessageEndpoints.notification_config(),
253+
messages.NotificationConfigMessage(routes=routes, metadata=config_msg.metadata),
254+
)
255+
256+
def _on_notification_config_change_msg(
257+
self,
258+
message: (
259+
MessageObject[messages.NotificationConfigMessage]
260+
| dict[str, messages.NotificationConfigMessage]
261+
),
262+
) -> None:
263+
config_msg = message.value if hasattr(message, "value") else message["data"]
264+
self._update_auto_notifications(config_msg)
265+
266+
def _update_auto_notifications(self, config_msg: messages.NotificationConfigMessage) -> None:
267+
auto_notifications: dict[str, list[str]] = {}
268+
for event_name, targets in config_msg.routes.items():
269+
scopes: list[str] = []
270+
has_matching_service = False
271+
for target in targets:
272+
if target.service_name != self._SERVICE_NAME:
273+
continue
274+
has_matching_service = True
275+
if isinstance(target.scope, str):
276+
scopes.append(target.scope)
277+
elif isinstance(target.scope, list):
278+
scopes.extend(target.scope)
279+
280+
if has_matching_service:
281+
auto_notifications[event_name] = list(dict.fromkeys(scopes))
282+
283+
self._auto_notifications = auto_notifications
284+
149285
def _on_new_scope_change_msg(
150286
self, message: dict[str, messages.AvailableMessagingServicesMessage]
151287
) -> None:

bec_lib/bec_lib/redis_connector.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@
5757
DynamicMetricDict,
5858
DynamicMetricMessage,
5959
ErrorInfo,
60+
NotificationMessage,
6061
)
62+
from bec_lib.messaging_hooks import MessagingEvent
6163
from bec_lib.serialization import MsgpackSerialization
6264

6365
logger = bec_logger.logger
@@ -655,6 +657,23 @@ def raise_alarm(self, severity: Alarms, info: ErrorInfo, metadata: dict | None =
655657
"""
656658
alarm_msg = AlarmMessage(severity=severity, info=info, metadata=metadata or {})
657659
self.set_and_publish(MessageEndpoints.alarm(), alarm_msg)
660+
compact_message = info.compact_error_message or info.error_message or info.exception_type
661+
self.notify(MessagingEvent.ALARM, compact_message)
662+
663+
def notify(self, event: MessagingEvent, message: str, pipe: Pipeline | None = None) -> None:
664+
"""
665+
Publish a notification event for downstream routing by SciHub.
666+
667+
Args:
668+
event(MessagingEvent): The type of the event that triggered the notification.
669+
message(str): The notification message containing the content to be sent.
670+
pipe(Pipeline, optional): Optional pipeline to enqueue the publish operation into.
671+
"""
672+
self.send(
673+
MessageEndpoints.notification(event.value),
674+
NotificationMessage(message=message),
675+
pipe=pipe,
676+
)
658677

659678
def pipeline(self) -> redis.client.Pipeline:
660679
"""Create a new pipeline"""

0 commit comments

Comments
 (0)