Fix the flaky immediate shutdown.#1666
Conversation
The test was trying to ensure that when shutdown is called, it happens "immediately", and not wait until the timeout. The problem in our current code is that we don't know whether another thread is in "spin" during the shutdown. If it is, then there is some possibility that we deleted the shutdown guard condition *before* the spin built the waitset, in which case we would spin until the timeout, and thus fail the test. The fix is to mark when a thread is in spin, and then have shutdown wait for the spin to complete before destroying resources. If spin is waiting, then it will be woken by the guard condition immediately, and the shutdown will proceed quickly. If the spin is stuck in a long-running user callback, then we still need a timeout because we don't want shutdown to take forever. While I was in here, I also enabled this test for the MultiThreadedExecutor since it should succeed for that now as well. Signed-off-by: Chris Lalancette <clalancette@gmail.com>
fujitatomoya
left a comment
There was a problem hiding this comment.
this PR breaks the contract in two distinct ways?
- if a spinner is processing in a long-running callback, shutdown() returns after 5 seconds. the return value is True, so the caller has no way to know shutdown didn't actually finish?
- if the caller passes None, they've explicitly said "I'm fine waiting forever." the library shouldn't second-guess that? if timeout_sec does not work in this way, the API implementation is broken?
i may be wrong, but it seems that shutdown could deadlock for 5 seconds when called from inside a callback? callbacks run on ThreadPoolExecutor workers (MTExecutor case), not on _spinning_thread. if a user callback in a worker thread calls executor.shutdown(), it will wait for 5 seconds or so, cz spinning thread won't return until the guard condition fires.
Previous to this commit, the shutdown had code like this:
with self._shutdown_lock:
if not self._is_shutdown:
self._is_shutdown = True
...
if not self._is_shutdown:
if not self._work_tracker.wait(timeout_sec):
return False
But that doesn't make much sense; that second "if not self._is_shutdown"
is always False, since we set self._is_shutdown to True right
above.
The easy fix here is to save off whether we were the ones
doing the shutdown, and then only do the wait if that is
true. But that then deadlocks if this shutdown is being
called from within a callback. Since that is behavior we
want to preserve, we need more fixes.
The fixes in this PR track when a thread enters and exits
doing work. We then modify "wait" to not wait for the
thread that is ourselves if we are in a callback.
We go a bit further and remove the second "if not self._is_shutdown"
check as well, and unconditionally wait on work to be
completed. This is to support the case where the user
calls shutdown with a timeout, and the timeout expires
before the work finishes. In that case, the user could
call shutdown again and have it succeed.
The result is that all of this should now work together,
not deadlock, and actually do the waiting.
Signed-off-by: Chris Lalancette <clalancette@gmail.com>
Previous to this change, internally in shutdown() timeout_sec was set to 5 seconds in all cases except where the user passed in a positive number. Even then it wasn't completely honored, and since there is more than one wait in there, it could take multiple times the timeout_sec. This change fixes all of that by honoring the documentation: None - wait forever negative - wait forever 0 - don't wait at all positive - wait for this amount of time That last one also only cumulatively waits as well. Signed-off-by: Chris Lalancette <clalancette@gmail.com>
When two callbacks running on different MultiThreadedExecutor worker threads both call executor.shutdown() concurrently, both calls land in _work_tracker.wait(). The previous predicate only excluded the calling thread's own in-flight work, so each call counted the other caller's still-running callback as work it had to wait for. Since both callbacks are blocked in shutdown(), neither can finish, and both waits block until the timeout expires. This commit teaches _WorkTracker to track which threads are currently parked in wait() and exclude any in-flight work owned by those threads from every waiter's drain check. Once a thread is inside wait() it cannot be making progress on its callback, so counting that work as pending only ever causes spurious blocking. While in there, simplify the bookkeeping: _num_work_executing was a cached sum of _executing_thread_counts.values(), but the drain check doesn't need a sum -- only whether each executing thread is also a waiter. The predicate is now a single subset check, and the redundant counter is gone. Adds test_concurrent_shutdown_from_two_callbacks as a regression test. Pre-fix, both shutdowns time out and return False. Signed-off-by: Chris Lalancette <clalancette@gmail.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Calling executor.shutdown() from inside a callback running on one
of MultiThreadedExecutor's own worker threads previously raised
RuntimeError("cannot join current thread").
The wrapper called self._executor.shutdown(wait=wait_for_threads),
which (when wait_for_threads is True, the default) makes
concurrent.futures.ThreadPoolExecutor join every worker via
t.join(). If the caller is itself one of those workers, that loop
hits t.join() on the current thread and Python refuses.
Do the join loop ourselves instead. Mark the pool shut down with
wait=False, then iterate self._executor._threads and skip the
current thread. The current worker can't be waited on from itself
anyway -- the rest of the callback will finish after shutdown()
returns and the worker will exit naturally.
Adds test_shutdown_from_multithreaded_executor_callback as a
regression test. Pre-fix, shutdown() raises out of the callback.
Signed-off-by: Chris Lalancette <clalancette@gmail.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ends. The previous _WorkTracker fix added the calling thread to a _waiting_threads set on entry to wait() and removed it on exit. That ensured two concurrent waiters wouldn't deadlock on each other's still-running callback, but it created a different race: the second waiter's predicate becomes True the instant it adds itself, returns immediately, and its finally block removes it again -- which flips the first waiter's predicate back to False before the first waiter has had a chance to re-evaluate after the notify. The first waiter then ends up waiting until the second waiter's __exit__ runs (i.e., until the second waiter's callback ends, after the rest of its shutdown completes), serializing the two shutdowns instead of letting them proceed in parallel. Fix it by holding the waiter membership stable for as long as the thread's callback is still in flight: wait() does not remove the entry in its finally block when the thread is still inside the work_tracker; __exit__ removes the entry when the callback ends. External callers (no in-flight callback, never call __enter__) still get removed in wait()'s finally, since no __exit__ will run for them. A regression in test_concurrent_shutdown_from_two_callbacks exposed the serialization: the test gated on spin_thread.join(), but the spinner exits as soon as _is_shutdown is set, which can happen before either callback has finished its shutdown call -- so the test read results too early and saw only the faster callback's entry. Switch the test to wait on an explicit event signaled once both callbacks have appended their result. Signed-off-by: Chris Lalancette <clalancette@gmail.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Thank you for the review!
Yeah, you are totally right. I had a separate change that implements this, but it should be included here. I've pushed it now, and this should fix the contract.
Yes, you are correct. Again, I had a separate change for this, but it should have all been included together. The latest push checks to see if the wait_for failed, and if so, it returns False so the caller can try again.
I don't think this deadlock is possible. If a callback calls shutdown(), then there are 2 possible waits:
That said, I did find another deadlock while looking at this, in that two callbacks that call shutdown simultaneously would deadlock with each other. The latest commit fixes that by tracking which threads are waiting, and if another thread is waiting, it wakes it up to check its conditions. (whew, this escalated kind of quickly, and this PR grew a lot. But I think we are getting towards the right shape of things here, please take another look when you have time) |
Signed-off-by: Chris Lalancette <clalancette@gmail.com>
Description
The test was trying to ensure that when shutdown is called, it happens "immediately", and not wait until the timeout. The problem in our current code is that we don't know whether another thread is in "spin"
during the shutdown. If it is, then there is some possibility that we deleted the shutdown guard condition before the spin built the waitset, in which case we would spin until the timeout, and thus fail the test.
The fix is to mark when a thread is in spin, and then have shutdown wait for the spin to complete before destroying resources. If spin is waiting, then it will be woken by the guard condition immediately, and the shutdown will proceed quickly. If the spin is stuck in a long-running user callback, then we still need a timeout because we don't want shutdown to take forever.
While I was in here, I also enabled this test for the MultiThreadedExecutor since it should succeed for that now as well.
Is this user-facing behavior change?
No.
Did you use Generative AI?
Yes, Claude Opus 4.7
Additional Information
N/A