Skip to content

Commit 674048d

Browse files
committed
fix(client_utils): stop output reader thread on shutdown
1 parent 593fd3c commit 674048d

3 files changed

Lines changed: 42 additions & 7 deletions

File tree

bec_widgets/cli/client_utils.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
IGNORE_WIDGETS = ["LaunchWindow"]
3737
PROCESS_TERMINATION_TIMEOUT = 10
3838
PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT = 2
39+
PROCESS_OUTPUT_SELECT_TIMEOUT = 0.2
3940
GRACEFUL_SERVER_SHUTDOWN_TIMEOUT = 5
41+
OUTPUT_READER_STOP_EVENT_ATTR = "_bec_output_reader_stop_event"
4042

4143
RegistryState: TypeAlias = dict[
4244
Literal["gui_id", "name", "widget_class", "config", "__rpc__", "container_proxy"],
@@ -57,14 +59,16 @@ def _filter_output(output: str) -> str:
5759
return output
5860

5961

60-
def _get_output(process, logger) -> None:
62+
def _get_output(process, logger, stop_event: threading.Event | None = None) -> None:
6163
log_func = {process.stdout: logger.debug, process.stderr: logger.info}
6264
stream_buffer = {process.stdout: [], process.stderr: []}
6365
try:
6466
os.set_blocking(process.stdout.fileno(), False)
6567
os.set_blocking(process.stderr.fileno(), False)
66-
while process.poll() is None:
67-
readylist, _, _ = select.select([process.stdout, process.stderr], [], [], 1)
68+
while process.poll() is None and not (stop_event and stop_event.is_set()):
69+
readylist, _, _ = select.select(
70+
[process.stdout, process.stderr], [], [], PROCESS_OUTPUT_SELECT_TIMEOUT
71+
)
6872
for stream in (process.stdout, process.stderr):
6973
buf = stream_buffer[stream]
7074
if stream in readylist:
@@ -180,14 +184,16 @@ def _join_process_output_thread(process, thread: threading.Thread | None, logger
180184
if not thread.is_alive():
181185
return
182186

187+
if stop_event := getattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, None):
188+
stop_event.set()
189+
183190
for stream in (process.stdout, process.stderr):
184191
if stream is None:
185192
continue
186193
try:
187194
stream.close()
188195
except OSError as e:
189196
logger.error(f"Failed to close stream {str(e)}")
190-
pass
191197
thread.join(timeout=PROCESS_OUTPUT_THREAD_JOIN_TIMEOUT)
192198
if thread.is_alive():
193199
logger.warning(
@@ -247,8 +253,14 @@ def _start_plot_process(
247253
if logger is None:
248254
process_output_processing_thread = None
249255
else:
256+
process_output_stop_event = threading.Event()
250257
process_output_processing_thread = threading.Thread(
251-
target=_get_output, args=(process, logger)
258+
target=_get_output, args=(process, logger, process_output_stop_event)
259+
)
260+
setattr(
261+
process_output_processing_thread,
262+
OUTPUT_READER_STOP_EVENT_ATTR,
263+
process_output_stop_event,
252264
)
253265
process_output_processing_thread.start()
254266
return process, process_output_processing_thread

tests/end-2-end/test_rpc_widgets_e2e.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def create_widget(
6969
return widget
7070

7171

72-
@pytest.mark.timeout(100)
72+
@pytest.mark.timeout(20)
7373
def test_available_widgets(qtbot, connected_client_gui_obj):
7474
"""This test checks that all widgets that are available via gui.available_widgets can be created and removed."""
7575
gui = connected_client_gui_obj

tests/unit_tests/test_client_utils.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
import pytest
77

88
from bec_widgets.cli.client import BECDockArea
9-
from bec_widgets.cli.client_utils import BECGuiClient, _start_plot_process
9+
from bec_widgets.cli.client_utils import (
10+
OUTPUT_READER_STOP_EVENT_ATTR,
11+
BECGuiClient,
12+
_join_process_output_thread,
13+
_start_plot_process,
14+
)
1015
from bec_widgets.cli.rpc.rpc_base import RPCBase, RPCResponseTimeoutError, rpc_timeout
1116

1217

@@ -346,3 +351,21 @@ def test_client_utils_kill_server_kills_process_group_after_timeout():
346351
text=True,
347352
timeout=2,
348353
)
354+
355+
356+
def test_join_process_output_thread_signals_reader_before_closing_streams():
357+
process = mock.MagicMock(pid=123, args=["bec-gui-server"])
358+
process.stdout = mock.MagicMock()
359+
process.stderr = mock.MagicMock()
360+
thread = mock.MagicMock()
361+
stop_event = mock.MagicMock()
362+
setattr(thread, OUTPUT_READER_STOP_EVENT_ATTR, stop_event)
363+
thread.is_alive.side_effect = [True, False]
364+
logger = mock.MagicMock()
365+
366+
_join_process_output_thread(process, thread, logger)
367+
368+
assert thread.join.call_args_list == [mock.call(timeout=2), mock.call(timeout=2)]
369+
stop_event.set.assert_called_once_with()
370+
process.stdout.close.assert_called_once_with()
371+
process.stderr.close.assert_called_once_with()

0 commit comments

Comments
 (0)