Skip to content

Commit cec17c4

Browse files
authored
Avoid executor hangs on process exit (#205)
Signed-off-by: Michel Hidalgo <mhidalgo@theaiinstitute.com>
1 parent 72cd86f commit cec17c4

2 files changed

Lines changed: 37 additions & 8 deletions

File tree

synchros2/synchros2/executors.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
import logging
1010
import os
1111
import queue
12+
import sys
13+
import textwrap
1214
import threading
15+
import traceback
1316
import typing
1417
import warnings
1518
import weakref
@@ -672,6 +675,18 @@ def thread_pools(self) -> typing.List[AutoScalingThreadPool]:
672675
thread_pools.extend(self._static_thread_pools)
673676
return thread_pools
674677

678+
def _get_thread_stacktraces(self) -> typing.Dict[int, typing.Optional[traceback.StackSummary]]:
679+
"""Get current stacktraces for all threads in the executor."""
680+
frames = sys._current_frames()
681+
thread_stacktraces: typing.Dict[int, typing.Optional[traceback.StackSummary]] = {}
682+
for thread_pool in self.thread_pools:
683+
for thread in thread_pool.workers:
684+
if thread.ident in frames:
685+
stacktrace = traceback.extract_stack(frames[thread.ident])
686+
if stacktrace is not None:
687+
thread_stacktraces[thread.ident] = stacktrace
688+
return thread_stacktraces
689+
675690
def add_static_thread_pool(self, num_threads: typing.Optional[int] = None) -> AutoScalingThreadPool:
676691
"""Add a thread pool that keeps a steady number of workers."""
677692
with self._shutdown_lock:
@@ -841,14 +856,17 @@ def shutdown(self, timeout_sec: typing.Optional[float] = None) -> bool:
841856

842857

843858
@contextlib.contextmanager
844-
def background(executor: rclpy.executors.Executor) -> typing.Iterator[rclpy.executors.Executor]:
859+
def background(executor: rclpy.executors.Executor, daemon: bool = False) -> typing.Iterator[rclpy.executors.Executor]:
845860
"""Pushes an executor to a background thread.
846861
847862
Upon context entry, the executor starts spinning in a background thread.
848863
Upon context exit, the executor is shutdown and the background thread is joined.
849864
850865
Args:
851866
executor: executor to be managed.
867+
daemon: whether to the daemonize the executor. When daemonized, an executor
868+
that refuses to shut down will be left to be cleaned up by the interpreter
869+
(or the caller).
852870
853871
Returns:
854872
a context manager.
@@ -868,7 +886,7 @@ def spinloop() -> None:
868886
continue
869887
break
870888

871-
background_thread = threading.Thread(target=spinloop)
889+
background_thread = threading.Thread(target=spinloop, daemon=daemon)
872890
executor.spin = bind_to_thread(executor.spin, background_thread)
873891
executor.spin_once = bind_to_thread(executor.spin_once, background_thread)
874892
executor.spin_until_future_complete = bind_to_thread(executor.spin_until_future_complete, background_thread)
@@ -882,19 +900,27 @@ def spinloop() -> None:
882900
finally:
883901
if not executor.shutdown(timeout_sec=5.0):
884902
message = "Background executor is taking too long to shutdown"
903+
for thread_id, stacktrace in executor._get_thread_stacktraces().items():
904+
lines = textwrap.indent("".join(traceback.format_list(stacktrace)), " ")
905+
message += f"\nThread #{thread_id}:\n{lines}"
885906
warnings.warn(message, RuntimeWarning, stacklevel=1)
886-
executor.shutdown()
887-
background_thread.join()
907+
if not daemon:
908+
executor.shutdown()
909+
if not daemon:
910+
background_thread.join()
888911

889912

890913
@contextlib.contextmanager
891-
def foreground(executor: rclpy.executors.Executor) -> typing.Iterator[rclpy.executors.Executor]:
914+
def foreground(executor: rclpy.executors.Executor, daemon: bool = False) -> typing.Iterator[rclpy.executors.Executor]:
892915
"""Manages an executor in the current thread.
893916
894917
Upon context exit, the executor is shutdown.
895918
896919
Args:
897920
executor: executor to be managed.
921+
daemon: whether to the daemonize the executor. When daemonized, an executor
922+
that refuses to shut down will be left to be cleaned up by the interpreter
923+
(or the caller).
898924
899925
Returns:
900926
a context manager.
@@ -904,8 +930,11 @@ def foreground(executor: rclpy.executors.Executor) -> typing.Iterator[rclpy.exec
904930
finally:
905931
if not executor.shutdown(timeout_sec=5.0):
906932
message = "Executor is taking too long to shutdown"
933+
for thread_id, stacktrace in executor._get_thread_stacktraces().items():
934+
lines = textwrap.indent("".join(traceback.format_list(stacktrace)), " ")
935+
message += f"\nThread {thread_id}:\n{lines}"
907936
warnings.warn(message, RuntimeWarning, stacklevel=1)
908-
executor.shutdown()
937+
executor.shutdown()
909938

910939

911940
def assign_coroutine(

synchros2/synchros2/scope.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ def __enter__(self) -> "ROSAwareScope":
126126
logger = rclpy.logging.get_logger(self._namespace or fqn(self.__class__))
127127
executor = AutoScalingMultiThreadedExecutor(logger=logger, context=self._context)
128128
if self._autospin:
129-
self._executor = self._stack.enter_context(background(executor))
129+
self._executor = self._stack.enter_context(background(executor, daemon=self._global))
130130
else:
131-
self._executor = self._stack.enter_context(foreground(executor))
131+
self._executor = self._stack.enter_context(foreground(executor, daemon=self._global))
132132

133133
if self._prebaked:
134134
node = Node(self._name, namespace=self._namespace, context=self._context)

0 commit comments

Comments
 (0)