Skip to content

Commit 8896e31

Browse files
committed
feat(notifications): add support for notifications and notification hooks
1 parent 8714ff8 commit 8896e31

14 files changed

Lines changed: 416 additions & 1 deletion

File tree

bec_lib/bec_lib/endpoints.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1995,6 +1995,38 @@ def available_messaging_services():
19951995
message_op=MessageOp.STREAM,
19961996
)
19971997

1998+
@staticmethod
1999+
def notification(event_type: str):
2000+
"""
2001+
Endpoint for transient notification events that SciHub can route to
2002+
configured messaging services.
2003+
2004+
Args:
2005+
event_type (str): Notification event name such as ``new_scan``.
2006+
2007+
Returns:
2008+
EndpointInfo: Endpoint for notification events.
2009+
"""
2010+
endpoint = f"{EndpointType.INTERNAL.value}/messaging_services/notification/{event_type}"
2011+
return EndpointInfo(
2012+
endpoint=endpoint, message_type=messages.NotificationMessage, message_op=MessageOp.SEND
2013+
)
2014+
2015+
@staticmethod
2016+
def notification_config():
2017+
"""
2018+
Endpoint for persisted notification routing configuration.
2019+
2020+
Returns:
2021+
EndpointInfo: Endpoint for notification routing config.
2022+
"""
2023+
endpoint = f"{EndpointType.USER.value}/messaging_services/notification_config"
2024+
return EndpointInfo(
2025+
endpoint=endpoint,
2026+
message_type=messages.NotificationConfigMessage,
2027+
message_op=MessageOp.SET_PUBLISH,
2028+
)
2029+
19982030
@staticmethod
19992031
def message_service_ingest(deployment_name: str):
20002032
"""

bec_lib/bec_lib/messages.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1728,6 +1728,39 @@ class MessagingConfig(BaseModel):
17281728
scilog: MessagingServiceScopeConfig
17291729

17301730

1731+
class NotificationServiceTarget(BaseModel):
1732+
service_name: Literal["signal", "teams", "scilog"]
1733+
scope: str | list[str] | None = None
1734+
1735+
1736+
class NotificationMessage(BECMessage):
1737+
"""
1738+
Message for notification events that should be routed through configured
1739+
messaging services.
1740+
1741+
Args:
1742+
message (str): Notification body text.
1743+
"""
1744+
1745+
msg_type: ClassVar[str] = "notification_message"
1746+
message: str
1747+
1748+
1749+
class NotificationConfigMessage(BECMessage):
1750+
"""
1751+
Routing configuration for notification events.
1752+
1753+
Args:
1754+
routes: Mapping of event name to messaging service targets.
1755+
"""
1756+
1757+
msg_type: ClassVar[str] = "notification_config_message"
1758+
routes: dict[
1759+
Literal["new_scan", "scan_completed", "alarm", "scan_interlock"] | str,
1760+
list[NotificationServiceTarget],
1761+
] = Field(default_factory=dict)
1762+
1763+
17311764
AvailableMessagingServices = Annotated[
17321765
Union[SignalServiceInfo, SciLogServiceInfo, TeamsServiceInfo],
17331766
Field(discriminator="service_type"),

bec_lib/bec_lib/messaging_hooks.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
from __future__ import annotations
2+
3+
import enum
4+
from typing import TYPE_CHECKING, cast
5+
6+
from bec_lib import messages
7+
from bec_lib.connector import MessageObject
8+
from bec_lib.endpoints import MessageEndpoints
9+
from bec_lib.logger import bec_logger
10+
from bec_lib.messaging_services import (
11+
SciLogMessagingService,
12+
SignalMessagingService,
13+
TeamsMessagingService,
14+
)
15+
16+
if TYPE_CHECKING:
17+
from bec_lib.redis_connector import RedisConnector
18+
19+
logger = bec_logger.logger
20+
21+
22+
class MessagingEvent(str, enum.Enum):
23+
"""
24+
Enumeration of messaging events that can trigger configured hooks.
25+
"""
26+
27+
SCAN = "new_scan"
28+
SCAN_COMPLETED = "scan_completed"
29+
ALARM = "alarm"
30+
SCAN_INTERLOCK = "scan_interlock"
31+
32+
33+
class MessagingManager:
34+
"""
35+
Manage notification routing from internal events to concrete messaging
36+
services.
37+
"""
38+
39+
def __init__(self, connector: RedisConnector):
40+
self.connector = connector
41+
self.config: dict[str, list[messages.NotificationServiceTarget]] = {}
42+
signal = SignalMessagingService(self.connector)
43+
scilog = SciLogMessagingService(self.connector)
44+
teams = TeamsMessagingService(self.connector)
45+
self._service_by_name = {"signal": signal, "scilog": scilog, "teams": teams}
46+
47+
self.connector.register(
48+
patterns=MessageEndpoints.notification("*"), cb=self._handle_notification
49+
)
50+
self.connector.register(
51+
topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config
52+
)
53+
54+
config_msg = self.connector.get(MessageEndpoints.notification_config())
55+
if config_msg is not None:
56+
self.on_notification_config(config_msg)
57+
58+
def _handle_notification(self, msg_obj: MessageObject[messages.NotificationMessage], **_kwargs):
59+
prefix = MessageEndpoints.notification("").endpoint
60+
event_type_str = msg_obj.topic.removeprefix(prefix)
61+
self.on_notification(event_type_str, cast(messages.NotificationMessage, msg_obj.value))
62+
63+
def _handle_notification_config(
64+
self, msg_obj: MessageObject[messages.NotificationConfigMessage], **_kwargs
65+
):
66+
self.on_notification_config(cast(messages.NotificationConfigMessage, msg_obj.value))
67+
68+
def on_notification(self, event_type: str, message: messages.NotificationMessage) -> None:
69+
"""
70+
Handle a notification event by routing it to the configured messaging services.
71+
72+
Args:
73+
event_type(str): The type of the event that triggered the notification.
74+
message(messages.NotificationMessage): The notification message containing the content to be sent.
75+
"""
76+
routes = self.config.get(event_type, [])
77+
for route in routes:
78+
service = self._service_by_name.get(route.service_name)
79+
if service is None:
80+
logger.warning(f"Unknown messaging service: {route.service_name}")
81+
continue
82+
try:
83+
logger.info(
84+
f"Routing notification for {event_type} to {route.service_name}: {message.message}"
85+
)
86+
service.new().add_text(message.message).send(scope=route.scope)
87+
except RuntimeError as exc:
88+
logger.warning(
89+
f"Failed to send notification for {event_type} via {route.service_name}: {exc}"
90+
)
91+
92+
def on_notification_config(self, message: messages.NotificationConfigMessage) -> None:
93+
"""
94+
Update the notification routing configuration based on the received message.
95+
96+
Args:
97+
message(NotificationConfigMessage): The message containing the new routing configuration.
98+
"""
99+
config: dict[str, list[messages.NotificationServiceTarget]] = {}
100+
for event_name, targets in message.routes.items():
101+
config[event_name] = targets
102+
self.config = config
103+
104+
def shutdown(self) -> None:
105+
"""
106+
Shutdown the messaging manager by unregistering all notification handlers.
107+
"""
108+
109+
self.connector.unregister(
110+
patterns=MessageEndpoints.notification("*"), cb=self._handle_notification
111+
)
112+
self.connector.unregister(
113+
topics=MessageEndpoints.notification_config(), cb=self._handle_notification_config
114+
)

bec_lib/bec_lib/redis_connector.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@
5757
DynamicMetricDict,
5858
DynamicMetricMessage,
5959
ErrorInfo,
60+
NotificationMessage,
6061
)
62+
from bec_lib.messaging_hooks import MessagingEvent
6163
from bec_lib.serialization import MsgpackSerialization
6264

6365
logger = bec_logger.logger
@@ -655,6 +657,23 @@ def raise_alarm(self, severity: Alarms, info: ErrorInfo, metadata: dict | None =
655657
"""
656658
alarm_msg = AlarmMessage(severity=severity, info=info, metadata=metadata or {})
657659
self.set_and_publish(MessageEndpoints.alarm(), alarm_msg)
660+
compact_message = info.compact_error_message or info.error_message or info.exception_type
661+
self.notify(MessagingEvent.ALARM, compact_message)
662+
663+
def notify(self, event: MessagingEvent, message: str, pipe: Pipeline | None = None) -> None:
664+
"""
665+
Publish a notification event for downstream routing by SciHub.
666+
667+
Args:
668+
event(MessagingEvent): The type of the event that triggered the notification.
669+
message(str): The notification message containing the content to be sent.
670+
pipe(Pipeline, optional): Optional pipeline to enqueue the publish operation into.
671+
"""
672+
self.send(
673+
MessageEndpoints.notification(event.value),
674+
NotificationMessage(message=message),
675+
pipe=pipe,
676+
)
658677

659678
def pipeline(self) -> redis.client.Pipeline:
660679
"""Create a new pipeline"""

bec_lib/bec_lib/tests/utils.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,13 @@ def send(self, topic, msg, pipe=None):
561561
raise TypeError("Message must be a BECMessage")
562562
return self.raw_send(topic, msg, pipe)
563563

564+
def notify(self, event, message: str, pipe=None):
565+
return self.send(
566+
MessageEndpoints.notification(event.value),
567+
messages.NotificationMessage(message=message),
568+
pipe=pipe,
569+
)
570+
564571
def set_and_publish(self, topic, msg, pipe=None, expire: int = None):
565572
if pipe:
566573
pipe._pipe_buffer.append(("set_and_publish", (topic.endpoint, msg), {"expire": expire}))

bec_lib/tests/test_bec_messages.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,31 @@ def test_ClientInfoMessage_raises():
134134
)
135135

136136

137+
def test_NotificationMessage():
138+
msg = messages.NotificationMessage(message="Scan started")
139+
res = MsgpackSerialization.dumps(msg)
140+
res_loaded = MsgpackSerialization.loads(res)
141+
assert res_loaded == msg
142+
143+
144+
def test_NotificationConfigMessage():
145+
msg = messages.NotificationConfigMessage(
146+
routes={
147+
"new_scan": [
148+
messages.NotificationServiceTarget(service_name="scilog", scope="logbook")
149+
],
150+
"alarm": [
151+
messages.NotificationServiceTarget(
152+
service_name="signal", scope=["+41791234567", "+41797654321"]
153+
)
154+
],
155+
}
156+
)
157+
res = MsgpackSerialization.dumps(msg)
158+
res_loaded = MsgpackSerialization.loads(res)
159+
assert res_loaded == msg
160+
161+
137162
def test_DeviceRPCMessage():
138163
msg = messages.DeviceRPCMessage(
139164
device="samx", return_val=1, out="done", success=True, metadata={"RID": "1234"}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
from bec_lib import messages
2+
from bec_lib.endpoints import MessageEndpoints, MessageOp
3+
from bec_lib.messaging_hooks import MessagingEvent, MessagingManager
4+
5+
6+
def _available_services_message():
7+
return messages.AvailableMessagingServicesMessage(
8+
config=messages.MessagingConfig(
9+
signal=messages.MessagingServiceScopeConfig(enabled=True, default=None),
10+
teams=messages.MessagingServiceScopeConfig(enabled=False, default=None),
11+
scilog=messages.MessagingServiceScopeConfig(enabled=True, default=None),
12+
),
13+
deployment_services=[
14+
messages.SciLogServiceInfo(
15+
id="scilog-default", scope="logbook", enabled=True, logbook_id="lb-1"
16+
)
17+
],
18+
session_services=[],
19+
)
20+
21+
22+
def test_notification_endpoints():
23+
event_endpoint = MessageEndpoints.notification("new_scan")
24+
config_endpoint = MessageEndpoints.notification_config()
25+
26+
assert event_endpoint.endpoint == "internal/messaging_services/notification/new_scan"
27+
assert event_endpoint.message_type is messages.NotificationMessage
28+
assert event_endpoint.message_op == MessageOp.SEND
29+
30+
assert config_endpoint.endpoint == "user/messaging_services/notification_config"
31+
assert config_endpoint.message_type is messages.NotificationConfigMessage
32+
assert config_endpoint.message_op == MessageOp.SET_PUBLISH
33+
34+
35+
def test_messaging_manager_loads_initial_config(connected_connector):
36+
config_msg = messages.NotificationConfigMessage(
37+
routes={
38+
"new_scan": [messages.NotificationServiceTarget(service_name="scilog", scope="logbook")]
39+
}
40+
)
41+
connected_connector.set_and_publish(MessageEndpoints.notification_config(), config_msg)
42+
43+
manager = MessagingManager(connected_connector)
44+
try:
45+
assert manager.config == {
46+
MessagingEvent.SCAN: [
47+
messages.NotificationServiceTarget(service_name="scilog", scope="logbook")
48+
]
49+
}
50+
finally:
51+
manager.shutdown()
52+
53+
54+
def test_messaging_manager_routes_notifications_to_message_service_queue(connected_connector):
55+
manager = MessagingManager(connected_connector)
56+
try:
57+
available_services = _available_services_message()
58+
manager.scilog._on_new_scope_change_msg({"data": available_services})
59+
manager.signal._on_new_scope_change_msg({"data": available_services})
60+
manager.teams._on_new_scope_change_msg({"data": available_services})
61+
62+
manager.on_notification_config(
63+
messages.NotificationConfigMessage(
64+
routes={
65+
"new_scan": [
66+
messages.NotificationServiceTarget(service_name="scilog", scope="logbook")
67+
]
68+
}
69+
)
70+
)
71+
72+
manager.on_notification(
73+
MessagingEvent.SCAN, messages.NotificationMessage(message="Scan started")
74+
)
75+
76+
out = connected_connector.xread(MessageEndpoints.message_service_queue(), from_start=True)
77+
assert len(out) == 1
78+
sent_message = out[0]["data"]
79+
assert sent_message.service_name == "scilog"
80+
assert sent_message.scope == "logbook"
81+
assert isinstance(sent_message.message[0], messages.MessagingServiceTextContent)
82+
assert sent_message.message[0].content == "Scan started"
83+
assert isinstance(sent_message.message[1], messages.MessagingServiceTagsContent)
84+
assert sent_message.message[1].tags == ["bec"]
85+
finally:
86+
manager.shutdown()

bec_lib/tests/test_redis_connector.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
ClientInfoMessage,
1616
ProcedureExecutionMessage,
1717
)
18+
from bec_lib.messaging_hooks import MessagingEvent
1819
from bec_lib.redis_connector import (
1920
IncompatibleMessageForEndpoint,
2021
IncompatibleRedisOperation,
@@ -71,7 +72,10 @@ def test_redis_connector_send_client_info(connector):
7172
],
7273
)
7374
def test_redis_connector_raise_alarm(connector, severity, alarm_type, msg, compact_msg, metadata):
74-
with mock.patch.object(connector, "set_and_publish", return_value=None):
75+
with (
76+
mock.patch.object(connector, "set_and_publish", return_value=None),
77+
mock.patch.object(connector, "notify", return_value=None),
78+
):
7579
info = messages.ErrorInfo(
7680
error_message=msg, compact_error_message=compact_msg, exception_type=alarm_type
7781
)
@@ -80,6 +84,7 @@ def test_redis_connector_raise_alarm(connector, severity, alarm_type, msg, compa
8084
connector.set_and_publish.assert_called_once_with(
8185
MessageEndpoints.alarm(), AlarmMessage(severity=severity, info=info, metadata=metadata)
8286
)
87+
connector.notify.assert_called_once_with(MessagingEvent.ALARM, compact_msg)
8388

8489

8590
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)