Skip to content

Commit dd5488f

Browse files
r1violletclaude
andcommitted
chore(internal): make PeriodicThread.awake() non-blocking on stopped threads
Two failure modes are fixed: 1. Hang on awake() after a completed stop()/join(). The previous handshake used a one-shot `_served` Event: the worker set it once on exit and then went away. If awake() ran afterwards it cleared `_served`, signalled the request, and waited for an `_served->set()` that would never come. Because the wait ran with the GIL released, not even a Python-level signal handler could interrupt it; only SIGKILL recovered the process. 2. Deadlock between an in-flight awake() and a periodic callback that calls stop() on its own thread (the Timer._periodic pattern in ddtrace/internal/periodic.py). An earlier fix attempt had stop() acquire `_awake_mutex`, but awake() held that same mutex while waiting on `_served`: the worker-thread stop() then blocked on the mutex before the callback could return and the worker could signal completion — both threads waited forever. Replace the `_served` Event handshake with a `std::condition_variable` (`_awake_cond`) plus `_awake_waiting` / `_awake_served` flags. awake() now waits via `cv::wait`, which releases `_awake_mutex` while blocked, so stop() can synchronize with request publication without deadlocking a worker-thread stop() call. awake() also checks `_stopping && !_skip_shutdown` under the mutex and raises RuntimeError in the permanently-stopped case. `_skip_shutdown` keeps the legitimate `_before_fork()/awake()/_after_fork()` flow working: the restarted worker consumes the queued AWAKE request. The worker only notifies the awake waiter on paths where the request can actually be considered served (AWAKE consumed, or permanent stop/error/finalization exit). A fork-stop deliberately does not notify: the pending AWAKE survives for the worker restarted by `_after_fork()` to serve. Internal API; no release note. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 56e77c7 commit dd5488f

2 files changed

Lines changed: 159 additions & 20 deletions

File tree

ddtrace/internal/_threads.cpp

Lines changed: 75 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -388,9 +388,15 @@ typedef struct periodic_thread
388388
// stopped_event->set() completes.
389389
std::shared_ptr<Event> _stopped;
390390
std::unique_ptr<Event> _request;
391-
std::unique_ptr<Event> _served;
392391

393392
std::unique_ptr<std::mutex> _awake_mutex;
393+
std::unique_ptr<std::condition_variable> _awake_cond;
394+
// AIDEV-NOTE: awake() serializes on _awake_mutex but waits via
395+
// condition_variable, not Event. condition_variable::wait releases the
396+
// mutex while blocked, so stop() and _before_fork() can synchronize with
397+
// awake() setup without deadlocking a worker callback that calls stop().
398+
bool _awake_waiting;
399+
bool _awake_served;
394400

395401
std::unique_ptr<std::thread> _thread;
396402
} PeriodicThread;
@@ -461,9 +467,11 @@ PeriodicThread_init(PeriodicThread* self, PyObject* args, PyObject* kwargs)
461467
self->_started = std::make_unique<Event>();
462468
self->_stopped = std::make_shared<Event>();
463469
self->_request = std::make_unique<Event>();
464-
self->_served = std::make_unique<Event>();
465470

466471
self->_awake_mutex = std::make_unique<std::mutex>();
472+
self->_awake_cond = std::make_unique<std::condition_variable>();
473+
self->_awake_waiting = false;
474+
self->_awake_served = false;
467475

468476
return 0;
469477
}
@@ -496,6 +504,19 @@ PeriodicThread__on_shutdown(PeriodicThread* self)
496504
Py_XDECREF(result);
497505
}
498506

507+
// ----------------------------------------------------------------------------
508+
static inline void
509+
PeriodicThread__notify_awake_waiter(PeriodicThread* self)
510+
{
511+
std::lock_guard<std::mutex> lock(*self->_awake_mutex);
512+
513+
if (!self->_awake_waiting)
514+
return;
515+
516+
self->_awake_served = true;
517+
self->_awake_cond->notify_all();
518+
}
519+
499520
// ----------------------------------------------------------------------------
500521
// Internal helper: launches the thread after ensuring preconditions.
501522
// If reset_next_call_time is true (normal start), _next_call_time is initialised
@@ -577,10 +598,12 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false
577598
self->_started->set();
578599

579600
bool error = false;
601+
bool stopped_by_fork = false;
580602
if (self->_no_wait_at_start)
581603
self->_request->set(REQUEST_REASON_AWAKE);
582604

583605
while (!self->_stopping) {
606+
bool served_awake = false;
584607
{
585608
AllowThreads _(state);
586609

@@ -592,15 +615,16 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false
592615
// 2. regular stop(): consume all pending reasons.
593616
const unsigned char stop_reasons =
594617
self->_request->consume(REQUEST_REASON_FORK_STOP | REQUEST_REASON_STOP);
595-
const bool has_fork_stop = (stop_reasons & REQUEST_REASON_FORK_STOP) != 0;
596-
if (!has_fork_stop)
618+
stopped_by_fork = (stop_reasons & REQUEST_REASON_FORK_STOP) != 0;
619+
if (!stopped_by_fork)
597620
self->_request->consume_all();
598621
break;
599622
}
600623

601624
// Request wakeup while running (awake/no_wait_at_start).
602625
// Timer wakeups are the wait(...) == false branch.
603-
self->_request->consume_all();
626+
const unsigned char request_reasons = self->_request->consume_all();
627+
served_awake = (request_reasons & REQUEST_REASON_AWAKE) != 0;
604628
}
605629
}
606630

@@ -616,13 +640,15 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false
616640
self->_next_call_time =
617641
std::chrono::steady_clock::now() + std::chrono::milliseconds((long long)(self->interval * 1000));
618642

619-
// If this came from a request mark it as served
620-
self->_served->set();
643+
if (served_awake)
644+
PeriodicThread__notify_awake_waiter(self);
621645
}
622646

623-
// Set request served in case any threads are waiting while a thread is
624-
// stopping.
625-
self->_served->set();
647+
// Permanent stop/error/finalization completes any in-flight
648+
// awake(). A fork-stop does not: that request must survive for
649+
// the worker restarted by _after_fork().
650+
if (!stopped_by_fork)
651+
PeriodicThread__notify_awake_waiter(self);
626652

627653
if (!state->is_finalizing()) {
628654
// Run the shutdown callback if there was no error and we are not
@@ -698,14 +724,36 @@ PeriodicThread_awake(PeriodicThread* self, PyObject* Py_UNUSED(args))
698724
return NULL;
699725
}
700726

727+
bool was_stopped = false;
701728
{
702729
AllowThreads _(self->_state);
703-
std::lock_guard<std::mutex> lock(*self->_awake_mutex);
730+
std::unique_lock<std::mutex> lock(*self->_awake_mutex);
731+
732+
while (self->_awake_waiting)
733+
self->_awake_cond->wait(lock);
734+
735+
// If stop() has been observed and we are not in the fork-paused
736+
// window, the worker has either already exited or will exit without
737+
// being restarted. _skip_shutdown is set only by _before_fork() and
738+
// cleared by _after_fork(), so (_stopping && !_skip_shutdown)
739+
// captures the permanently-stopped case while preserving the
740+
// legitimate _before_fork()/awake()/_after_fork() flow.
741+
if (self->_stopping && !self->_skip_shutdown) {
742+
was_stopped = true;
743+
} else {
744+
self->_awake_waiting = true;
745+
self->_awake_served = false;
746+
self->_request->set(REQUEST_REASON_AWAKE);
704747

705-
self->_served->clear();
706-
self->_request->set(REQUEST_REASON_AWAKE);
748+
self->_awake_cond->wait(lock, [self]() { return self->_awake_served; });
749+
self->_awake_waiting = false;
750+
self->_awake_cond->notify_all();
751+
}
752+
}
707753

708-
self->_served->wait();
754+
if (was_stopped) {
755+
PyErr_SetString(PyExc_RuntimeError, "Periodic thread is stopped");
756+
return NULL;
709757
}
710758

711759
Py_RETURN_NONE;
@@ -720,8 +768,18 @@ PeriodicThread_stop(PeriodicThread* self, PyObject* Py_UNUSED(args))
720768
return NULL;
721769
}
722770

723-
self->_stopping = true;
724-
self->_request->set(REQUEST_REASON_STOP);
771+
// Synchronize with awake() setup via _awake_mutex. Unlike the older
772+
// Event-based handshake, awake() waits on _awake_cond, whose wait()
773+
// releases this mutex while blocked. That preserves ordering with the
774+
// awake() request publication without deadlocking a worker callback that
775+
// calls stop() on itself.
776+
{
777+
AllowThreads _(self->_state);
778+
std::lock_guard<std::mutex> lock(*self->_awake_mutex);
779+
780+
self->_stopping = true;
781+
self->_request->set(REQUEST_REASON_STOP);
782+
}
725783

726784
Py_RETURN_NONE;
727785
}
@@ -827,7 +885,6 @@ PeriodicThread__after_fork(PeriodicThread* self, PyObject* args, PyObject* kwarg
827885

828886
self->_started->clear();
829887
self->_stopped->clear();
830-
self->_served->clear();
831888

832889
// Use _PeriodicThread_do_start instead of PeriodicThread_start to
833890
// preserve _next_call_time from before the fork. This ensures that
@@ -934,9 +991,9 @@ PeriodicThread_dealloc(PeriodicThread* self)
934991
self->_started = nullptr;
935992
self->_stopped = nullptr;
936993
self->_request = nullptr;
937-
self->_served = nullptr;
938994

939995
self->_awake_mutex = nullptr;
996+
self->_awake_cond = nullptr;
940997

941998
Py_TYPE(self)->tp_free((PyObject*)self);
942999
}

tests/internal/test_periodic.py

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,80 @@ def _run_periodic():
4848
t.join()
4949

5050

51+
def test_periodic_awake_after_stop_raises_not_hangs():
52+
"""Regression: awake() after a completed stop() used to block forever.
53+
54+
Once a worker has fully stopped, there is nothing left that can serve a
55+
new awake request. The native awake() path must therefore reject the call
56+
instead of waiting forever for a completion signal that will never come.
57+
"""
58+
t = periodic.PeriodicThread(60.0, lambda: None)
59+
t.start()
60+
t.stop()
61+
t.join() # fully drained
62+
63+
with pytest.raises(RuntimeError):
64+
t.awake()
65+
66+
67+
def test_periodic_awake_does_not_deadlock_with_stop_from_callback():
68+
"""Regression: stop() called from inside a periodic callback must not
69+
deadlock against a concurrent awake() holding the awake mutex.
70+
71+
Timer._periodic uses the pattern:
72+
73+
def _periodic(self):
74+
self.timeout()
75+
self.stop()
76+
77+
A naive fix that has stop() acquire _awake_mutex creates this deadlock:
78+
79+
1. Thread A calls t.awake() — takes _awake_mutex, publishes AWAKE,
80+
then waits for completion.
81+
2. The worker consumes AWAKE and runs the callback.
82+
3. The callback calls t.stop() on the worker thread.
83+
4. stop() blocks on _awake_mutex if awake() keeps it locked while
84+
waiting.
85+
5. The worker cannot finish the callback and cannot reach
86+
the wake-completion path — both threads wait forever.
87+
88+
After the fix, awake() waits on a condition variable that releases
89+
_awake_mutex while blocked, so stop() can synchronize with the awake
90+
request publication without deadlocking the callback thread.
91+
"""
92+
from threading import Thread
93+
94+
def _target():
95+
# Stop ourselves from the worker thread — this is exactly the
96+
# Timer._periodic pattern.
97+
t.stop()
98+
99+
t = periodic.PeriodicThread(60.0, _target)
100+
t.start()
101+
102+
awake_done = Event()
103+
104+
def _do_awake():
105+
try:
106+
t.awake()
107+
finally:
108+
awake_done.set()
109+
110+
awaker = Thread(target=_do_awake)
111+
awaker.start()
112+
113+
# The awake() call should return well within a second. Before the fix this
114+
# deadlocked indefinitely because the worker-thread stop() tried to take
115+
# _awake_mutex while awake() was blocked waiting for completion.
116+
assert awake_done.wait(timeout=5.0), (
117+
"awake() did not return within 5s — stop()-from-callback deadlocked "
118+
"against a concurrent awake() holding the awake mutex"
119+
)
120+
121+
awaker.join(timeout=1.0)
122+
t.join(timeout=1.0)
123+
124+
51125
def test_periodic_error():
52126
x = {"OK": False}
53127

@@ -351,7 +425,11 @@ def _get_native_thread_name():
351425
# Use pthread_getname_np
352426
pthread = ctypes.CDLL("/usr/lib/libpthread.dylib")
353427
pthread_getname_np = pthread.pthread_getname_np
354-
pthread_getname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_size_t]
428+
pthread_getname_np.argtypes = [
429+
ctypes.c_void_p,
430+
ctypes.c_char_p,
431+
ctypes.c_size_t,
432+
]
355433
pthread_getname_np.restype = ctypes.c_int
356434

357435
# Get current thread handle
@@ -471,7 +549,11 @@ def _capture_native_name():
471549
# On macOS (63 char limit), should keep the full class name "StackCollectorThread"
472550
thread_started.clear()
473551
native_name[0] = None
474-
t2 = periodic.PeriodicThread(0.1, _capture_native_name, name="ddtrace.profiling.collector:StackCollectorThread")
552+
t2 = periodic.PeriodicThread(
553+
0.1,
554+
_capture_native_name,
555+
name="ddtrace.profiling.collector:StackCollectorThread",
556+
)
475557
t2.start()
476558
thread_started.wait()
477559
t2.stop()

0 commit comments

Comments
 (0)