Skip to content

Commit 09321b0

Browse files
committed
fix: implement request ID handling for scan abortion
1 parent afb1ad5 commit 09321b0

7 files changed

Lines changed: 300 additions & 37 deletions

File tree

bec_ipython_client/bec_ipython_client/callbacks/ipython_live_updates.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,14 @@ def process_request(self, request: messages.ScanQueueMessage, callbacks: Any) ->
209209
self._wait_for_cleanup()
210210
self._reset(forced=True)
211211
raise scan_interr
212+
except KeyboardInterrupt as exc:
213+
self._stop_status_live()
214+
if self.client._service_config.abort_on_ctrl_c and self._abort_pending_request():
215+
self._wait_for_cleanup()
216+
self._reset(forced=True)
217+
raise ScanInterruption("User abort.") from exc
218+
self._reset(forced=True)
219+
raise
212220
finally:
213221
self._stop_status_live()
214222

@@ -224,6 +232,24 @@ def _wait_for_cleanup(self):
224232
except KeyboardInterrupt:
225233
self.client.queue.request_scan_halt()
226234

235+
def _abort_pending_request(self) -> bool:
236+
"""Abort the pending queue item currently being tracked by this live update."""
237+
if self._current_queue is None or self.client.queue is None:
238+
return False
239+
if self._current_queue.status != "PENDING":
240+
return False
241+
scan_ids = [scan_id for scan_id in self._current_queue.scan_ids if scan_id is not None]
242+
if scan_ids:
243+
self.client.queue.request_scan_abortion(scan_id=scan_ids)
244+
return True
245+
if self._active_request is None:
246+
return False
247+
request_id = self._active_request.metadata.get("RID")
248+
if request_id is None:
249+
return False
250+
self.client.queue.request_scan_abortion(request_id=request_id)
251+
return True
252+
227253
def _element_in_queue(self) -> bool:
228254
if self.client.queue is None:
229255
return False
@@ -237,7 +263,7 @@ def _element_in_queue(self) -> bool:
237263
return False
238264
if self._current_queue is None:
239265
return False
240-
return self._current_queue.queue_id == queue_info[0].queue_id
266+
return any(queue_item.queue_id == self._current_queue.queue_id for queue_item in queue_info)
241267

242268
def _process_queue(
243269
self, queue: QueueItem, request: messages.ScanQueueMessage, req_id: str

bec_ipython_client/tests/client_tests/test_ipython_live_updates.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,25 @@ def test_process_request_repeats_on_ScanRestart_error(
173173
assert live_updates._stop_status_live.call_count == 5
174174

175175

176+
def test_abort_pending_request_requests_abortion_by_scan_ids(bec_client_mock):
177+
live_updates = IPythonLiveUpdates(bec_client_mock)
178+
live_updates._current_queue = mock.MagicMock(status="PENDING", scan_ids=["scan_id", None])
179+
180+
with mock.patch.object(live_updates.client.queue, "request_scan_abortion") as request_abort:
181+
assert live_updates._abort_pending_request() is True
182+
request_abort.assert_called_once_with(scan_id=["scan_id"])
183+
184+
185+
def test_abort_pending_request_falls_back_to_request_id(bec_client_mock, sample_request_msg):
186+
live_updates = IPythonLiveUpdates(bec_client_mock)
187+
live_updates._current_queue = mock.MagicMock(status="PENDING", scan_ids=[None])
188+
live_updates._active_request = sample_request_msg
189+
190+
with mock.patch.object(live_updates.client.queue, "request_scan_abortion") as request_abort:
191+
assert live_updates._abort_pending_request() is True
192+
request_abort.assert_called_once_with(request_id="something")
193+
194+
176195
@pytest.mark.timeout(20)
177196
def test_live_updates_process_queue_without_status(bec_client_mock, queue_elements):
178197
client = bec_client_mock
@@ -382,6 +401,39 @@ def test_element_in_queue_queue_id_in_info(bec_client_mock, sample_request_block
382401
assert live_updates._element_in_queue() is True
383402

384403

404+
@pytest.mark.timeout(20)
405+
def test_element_in_queue_queue_id_in_later_position(bec_client_mock, sample_request_block):
406+
client = bec_client_mock
407+
live_updates = IPythonLiveUpdates(client)
408+
409+
current_queue = mock.MagicMock()
410+
current_queue.queue_id = "my_queue_id"
411+
live_updates._current_queue = current_queue
412+
413+
first_entry = messages.QueueInfoEntry(
414+
queue_id="different_queue_id",
415+
scan_id=["scan_id_1"],
416+
is_scan=[True],
417+
request_blocks=[sample_request_block],
418+
scan_number=[1],
419+
status="RUNNING",
420+
active_request_block=None,
421+
)
422+
second_entry = messages.QueueInfoEntry(
423+
queue_id="my_queue_id",
424+
scan_id=["scan_id_2"],
425+
is_scan=[True],
426+
request_blocks=[sample_request_block],
427+
scan_number=[2],
428+
status="PENDING",
429+
active_request_block=None,
430+
)
431+
scan_queue_status = messages.ScanQueueStatus(info=[first_entry, second_entry], status="LOCKED")
432+
client.queue.queue_storage.current_scan_queue = {"primary": scan_queue_status}
433+
434+
assert live_updates._element_in_queue() is True
435+
436+
385437
@pytest.mark.timeout(20)
386438
def test_process_pending_queue_element_locked_queue(
387439
ipython_live_updates_with_mocked_live, queue_elements

bec_lib/bec_lib/messages.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ class ScanQueueModificationMessage(BECMessage):
270270
271271
Args:
272272
scan_id (str): Unique scan ID
273+
request_id (str | None): Request ID to target when no scan ID is available yet
273274
action (str): One of the actions defined in ACTIONS:
274275
"pause",
275276
"deferred_pause",
@@ -292,6 +293,7 @@ class ScanQueueModificationMessage(BECMessage):
292293

293294
msg_type: ClassVar[str] = "scan_queue_modification"
294295
scan_id: str | list[str] | None | list[None]
296+
request_id: str | None = None
295297
action: Literal[
296298
"pause",
297299
"deferred_pause",

bec_lib/bec_lib/scan_manager.py

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,75 +92,95 @@ def request_scan_interruption(self, deferred_pause=True, scan_id: str = None) ->
9292
messages.ScanQueueModificationMessage(scan_id=scan_id, action=action, parameter={}),
9393
)
9494

95-
def request_scan_abortion(self, scan_id=None):
95+
def request_scan_abortion(self, scan_id=None, request_id: str | None = None):
9696
"""request a scan abortion
9797
9898
Args:
9999
scan_id (str, optional): ScanID. Defaults to None.
100+
request_id (str, optional): Request ID. Used when no scan ID exists yet.
100101
101102
"""
102-
if scan_id is None:
103+
if scan_id is None and request_id is None:
103104
scan_id = self.scan_storage.current_scan_id
104105
logger.info("Requesting scan abortion")
105106
target_queue = self.get_default_scan_queue()
106107
self.connector.send(
107108
MessageEndpoints.scan_queue_modification_request(),
108109
messages.ScanQueueModificationMessage(
109-
scan_id=scan_id, action="abort", parameter={}, queue=target_queue
110+
scan_id=scan_id,
111+
request_id=request_id,
112+
action="abort",
113+
parameter={},
114+
queue=target_queue,
110115
),
111116
)
112117

113-
def request_scan_halt(self, scan_id=None):
118+
def request_scan_halt(self, scan_id=None, request_id: str | None = None):
114119
"""request a scan halt
115120
116121
Args:
117122
scan_id (str, optional): ScanID. Defaults to None.
123+
request_id (str, optional): Request ID. Used when no scan ID exists yet.
118124
119125
"""
120-
if scan_id is None:
126+
if scan_id is None and request_id is None:
121127
scan_id = self.scan_storage.current_scan_id
122128
target_queue = self.get_default_scan_queue()
123129
logger.info("Requesting scan halt")
124130
self.connector.send(
125131
MessageEndpoints.scan_queue_modification_request(),
126132
messages.ScanQueueModificationMessage(
127-
scan_id=scan_id, action="halt", parameter={}, queue=target_queue
133+
scan_id=scan_id,
134+
request_id=request_id,
135+
action="halt",
136+
parameter={},
137+
queue=target_queue,
128138
),
129139
)
130140

131-
def request_set_completed(self, scan_id=None):
141+
def request_set_completed(self, scan_id=None, request_id: str | None = None):
132142
"""request to set a scan as completed
133143
134144
Args:
135145
scan_id (str, optional): ScanID. Defaults to None.
146+
request_id (str, optional): Request ID. Used when no scan ID exists yet.
136147
137148
"""
138-
if scan_id is None:
149+
if scan_id is None and request_id is None:
139150
scan_id = self.scan_storage.current_scan_id
140151
logger.info("Requesting to set scan as completed")
141152
target_queue = self.get_default_scan_queue()
142153
self.connector.send(
143154
MessageEndpoints.scan_queue_modification_request(),
144155
messages.ScanQueueModificationMessage(
145-
scan_id=scan_id, action="user_completed", parameter={}, queue=target_queue
156+
scan_id=scan_id,
157+
request_id=request_id,
158+
action="user_completed",
159+
parameter={},
160+
queue=target_queue,
146161
),
147162
)
148163

149-
def request_scan_continuation(self, scan_id=None):
164+
def request_scan_continuation(self, scan_id=None, request_id: str | None = None):
150165
"""request a scan continuation
151166
152167
Args:
153168
scan_id (str, optional): ScanID. Defaults to None.
169+
request_id (str, optional): Request ID. Used when no scan ID exists yet.
154170
155171
"""
156-
if scan_id is None:
172+
if scan_id is None and request_id is None:
157173
scan_id = self.scan_storage.current_scan_id
158174
logger.info("Requesting scan continuation")
159175
target_queue = self.get_default_scan_queue()
160176
self.connector.send(
161177
MessageEndpoints.scan_queue_modification_request(),
162178
messages.ScanQueueModificationMessage(
163-
scan_id=scan_id, action="continue", parameter={}, queue=target_queue
179+
scan_id=scan_id,
180+
request_id=request_id,
181+
action="continue",
182+
parameter={},
183+
queue=target_queue,
164184
),
165185
)
166186

bec_lib/tests/test_scan_manager.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ def test_scan_manager_request_scan_abortion(scan_manager):
8383
scan_manager.request_scan_abortion("scan_id")
8484
scan_manager.connector.send.assert_called_once_with(
8585
MessageEndpoints.scan_queue_modification_request(),
86-
messages.ScanQueueModificationMessage(scan_id="scan_id", action="abort", parameter={}),
86+
messages.ScanQueueModificationMessage(
87+
scan_id="scan_id", request_id=None, action="abort", parameter={}
88+
),
8789
)
8890

8991

@@ -101,15 +103,29 @@ def current_scan_id(self):
101103
scan_manager.request_scan_abortion()
102104
scan_manager.connector.send.assert_called_once_with(
103105
MessageEndpoints.scan_queue_modification_request(),
104-
messages.ScanQueueModificationMessage(scan_id=scan_id, action="abort", parameter={}),
106+
messages.ScanQueueModificationMessage(
107+
scan_id=scan_id, request_id=None, action="abort", parameter={}
108+
),
109+
)
110+
111+
112+
def test_scan_manager_request_scan_abortion_request_id(scan_manager):
113+
scan_manager.request_scan_abortion(request_id="request-id")
114+
scan_manager.connector.send.assert_called_once_with(
115+
MessageEndpoints.scan_queue_modification_request(),
116+
messages.ScanQueueModificationMessage(
117+
scan_id=None, request_id="request-id", action="abort", parameter={}
118+
),
105119
)
106120

107121

108122
def test_scan_manager_request_scan_halt(scan_manager):
109123
scan_manager.request_scan_halt("scan_id")
110124
scan_manager.connector.send.assert_called_once_with(
111125
MessageEndpoints.scan_queue_modification_request(),
112-
messages.ScanQueueModificationMessage(scan_id="scan_id", action="halt", parameter={}),
126+
messages.ScanQueueModificationMessage(
127+
scan_id="scan_id", request_id=None, action="halt", parameter={}
128+
),
113129
)
114130

115131

@@ -127,15 +143,19 @@ def current_scan_id(self):
127143
scan_manager.request_scan_halt()
128144
scan_manager.connector.send.assert_called_once_with(
129145
MessageEndpoints.scan_queue_modification_request(),
130-
messages.ScanQueueModificationMessage(scan_id=scan_id, action="halt", parameter={}),
146+
messages.ScanQueueModificationMessage(
147+
scan_id=scan_id, request_id=None, action="halt", parameter={}
148+
),
131149
)
132150

133151

134152
def test_scan_manager_request_scan_continuation(scan_manager):
135153
scan_manager.request_scan_continuation("scan_id")
136154
scan_manager.connector.send.assert_called_once_with(
137155
MessageEndpoints.scan_queue_modification_request(),
138-
messages.ScanQueueModificationMessage(scan_id="scan_id", action="continue", parameter={}),
156+
messages.ScanQueueModificationMessage(
157+
scan_id="scan_id", request_id=None, action="continue", parameter={}
158+
),
139159
)
140160

141161

@@ -153,7 +173,9 @@ def current_scan_id(self):
153173
scan_manager.request_scan_continuation()
154174
scan_manager.connector.send.assert_called_once_with(
155175
MessageEndpoints.scan_queue_modification_request(),
156-
messages.ScanQueueModificationMessage(scan_id=scan_id, action="continue", parameter={}),
176+
messages.ScanQueueModificationMessage(
177+
scan_id=scan_id, request_id=None, action="continue", parameter={}
178+
),
157179
)
158180

159181

@@ -258,7 +280,7 @@ def test_scan_manager_request_set_completed(scan_manager):
258280
scan_manager.connector.send.assert_called_once_with(
259281
MessageEndpoints.scan_queue_modification_request(),
260282
messages.ScanQueueModificationMessage(
261-
scan_id="scan_id", action="user_completed", parameter={}
283+
scan_id="scan_id", request_id=None, action="user_completed", parameter={}
262284
),
263285
)
264286

@@ -277,7 +299,7 @@ def current_scan_id(self):
277299
scan_manager.connector.send.assert_called_once_with(
278300
MessageEndpoints.scan_queue_modification_request(),
279301
messages.ScanQueueModificationMessage(
280-
scan_id="current_scan_id", action="user_completed", parameter={}
302+
scan_id="current_scan_id", request_id=None, action="user_completed", parameter={}
281303
),
282304
)
283305

@@ -291,6 +313,7 @@ def test_scan_manager_add_queue_lock(scan_manager):
291313
MessageEndpoints.scan_queue_modification_request(),
292314
messages.ScanQueueModificationMessage(
293315
scan_id=None,
316+
request_id=None,
294317
action="lock",
295318
parameter={"reason": "Testing lock", "identifier": "test_lock"},
296319
queue="primary",
@@ -307,6 +330,7 @@ def test_scan_manager_remove_queue_lock(scan_manager):
307330
MessageEndpoints.scan_queue_modification_request(),
308331
messages.ScanQueueModificationMessage(
309332
scan_id=None,
333+
request_id=None,
310334
action="release_lock",
311335
parameter={"identifier": "test_lock"},
312336
queue="primary",

0 commit comments

Comments
 (0)