From 34457487b4904449672e4065ef932e7a7c2fc735 Mon Sep 17 00:00:00 2001 From: Tobias Date: Tue, 12 May 2026 18:48:23 +0200 Subject: [PATCH 01/10] thread safe MultiprocessingLoggingHandler --- .../_utils/multiprocessing_logging_handler.py | 78 ++++++++++++------- 1 file changed, 51 insertions(+), 27 deletions(-) diff --git a/cluster_tools/cluster_tools/_utils/multiprocessing_logging_handler.py b/cluster_tools/cluster_tools/_utils/multiprocessing_logging_handler.py index a40ee29ec..b5978d07e 100644 --- a/cluster_tools/cluster_tools/_utils/multiprocessing_logging_handler.py +++ b/cluster_tools/cluster_tools/_utils/multiprocessing_logging_handler.py @@ -15,6 +15,10 @@ # Inspired by https://stackoverflow.com/a/894284 +# Since the root logger object is global, accessing it from multiple threads can be problematic +# To be thread safe, we use a lock. +ROOT_LOGGER_LOCK = threading.Lock() + class _MultiprocessingLoggingHandler(logging.Handler): """This class wraps a logging handler and instantiates a multiprocessing queue. @@ -87,14 +91,18 @@ def decrement_usage(self) -> None: self._usage_counter -= 1 if self._usage_counter == 0: # unwrap inner handler: - root_logger = getLogger() - root_logger.removeHandler(self) - root_logger.addHandler(self.wrapped_handler) + with ROOT_LOGGER_LOCK: + root_logger = getLogger() + root_logger.removeHandler(self) + root_logger.addHandler(self.wrapped_handler) self._is_closed = True self._queue_thread.join(30) self._manager.shutdown() - self.wrapped_handler.close() + # Thread-owned handlers (e.g. task-specific file handlers) are closed by their + # owner (attach_logging_handler), not by the pool. 
+ if not getattr(self.wrapped_handler, "_owner_thread_id", None): + self.wrapped_handler.close() super().close() def close(self) -> None: @@ -115,35 +123,51 @@ def _setup_logging_multiprocessing( """ warnings.filters = filters - root_logger = getLogger() - for handler in root_logger.handlers: - root_logger.removeHandler(handler) + with ROOT_LOGGER_LOCK: + root_logger = getLogger() + for handler in root_logger.handlers: + root_logger.removeHandler(handler) - root_logger.setLevel(min(levels) if len(levels) else logging.DEBUG) - for queue, level in zip(queues, levels): - handler = QueueHandler(queue) - handler.setLevel(level) - root_logger.addHandler(handler) + root_logger.setLevel(min(levels) if len(levels) else logging.DEBUG) + for queue, level in zip(queues, levels): + handler = QueueHandler(queue) + handler.setLevel(level) + root_logger.addHandler(handler) class _MultiprocessingLoggingHandlerPool: def __init__(self) -> None: - root_logger = getLogger() - - self.handlers = [] - for i, handler in enumerate(list(root_logger.handlers)): - # Wrap logging handlers in _MultiprocessingLoggingHandlers to make them work in a multiprocessing setup - # when using start_methods other than fork, for example, spawn or forkserver - if not isinstance(handler, _MultiprocessingLoggingHandler): - mp_handler = _MultiprocessingLoggingHandler( - f"multi-processing-handler-{i}", handler + with ROOT_LOGGER_LOCK: + root_logger = getLogger() + current_thread_id = threading.get_ident() + + self.handlers = [] + for i, handler in enumerate(list(root_logger.handlers)): + # Resolve the underlying handler to check for thread ownership. + # An already-wrapped handler exposes its original via .wrapped_handler. 
+ underlying = ( + handler.wrapped_handler + if isinstance(handler, _MultiprocessingLoggingHandler) + else handler ) - root_logger.removeHandler(handler) - root_logger.addHandler(mp_handler) - self.handlers.append(mp_handler) - else: - handler.increment_usage() - self.handlers.append(handler) + owner_thread = getattr(underlying, "_owner_thread_id", None) + # Skip handlers owned by a different thread: wrapping them would transfer + # lifecycle ownership, causing premature closure of another task's file handler. + if owner_thread is not None and owner_thread != current_thread_id: + continue + + # Wrap logging handlers in _MultiprocessingLoggingHandlers to make them work in a multiprocessing setup + # when using start_methods other than fork, for example, spawn or forkserver + if not isinstance(handler, _MultiprocessingLoggingHandler): + mp_handler = _MultiprocessingLoggingHandler( + f"multi-processing-handler-{i}", handler + ) + root_logger.removeHandler(handler) + root_logger.addHandler(mp_handler) + self.handlers.append(mp_handler) + else: + handler.increment_usage() + self.handlers.append(handler) def get_multiprocessing_logging_setup_fn(self) -> Callable[[], None]: # Return a logging setup function that when called will setup QueueHandler loggers From d41223c0a8e7473e8afc396da9f79a46bf3efd67 Mon Sep 17 00:00:00 2001 From: Tobias Date: Tue, 12 May 2026 20:22:28 +0200 Subject: [PATCH 02/10] working thread signal handling --- .../schedulers/cluster_executor.py | 47 ++++++++++++++++--- 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index 75c498656..fe57ba55e 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -1,3 +1,4 @@ +import atexit import logging import os import signal @@ -69,6 +70,7 @@ class ClusterExecutor(futures.Executor): 
_shutdown_hooks: list[Callable[[], None]] = [] _installed_signal_handler: bool = False + _installed_atexit_handler: bool = False def __init__( self, @@ -142,9 +144,24 @@ def executor_key(cls) -> str: @classmethod def _ensure_signal_handlers_are_installed(cls) -> None: - # Only overwrite the signal handler once + if not cls._installed_atexit_handler: + atexit.register(cls._run_shutdown_hooks) + cls._installed_atexit_handler = True + + # signal.signal() only works from the main thread. If we're on a worker + # thread, skip but don't mark as installed — a later main-thread + # instantiation will still be able to install the handlers. if cls._installed_signal_handler: return + if threading.current_thread() is not threading.main_thread(): + logging.warning( + f"[{cls.__name__}] Cannot install signal handlers because the executor " + "was instantiated from a non-main thread. Cleanup on SIGTERM will not " + "work; SIGINT and normal exits are covered via atexit. " + f"Call {cls.__name__}.install_signal_handlers() from the main thread " + "at startup to enable full signal handling." + ) + return # Clean up if a SIGINT or SIGTERM signal is received. However, do not # interfere with the existing signal handler of the process and execute @@ -162,6 +179,20 @@ def _ensure_signal_handlers_are_installed(cls) -> None: cls._installed_signal_handler = True + @classmethod + def install_signal_handlers(cls) -> None: + """Install SIGINT/SIGTERM handlers for cluster job cleanup. + + Must be called from the main thread. Call this once at program startup + before using ClusterExecutor from worker threads to ensure that + SIGINT and SIGTERM trigger proper job cancellation and cleanup. + """ + if threading.current_thread() is not threading.main_thread(): + raise RuntimeError( + f"{cls.__name__}.install_signal_handlers() must be called from the main thread." 
+ ) + cls._ensure_signal_handlers_are_installed() + @classmethod def _register_shutdown_hook(cls, hook: Callable[[], None]) -> None: cls._shutdown_hooks.append(hook) @@ -176,6 +207,14 @@ def _deregister_shutdown_hook(cls, hook: Callable[[], None]) -> None: "Cannot deregister executors shutdown hook since it's not registered." ) + @classmethod + def _run_shutdown_hooks(cls) -> None: + for hook in cls._shutdown_hooks: + try: + hook() + except Exception as e: + print(f"Error during shutdown: {e}") + @classmethod def _handle_shutdown( cls, @@ -186,11 +225,7 @@ def _handle_shutdown( logging.critical( f"[{cls.__name__}] Caught signal {signal.Signals(signum).name}, running shutdown hooks" ) - try: - for hook in cls._shutdown_hooks: - hook() - except Exception as e: - print(f"Error during shutdown: {e}") + cls._run_shutdown_hooks() if ( callable(existing_signal_handler) From 81eb2faf62e3d5e54b7f87bdd3b793c6cb88c9cb Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 5 Jun 2026 13:51:05 +0200 Subject: [PATCH 03/10] improved locking in multiprocessing logging handler --- .../_utils/multiprocessing_logging_handler.py | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/cluster_tools/cluster_tools/_utils/multiprocessing_logging_handler.py b/cluster_tools/cluster_tools/_utils/multiprocessing_logging_handler.py index b5978d07e..ba0693e9b 100644 --- a/cluster_tools/cluster_tools/_utils/multiprocessing_logging_handler.py +++ b/cluster_tools/cluster_tools/_utils/multiprocessing_logging_handler.py @@ -17,7 +17,7 @@ # Since the root logger object is global, accessing it from multiple threads can be problematic # To be thread safe, we use a lock. 
-ROOT_LOGGER_LOCK = threading.Lock() +LOGGER_LOCK = threading.Lock() class _MultiprocessingLoggingHandler(logging.Handler): @@ -88,21 +88,22 @@ def increment_usage(self) -> None: self._usage_counter += 1 def decrement_usage(self) -> None: - self._usage_counter -= 1 - if self._usage_counter == 0: - # unwrap inner handler: - with ROOT_LOGGER_LOCK: + with LOGGER_LOCK: + self._usage_counter -= 1 + if self._usage_counter == 0: + # unwrap inner handler: root_logger = getLogger() root_logger.removeHandler(self) root_logger.addHandler(self.wrapped_handler) + # Thread-owned handlers (e.g. task-specific file handlers) are closed by their + # owner (attach_logging_handler), not by the pool. + if not getattr(self.wrapped_handler, "_owner_thread_id", None): + self.wrapped_handler.close() + if self._usage_counter == 0: self._is_closed = True self._queue_thread.join(30) self._manager.shutdown() - # Thread-owned handlers (e.g. task-specific file handlers) are closed by their - # owner (attach_logging_handler), not by the pool. - if not getattr(self.wrapped_handler, "_owner_thread_id", None): - self.wrapped_handler.close() super().close() def close(self) -> None: @@ -123,7 +124,7 @@ def _setup_logging_multiprocessing( """ warnings.filters = filters - with ROOT_LOGGER_LOCK: + with LOGGER_LOCK: root_logger = getLogger() for handler in root_logger.handlers: root_logger.removeHandler(handler) @@ -137,7 +138,7 @@ def _setup_logging_multiprocessing( class _MultiprocessingLoggingHandlerPool: def __init__(self) -> None: - with ROOT_LOGGER_LOCK: + with LOGGER_LOCK: root_logger = getLogger() current_thread_id = threading.get_ident() @@ -151,8 +152,8 @@ def __init__(self) -> None: else handler ) owner_thread = getattr(underlying, "_owner_thread_id", None) - # Skip handlers owned by a different thread: wrapping them would transfer - # lifecycle ownership, causing premature closure of another task's file handler. 
+ # Skip handlers owned by a different thread: relevant when starting multiprocessing from multiple threads + # where each thread should only handle the logging for its own multiprocessing executor if owner_thread is not None and owner_thread != current_thread_id: continue From 21738a510f023af96b1084bf4728e115a61ba18f Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 5 Jun 2026 15:22:55 +0200 Subject: [PATCH 04/10] ignore linter for shutdown hooks try catch in loop --- cluster_tools/cluster_tools/schedulers/cluster_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index fe57ba55e..6c3f37e18 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -212,7 +212,7 @@ def _run_shutdown_hooks(cls) -> None: for hook in cls._shutdown_hooks: try: hook() - except Exception as e: + except Exception as e: # noqa: PERF203 print(f"Error during shutdown: {e}") @classmethod From 5cef1ccb6b9b0017c6e1a3d1621ec4185f9e7d92 Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 5 Jun 2026 16:41:15 +0200 Subject: [PATCH 05/10] Add more locking to cluster executor --- .../cluster_tools/schedulers/cluster_executor.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index 6c3f37e18..981246218 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -71,6 +71,7 @@ class ClusterExecutor(futures.Executor): _shutdown_hooks: list[Callable[[], None]] = [] _installed_signal_handler: bool = False _installed_atexit_handler: bool = False + _setup_lock: threading.Lock = threading.Lock() def __init__( self, @@ -144,9 +145,10 @@ def executor_key(cls) -> str: 
@classmethod def _ensure_signal_handlers_are_installed(cls) -> None: - if not cls._installed_atexit_handler: - atexit.register(cls._run_shutdown_hooks) - cls._installed_atexit_handler = True + with cls._setup_lock: + if not cls._installed_atexit_handler: + atexit.register(cls._run_shutdown_hooks) + cls._installed_atexit_handler = True # signal.signal() only works from the main thread. If we're on a worker # thread, skip but don't mark as installed — a later main-thread From 650023b0e036f0641cdcaec4e8d17e3b851e1353 Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 5 Jun 2026 16:55:12 +0200 Subject: [PATCH 06/10] Make sure shutdown hooks run only once --- .../cluster_tools/schedulers/cluster_executor.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index 981246218..ae6e56621 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -71,7 +71,9 @@ class ClusterExecutor(futures.Executor): _shutdown_hooks: list[Callable[[], None]] = [] _installed_signal_handler: bool = False _installed_atexit_handler: bool = False - _setup_lock: threading.Lock = threading.Lock() + _shutdown_hooks_ran: bool = False + # We keep a lock to guard critical parts against race conditions when being called from multiple threads + _state_lock: threading.Lock = threading.Lock() def __init__( self, @@ -145,7 +147,7 @@ def executor_key(cls) -> str: @classmethod def _ensure_signal_handlers_are_installed(cls) -> None: - with cls._setup_lock: + with cls._state_lock: if not cls._installed_atexit_handler: atexit.register(cls._run_shutdown_hooks) cls._installed_atexit_handler = True @@ -211,6 +213,11 @@ def _deregister_shutdown_hook(cls, hook: Callable[[], None]) -> None: @classmethod def _run_shutdown_hooks(cls) -> None: + # Make sure shutdown hooks run only once + with 
cls._state_lock: + if cls._shutdown_hooks_ran: + return + cls._shutdown_hooks_ran = True for hook in cls._shutdown_hooks: try: hook() From f36772b58967681d62a15c8679eb9d027620677e Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 5 Jun 2026 18:05:56 +0200 Subject: [PATCH 07/10] Fix bug iterating over shutdown hooks list while modifying it --- .../cluster_tools/schedulers/cluster_executor.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index ae6e56621..ca185abbc 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -218,11 +218,15 @@ def _run_shutdown_hooks(cls) -> None: if cls._shutdown_hooks_ran: return cls._shutdown_hooks_ran = True - for hook in cls._shutdown_hooks: + # Copying the list of hooks since hooks might unregister themselves when executed. + # Without the copy, this would cause the cls._shutdown_hooks list to be modified + # while we iterate over it which is dangerous! 
+ hooks = list(cls._shutdown_hooks) + for hook in hooks: try: hook() except Exception as e: # noqa: PERF203 - print(f"Error during shutdown: {e}") + logging.error(f"Error during running shutdown hook {hook}: {e}") @classmethod def _handle_shutdown( @@ -232,7 +236,7 @@ def _handle_shutdown( frame: Any, ) -> None: logging.critical( - f"[{cls.__name__}] Caught signal {signal.Signals(signum).name}, running shutdown hooks" + f"[{cls.__name__}] Caught signal {signal.Signals(signum).name}, starting shutdown hooks" ) cls._run_shutdown_hooks() From e918ff3580a827d2e3be4c5e31a811a34bd50b42 Mon Sep 17 00:00:00 2001 From: Tobias Date: Fri, 5 Jun 2026 20:30:24 +0200 Subject: [PATCH 08/10] Make ClusterExecutor thread-safe for concurrent use from multiple threads - Add _state_lock to guard class-level state (_installed_atexit_handler, _shutdown_hooks_ran) against races when executors are created from multiple threads simultaneously - Register atexit handler in addition to signal handlers so shutdown hooks run on normal process exit (important when signal handlers cannot be installed from worker threads) - Add install_signal_handlers() public class method for explicit main-thread signal handler setup before spawning worker threads - Extract _run_shutdown_hooks() as a shared helper used by both signal and atexit handlers - Make _run_shutdown_hooks() a one-shot via _shutdown_hooks_ran to prevent double invocation when both signal handler and atexit fire; reset the flag in _register_shutdown_hook so new executors created after a prior signal cycle can still run their hooks - Snapshot _shutdown_hooks list before iterating to guard against hooks deregistering themselves during iteration Co-Authored-By: Claude Sonnet 4.6 --- cluster_tools/cluster_tools/schedulers/cluster_executor.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index 
ca185abbc..ad8f48344 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -199,7 +199,12 @@ def install_signal_handlers(cls) -> None: @classmethod def _register_shutdown_hook(cls, hook: Callable[[], None]) -> None: - cls._shutdown_hooks.append(hook) + with cls._state_lock: + cls._shutdown_hooks.append(hook) + # New active executors exist; allow hooks to run again if signaled. + # This handles the case where a previous signal already ran hooks + # (setting _shutdown_hooks_ran=True) but new executors are now active. + cls._shutdown_hooks_ran = False cls._ensure_signal_handlers_are_installed() @classmethod From e8e6ed7e661ff62c5938cc475d9731713897ef4c Mon Sep 17 00:00:00 2001 From: Tobias Date: Thu, 11 Jun 2026 15:00:34 +0200 Subject: [PATCH 09/10] improve comment about registering new shutdown hooks after shutdown edge case --- cluster_tools/cluster_tools/schedulers/cluster_executor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index ad8f48344..7b243d9e3 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -204,6 +204,10 @@ def _register_shutdown_hook(cls, hook: Callable[[], None]) -> None: # New active executors exist; allow hooks to run again if signaled. # This handles the case where a previous signal already ran hooks # (setting _shutdown_hooks_ran=True) but new executors are now active. + # IMPORTANT: This should not happen during normal execution! + # However, it might happen during tests (since multiple tests run in the same process). + # So we keep it to allow the tests to work and also to make the code more robust against weird edge cases + # where new shutdown hooks are registered after a first shutdown was already triggered. 
+ cls._shutdown_hooks_ran = False cls._ensure_signal_handlers_are_installed() From 6b530261b3d23d9b44797a6d484295e0b04124b0 Mon Sep 17 00:00:00 2001 From: Tobias Date: Thu, 11 Jun 2026 16:03:55 +0200 Subject: [PATCH 10/10] changelog --- webknossos/Changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/webknossos/Changelog.md b/webknossos/Changelog.md index ba7a5f18a..0a2f67843 100644 --- a/webknossos/Changelog.md +++ b/webknossos/Changelog.md @@ -19,6 +19,7 @@ For upgrade instructions, please check the respective _Breaking Changes_ section ### Changed ### Fixed +- Make `MultiprocessExecutor` safe to be used from multiple threads simultaneously [#1464](https://github.com/scalableminds/webknossos-libs/pull/1464) ## [3.5.0](https://github.com/scalableminds/webknossos-libs/releases/tag/v3.5.0) - 2026-06-02