66from typing import Literal
77
88import numpy as np
9- from bec_lib import messages
109from bec_lib .endpoints import MessageEndpoints
1110from qtpy .QtCore import QObject , QTimer , Signal
1211
@@ -140,7 +139,6 @@ def __init__(self, bec_dispatcher, parent: QObject | None = None):
140139 self .bec_dispatcher = bec_dispatcher
141140 self ._progress_source : ProgressSource | None = None
142141 self ._progress_device : str | None = None
143- self ._queue_connected = False
144142 self .task : ProgressTask | None = None
145143 self .scan_number : int | None = None
146144 self ._active_scan_id : str | None = None
@@ -167,30 +165,9 @@ def start(
167165 * ,
168166 source : ProgressSource | None = ProgressSource .SCAN_PROGRESS ,
169167 device : str | None = None ,
170- connect_queue : bool = False ,
171- refresh_queue : bool = False ,
172168 ) -> None :
173169 if source is not None :
174170 self .set_progress_source (source , device = device )
175- if connect_queue :
176- self .connect_to_queue ()
177- if refresh_queue :
178- self .refresh_queue ()
179-
180- def connect_to_queue (self ) -> None :
181- if self ._queue_connected :
182- return
183- self .bec_dispatcher .connect_slot (self .on_queue_update , MessageEndpoints .scan_queue_status ())
184- self ._queue_connected = True
185-
186- def refresh_queue (self ) -> None :
187- connector = getattr (self .bec_dispatcher .client , "connector" , None )
188- if connector is None :
189- return
190- msg = connector .get (MessageEndpoints .scan_queue_status ())
191- if msg is None :
192- return
193- self .on_queue_update (msg .content , msg .metadata )
194171
195172 def set_progress_source (self , source : ProgressSource , device : str | None = None ) -> None :
196173 if source == ProgressSource .DEVICE_PROGRESS and not device :
@@ -264,42 +241,6 @@ def clear_task(self, *, emit_finished: bool = True) -> None:
264241 if emit_finished :
265242 self .progress_finished .emit (self .current_snapshot (value = 0 , max_value = 100 , done = True ))
266243
267- def on_queue_update (self , msg_content : dict , metadata : dict ):
268- queue_info = self ._extract_active_queue_info (msg_content )
269- if queue_info is None :
270- self .clear_task ()
271- self .set_progress_source (ProgressSource .SCAN_PROGRESS )
272- return
273-
274- active_request_block = queue_info .active_request_block
275- scan_id = active_request_block .scan_id or str (active_request_block .scan_number )
276- rid = getattr (active_request_block , "RID" , None ) or metadata .get ("RID" )
277- if self .task is None or self ._active_scan_id != scan_id :
278- self ._start_task (scan_id , rid = rid )
279- else :
280- self ._active_rid = rid or self ._active_rid
281-
282- self .scan_number = active_request_block .scan_number
283- self .source_changed .emit (self .current_snapshot (value = 0 , max_value = 100 , done = False ))
284-
285- def _extract_active_queue_info (self , msg_content : dict ):
286- if "queue" not in msg_content :
287- return None
288- if "primary" not in msg_content ["queue" ]:
289- return None
290- primary_queue = msg_content .get ("queue" ).get ("primary" )
291- if primary_queue is None or not isinstance (primary_queue , messages .ScanQueueStatus ):
292- return None
293- primary_queue_info = primary_queue .info
294- if len (primary_queue_info ) == 0 :
295- return None
296- scan_info = primary_queue_info [0 ]
297- if scan_info is None or scan_info .active_request_block is None :
298- return None
299- if scan_info .status .lower () != "running" :
300- return None
301- return scan_info
302-
303244 def on_progress_update (self , msg_content : dict , metadata : dict ):
304245 if self ._progress_source is None :
305246 return
@@ -323,6 +264,9 @@ def process_progress_message(
323264 )
324265 scan_id = metadata .get ("scan_id" ) or metadata .get ("RID" )
325266 rid = metadata .get ("RID" )
267+ scan_number = metadata .get ("scan_number" )
268+ if scan_number is not None :
269+ self .scan_number = scan_number
326270 is_new_scan = False
327271 previous_scan_id = self ._active_scan_id
328272 previous_rid = self ._active_rid
@@ -367,8 +311,3 @@ def process_progress_message(
367311 def cleanup (self ) -> None :
368312 self .clear_task (emit_finished = False )
369313 self ._disconnect_progress_source ()
370- if self ._queue_connected :
371- self .bec_dispatcher .disconnect_slot (
372- self .on_queue_update , MessageEndpoints .scan_queue_status ()
373- )
374- self ._queue_connected = False
0 commit comments