Skip to content

Commit 0d03379

Browse files
committed
f
1 parent 278a941 commit 0d03379

2 files changed

Lines changed: 290 additions & 0 deletions

File tree

bec_lib/bec_lib/messaging_services.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import enum
34
import mimetypes
45
import os
56
from abc import ABC
@@ -9,6 +10,7 @@
910
from bec_lib.endpoints import MessageEndpoints
1011

1112
if TYPE_CHECKING:
13+
from bec_lib.connector import MessageObject
1214
from bec_lib.redis_connector import RedisConnector
1315

1416
# Type variable for the message object class
@@ -126,6 +128,7 @@ class MessagingService(ABC, Generic[MessageObjectT]):
126128
def __init__(self, redis_connector: RedisConnector) -> None:
127129
self._redis_connector = redis_connector
128130
self._scopes: set[str] = set()
131+
self._auto_notifications: dict[str, list[str]] = {}
129132
self._enabled = False
130133
self._default_scope: str | list[str] | None = None
131134
self._service_config: messages.AvailableMessagingServicesMessage | None = None
@@ -134,6 +137,12 @@ def __init__(self, redis_connector: RedisConnector) -> None:
134137
cb=self._on_new_scope_change_msg,
135138
from_start=True,
136139
)
140+
self._redis_connector.register(
141+
MessageEndpoints.notification_config(), cb=self._on_notification_config_change_msg
142+
)
143+
config_msg = self._redis_connector.get(MessageEndpoints.notification_config())
144+
if config_msg is not None:
145+
self._update_auto_notifications(config_msg)
137146

138147
def set_default_scope(self, scope: str | list[str] | None) -> None:
139148
"""
@@ -146,6 +155,133 @@ def set_default_scope(self, scope: str | list[str] | None) -> None:
146155
raise ValueError(f"Scope '{scope}' is not available for this messaging service.")
147156
self._default_scope = scope
148157

158+
def set_auto_notifications(
159+
self,
160+
event_type: Literal["new_scan", "scan_completed", "alarm", "scan_interlock"] | str,
161+
enabled: bool,
162+
scopes: list[str] | str | None = None,
163+
) -> None:
164+
"""
165+
Set automatic notifications for a specific event type.
166+
167+
Args:
168+
event_type (Literal["new_scan", "scan_completed", "alarm", "scan_interlock"] | str): The type of event to set notifications for.
169+
enabled (bool): Whether to enable or disable notifications for the event.
170+
scopes (list[str] | str | None): The scopes to apply the notifications to.
171+
"""
172+
event_name = event_type.value if isinstance(event_type, enum.Enum) else event_type
173+
scopes_list: list[str] = []
174+
if scopes is not None:
175+
if isinstance(scopes, str):
176+
scopes_list = [scopes]
177+
else:
178+
scopes_list = scopes
179+
180+
for scope in scopes_list:
181+
if scope not in self._scopes:
182+
raise ValueError(
183+
f"Scope '{scope}' is not available for this messaging service."
184+
)
185+
else:
186+
if self._default_scope is not None:
187+
scopes_list = (
188+
[self._default_scope]
189+
if isinstance(self._default_scope, str)
190+
else self._default_scope
191+
)
192+
elif not self._SUPPORTS_EMPTY_SCOPES:
193+
raise ValueError(
194+
"Scopes must be provided when there is no default scope and empty scopes are not supported."
195+
)
196+
197+
if enabled:
198+
# merge with existing scopes if already enabled for this event type
199+
existing_scopes = set(self._auto_notifications.get(event_name, []))
200+
existing_scopes.update(scopes_list)
201+
self._auto_notifications[event_name] = list(existing_scopes)
202+
else:
203+
# if disabling, remove the scopes for this event type, or the entire event type if no scopes are provided
204+
if scopes_list:
205+
existing_scopes = set(self._auto_notifications.get(event_name, []))
206+
existing_scopes.difference_update(scopes_list)
207+
if existing_scopes:
208+
self._auto_notifications[event_name] = list(existing_scopes)
209+
else:
210+
self._auto_notifications.pop(event_name, None)
211+
else:
212+
self._auto_notifications.pop(event_name, None)
213+
214+
self._sync_auto_notifications_config(event_name, enabled=enabled, scopes=scopes_list)
215+
216+
def _sync_auto_notifications_config(
217+
self, event_name: str, enabled: bool, scopes: list[str]
218+
) -> None:
219+
config_msg = self._redis_connector.get(MessageEndpoints.notification_config())
220+
if config_msg is None:
221+
config_msg = messages.NotificationConfigMessage()
222+
223+
routes = {name: list(targets) for name, targets in config_msg.routes.items()}
224+
event_routes = list(routes.get(event_name, []))
225+
226+
if enabled:
227+
scopes_to_add = scopes or [None]
228+
for scope in scopes_to_add:
229+
target = messages.NotificationServiceTarget(
230+
service_name=self._SERVICE_NAME, scope=scope
231+
)
232+
if not any(existing == target for existing in event_routes):
233+
event_routes.append(target)
234+
else:
235+
if scopes:
236+
event_routes = [
237+
target
238+
for target in event_routes
239+
if not (target.service_name == self._SERVICE_NAME and target.scope in scopes)
240+
]
241+
else:
242+
event_routes = [
243+
target for target in event_routes if target.service_name != self._SERVICE_NAME
244+
]
245+
246+
if event_routes:
247+
routes[event_name] = event_routes
248+
else:
249+
routes.pop(event_name, None)
250+
251+
self._redis_connector.set_and_publish(
252+
MessageEndpoints.notification_config(),
253+
messages.NotificationConfigMessage(routes=routes, metadata=config_msg.metadata),
254+
)
255+
256+
def _on_notification_config_change_msg(
257+
self,
258+
message: (
259+
MessageObject[messages.NotificationConfigMessage]
260+
| dict[str, messages.NotificationConfigMessage]
261+
),
262+
) -> None:
263+
config_msg = message.value if hasattr(message, "value") else message["data"]
264+
self._update_auto_notifications(config_msg)
265+
266+
def _update_auto_notifications(self, config_msg: messages.NotificationConfigMessage) -> None:
267+
auto_notifications: dict[str, list[str]] = {}
268+
for event_name, targets in config_msg.routes.items():
269+
scopes: list[str] = []
270+
has_matching_service = False
271+
for target in targets:
272+
if target.service_name != self._SERVICE_NAME:
273+
continue
274+
has_matching_service = True
275+
if isinstance(target.scope, str):
276+
scopes.append(target.scope)
277+
elif isinstance(target.scope, list):
278+
scopes.extend(target.scope)
279+
280+
if has_matching_service:
281+
auto_notifications[event_name] = list(dict.fromkeys(scopes))
282+
283+
self._auto_notifications = auto_notifications
284+
149285
def _on_new_scope_change_msg(
150286
self, message: dict[str, messages.AvailableMessagingServicesMessage]
151287
) -> None:

bec_lib/tests/test_messaging_service.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import time
2+
13
import pytest
24

35
from bec_lib import messages
46
from bec_lib.endpoints import MessageEndpoints
7+
from bec_lib.messaging_hooks import MessagingEvent
58
from bec_lib.messaging_services import (
69
MessageServiceObject,
710
SciLogMessagingService,
@@ -552,3 +555,154 @@ def test_scilog_add_text_bold_and_color(scilog_message, connected_connector):
552555
text_part.content
553556
== '<p><mark class="pen-red"><strong>Beamline checks failed</strong></mark></p>'
554557
)
558+
559+
560+
def test_set_auto_notifications_persists_notification_config(scilog_service, connected_connector):
561+
scilog_service.set_auto_notifications(MessagingEvent.SCAN, enabled=True, scopes="default")
562+
563+
config_msg = connected_connector.get(MessageEndpoints.notification_config())
564+
assert config_msg == messages.NotificationConfigMessage(
565+
routes={
566+
"new_scan": [
567+
messages.NotificationServiceTarget(service_name="scilog", scope="default")
568+
]
569+
}
570+
)
571+
assert scilog_service._auto_notifications == {"new_scan": ["default"]} # pylint: disable=protected-access
572+
573+
574+
def test_set_auto_notifications_merges_with_existing_routes(scilog_service, connected_connector):
575+
connected_connector.set_and_publish(
576+
MessageEndpoints.notification_config(),
577+
messages.NotificationConfigMessage(
578+
routes={
579+
"new_scan": [
580+
messages.NotificationServiceTarget(service_name="signal", scope="beamline-ops")
581+
]
582+
}
583+
),
584+
)
585+
586+
scilog_service.set_auto_notifications(MessagingEvent.SCAN, enabled=True, scopes="default")
587+
588+
config_msg = connected_connector.get(MessageEndpoints.notification_config())
589+
assert config_msg == messages.NotificationConfigMessage(
590+
routes={
591+
"new_scan": [
592+
messages.NotificationServiceTarget(service_name="signal", scope="beamline-ops"),
593+
messages.NotificationServiceTarget(service_name="scilog", scope="default"),
594+
]
595+
}
596+
)
597+
assert scilog_service._auto_notifications == {"new_scan": ["default"]} # pylint: disable=protected-access
598+
599+
600+
def test_set_auto_notifications_disable_removes_only_matching_service_scope(
601+
scilog_service, connected_connector
602+
):
603+
connected_connector.set_and_publish(
604+
MessageEndpoints.notification_config(),
605+
messages.NotificationConfigMessage(
606+
routes={
607+
"new_scan": [
608+
messages.NotificationServiceTarget(service_name="signal", scope="beamline-ops"),
609+
messages.NotificationServiceTarget(service_name="scilog", scope="default"),
610+
]
611+
}
612+
),
613+
)
614+
615+
scilog_service.set_auto_notifications(MessagingEvent.SCAN, enabled=False, scopes="default")
616+
617+
config_msg = connected_connector.get(MessageEndpoints.notification_config())
618+
assert config_msg == messages.NotificationConfigMessage(
619+
routes={
620+
"new_scan": [
621+
messages.NotificationServiceTarget(service_name="signal", scope="beamline-ops")
622+
]
623+
}
624+
)
625+
assert scilog_service._auto_notifications == {} # pylint: disable=protected-access
626+
627+
628+
def test_set_auto_notifications_uses_default_scope_when_scopes_omitted(
629+
scilog_service, connected_connector
630+
):
631+
scilog_service.set_default_scope("default")
632+
633+
scilog_service.set_auto_notifications(MessagingEvent.SCAN, enabled=True)
634+
635+
config_msg = connected_connector.get(MessageEndpoints.notification_config())
636+
assert config_msg == messages.NotificationConfigMessage(
637+
routes={
638+
"new_scan": [
639+
messages.NotificationServiceTarget(service_name="scilog", scope="default")
640+
]
641+
}
642+
)
643+
assert scilog_service._auto_notifications == {"new_scan": ["default"]} # pylint: disable=protected-access
644+
645+
646+
def test_messaging_service_tracks_external_notification_config_updates(
647+
scilog_service, connected_connector
648+
):
649+
connected_connector.set_and_publish(
650+
MessageEndpoints.notification_config(),
651+
messages.NotificationConfigMessage(
652+
routes={
653+
"new_scan": [
654+
messages.NotificationServiceTarget(service_name="signal", scope="beamline-ops"),
655+
messages.NotificationServiceTarget(service_name="scilog", scope="default"),
656+
],
657+
"alarm": [
658+
messages.NotificationServiceTarget(
659+
service_name="scilog", scope=["default", "secondary"]
660+
)
661+
],
662+
}
663+
),
664+
)
665+
666+
deadline = time.time() + 1
667+
while (
668+
time.time() < deadline
669+
and scilog_service._auto_notifications # pylint: disable=protected-access
670+
!= {"new_scan": ["default"], "alarm": ["default", "secondary"]}
671+
):
672+
time.sleep(0.01)
673+
674+
assert scilog_service._auto_notifications == { # pylint: disable=protected-access
675+
"new_scan": ["default"],
676+
"alarm": ["default", "secondary"],
677+
}
678+
679+
680+
def test_messaging_service_loads_notification_config_on_init(connected_connector):
681+
connected_connector.set_and_publish(
682+
MessageEndpoints.notification_config(),
683+
messages.NotificationConfigMessage(
684+
routes={
685+
"new_scan": [
686+
messages.NotificationServiceTarget(service_name="scilog", scope="default")
687+
]
688+
}
689+
),
690+
)
691+
692+
service = SciLogMessagingService(connected_connector)
693+
available_services = messages.AvailableMessagingServicesMessage(
694+
config=messages.MessagingConfig(
695+
signal=messages.MessagingServiceScopeConfig(enabled=False),
696+
teams=messages.MessagingServiceScopeConfig(enabled=False),
697+
scilog=messages.MessagingServiceScopeConfig(enabled=True),
698+
),
699+
deployment_services=[
700+
messages.SciLogServiceInfo(
701+
id="test_scilog", scope="default", enabled=True, logbook_id="test_logbook"
702+
)
703+
],
704+
session_services=[],
705+
)
706+
service._on_new_scope_change_msg(message={"data": available_services})
707+
708+
assert service._auto_notifications == {"new_scan": ["default"]} # pylint: disable=protected-access

0 commit comments

Comments
 (0)