11from threading import Event , Thread
2+ from typing import TypeVar
23
34from bec_lib .client import BECClient , ServiceConfig
45from bec_lib .connector import MessageObject
78from bec_lib .messages import (
89 BuiltinActorStateChangeNotification ,
910 BuiltinActorStateUpdatedNotification ,
11+ ScanInterlockModifyStateTableMessage ,
12+ ScanInterlockStateTableContent ,
1013)
1114from bec_server .actors .actor import ActorBase
1215from bec_server .actors .scan_interlock import ScanInterlockActor
1316
1417logger = bec_logger .logger
1518
19+ ActorType = TypeVar ("ActorType" , bound = ActorBase )
20+
21+
22+ class ActorDict (dict ):
23+ def __setitem__ (self , key : type [ActorType ], value : tuple [ActorType , Thread , Event ], / ) -> None :
24+ return super ().__setitem__ (key , value )
25+
26+ def __getitem__ (self , key : type [ActorType ], / ) -> tuple [ActorType , Thread , Event ]:
27+ return super ().__getitem__ (key )
28+
29+ def get ( # type: ignore
30+ self , key : type [ActorType ], default : tuple [ActorType , Thread , Event ] | None = None
31+ ) -> tuple [ActorType , Thread , Event ]:
32+ return super ().get (key ) # type: ignore
33+
1634
1735class BuiltinActorManager :
1836 """A simple manager for builtin actors which are always available - only handles enabling and
@@ -25,12 +43,15 @@ def __init__(self, bootstrap_server: str) -> None:
2543 name = "BuiltinActors" ,
2644 )
2745 self ._client .start ()
28- self ._actors_threads_and_stops : dict [ str , tuple [ ActorBase , Thread , Event ]] = {}
46+ self ._actors_threads_and_stops = ActorDict ()
2947 self ._builtin_actors = {cls .__name__ : cls for cls in (ScanInterlockActor ,)}
3048 self ._start_all ()
3149 self ._client .connector .register (
3250 MessageEndpoints .builtin_actor_update_req_notif (), cb = self ._on_state_changed
3351 )
52+ self ._client .connector .register (
53+ MessageEndpoints .modify_interlock_table (), cb = self ._modify_interlock_table
54+ )
3455
3556 def _ping_clients (self , actor_name : str ):
3657 self ._client .connector .send (
@@ -51,33 +72,68 @@ def _on_state_changed(self, msg_obj: MessageObject):
5172 self ._ping_clients (msg .actor_name )
5273
5374 def _start_all (self ):
54- for actor_class in self ._builtin_actors . values () :
55- if self ._client .builtin_actors .check_enabled (actor_class . __name__ ):
56- self ._start_actor (actor_class )
75+ for actor_class_name in self ._builtin_actors :
76+ if self ._client .builtin_actors .check_enabled (actor_class_name ):
77+ self ._start_actor (self . _builtin_actors [ actor_class_name ] )
5778
5879 def _start_actor (self , actor_class : type [ActorBase ]):
5980 name = actor_class .__name__
6081 logger .info (f"Starting { name } " )
61- if name in self ._actors_threads_and_stops :
82+ if actor_class in self ._actors_threads_and_stops :
6283 logger .warning (f"Actor { name } is already active!" )
6384 return
6485 actor = actor_class (self ._client , name = name , exec_id = name )
6586 t = Thread (target = actor .run )
66- self ._actors_threads_and_stops [name ] = (actor , t , actor .stop_event )
87+ self ._actors_threads_and_stops [actor_class ] = (actor , t , actor .stop_event )
6788 t .start ()
6889
6990 def _stop_actor (self , actor_name : str ):
7091 logger .info (f"Stopping { actor_name } " )
71- if (entry := self ._actors_threads_and_stops .get (actor_name )) is None :
92+ actor_class = self ._builtin_actors .get (actor_name )
93+ if (entry := self ._actors_threads_and_stops .get (actor_class )) is None :
7294 logger .warning (f"Actor { actor_name } is not active!" )
7395 return
7496 actor , t , event = entry
7597 event .set ()
7698 t .join ()
77- del self ._actors_threads_and_stops [actor_name ]
99+ del self ._actors_threads_and_stops [actor_class ]
78100 del actor
79101
80102 def shutdown (self ):
81103 for actor in self ._actors_threads_and_stops :
82104 self ._stop_actor (actor )
83105 self ._client .shutdown ()
106+
107+ # Actor specific management methods:
108+ def _modify_interlock_table (self , msg_dict ):
109+ """Update the watched states for ScanInterlockActor - handled by the actor itself if it is
110+ active, otherwise just the config in redis is updated."""
111+ msg : ScanInterlockModifyStateTableMessage = msg_dict ["data" ]
112+ if (ats := self ._actors_threads_and_stops .get (ScanInterlockActor )) is not None :
113+ actor , _ , _ = ats
114+ actor ._on_state_modification (msg )
115+ else :
116+ states : ScanInterlockStateTableContent | None = self ._client .connector .get (
117+ MessageEndpoints .scan_interlock_states ()
118+ )
119+ current_watched = states .states_watched if states is not None else {}
120+ if msg .action == "add" :
121+ logger .info (f"Adding { msg .state_name } to the scan interlock actor" )
122+ current_watched [msg .state_name ] = msg .status
123+ self ._client .connector .set (
124+ MessageEndpoints .scan_interlock_states (),
125+ ScanInterlockStateTableContent (states_watched = current_watched ),
126+ )
127+ elif msg .action == "remove_all" :
128+ self ._client .connector .set (
129+ MessageEndpoints .scan_interlock_states (),
130+ ScanInterlockStateTableContent (states_watched = {}),
131+ )
132+ else :
133+ logger .info (f"Removing { msg .state_name } from the scan interlock actor" )
134+ current_watched .pop (msg .state_name , None )
135+ self ._client .connector .set (
136+ MessageEndpoints .scan_interlock_states (),
137+ ScanInterlockStateTableContent (states_watched = current_watched ),
138+ )
139+ self ._ping_clients ("ScanInterlockActor" )
0 commit comments