diff --git a/ddtrace/internal/_threads.cpp b/ddtrace/internal/_threads.cpp index 28020a78663..5ee80ec350e 100644 --- a/ddtrace/internal/_threads.cpp +++ b/ddtrace/internal/_threads.cpp @@ -388,9 +388,15 @@ typedef struct periodic_thread // stopped_event->set() completes. std::shared_ptr _stopped; std::unique_ptr _request; - std::unique_ptr _served; std::unique_ptr _awake_mutex; + std::unique_ptr _awake_cond; + // AIDEV-NOTE: awake() serializes on _awake_mutex but waits via + // condition_variable, not Event. condition_variable::wait releases the + // mutex while blocked, so stop() and _before_fork() can synchronize with + // awake() setup without deadlocking a worker callback that calls stop(). + bool _awake_waiting; + bool _awake_served; std::unique_ptr _thread; } PeriodicThread; @@ -461,9 +467,11 @@ PeriodicThread_init(PeriodicThread* self, PyObject* args, PyObject* kwargs) self->_started = std::make_unique(); self->_stopped = std::make_shared(); self->_request = std::make_unique(); - self->_served = std::make_unique(); self->_awake_mutex = std::make_unique(); + self->_awake_cond = std::make_unique(); + self->_awake_waiting = false; + self->_awake_served = false; return 0; } @@ -496,6 +504,19 @@ PeriodicThread__on_shutdown(PeriodicThread* self) Py_XDECREF(result); } +// ---------------------------------------------------------------------------- +static inline void +PeriodicThread__notify_awake_waiter(PeriodicThread* self) +{ + std::lock_guard lock(*self->_awake_mutex); + + if (!self->_awake_waiting) + return; + + self->_awake_served = true; + self->_awake_cond->notify_all(); +} + // ---------------------------------------------------------------------------- // Internal helper: launches the thread after ensuring preconditions. // If reset_next_call_time is true (normal start), _next_call_time is initialised @@ -577,10 +598,12 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false self->_started->set(); bool error = false; + bool stopped_by_fork = false; if (self->_no_wait_at_start) self->_request->set(REQUEST_REASON_AWAKE); while (!self->_stopping) { + bool served_awake = false; { AllowThreads _(state); @@ -592,15 +615,16 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false // 2. regular stop(): consume all pending reasons. const unsigned char stop_reasons = self->_request->consume(REQUEST_REASON_FORK_STOP | REQUEST_REASON_STOP); - const bool has_fork_stop = (stop_reasons & REQUEST_REASON_FORK_STOP) != 0; - if (!has_fork_stop) + stopped_by_fork = (stop_reasons & REQUEST_REASON_FORK_STOP) != 0; + if (!stopped_by_fork) self->_request->consume_all(); break; } // Request wakeup while running (awake/no_wait_at_start). // Timer wakeups are the wait(...) == false branch. - self->_request->consume_all(); + const unsigned char request_reasons = self->_request->consume_all(); + served_awake = (request_reasons & REQUEST_REASON_AWAKE) != 0; } } @@ -616,13 +640,15 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false self->_next_call_time = std::chrono::steady_clock::now() + std::chrono::milliseconds((long long)(self->interval * 1000)); - // If this came from a request mark it as served - self->_served->set(); + if (served_awake) + PeriodicThread__notify_awake_waiter(self); } - // Set request served in case any threads are waiting while a thread is - // stopping. - self->_served->set(); + // Permanent stop/error/finalization completes any in-flight + // awake(). A fork-stop does not: that request must survive for + // the worker restarted by _after_fork(). + if (!stopped_by_fork) + PeriodicThread__notify_awake_waiter(self); if (!state->is_finalizing()) { // Run the shutdown callback if there was no error and we are not @@ -698,14 +724,36 @@ PeriodicThread_awake(PeriodicThread* self, PyObject* Py_UNUSED(args)) return NULL; } + bool was_stopped = false; { AllowThreads _(self->_state); - std::lock_guard lock(*self->_awake_mutex); + std::unique_lock lock(*self->_awake_mutex); + + while (self->_awake_waiting) + self->_awake_cond->wait(lock); + + // If stop() has been observed and we are not in the fork-paused + // window, the worker has either already exited or will exit without + // being restarted. _skip_shutdown is set only by _before_fork() and + // cleared by _after_fork(), so (_stopping && !_skip_shutdown) + // captures the permanently-stopped case while preserving the + // legitimate _before_fork()/awake()/_after_fork() flow. + if (self->_stopping && !self->_skip_shutdown) { + was_stopped = true; + } else { + self->_awake_waiting = true; + self->_awake_served = false; + self->_request->set(REQUEST_REASON_AWAKE); - self->_served->clear(); - self->_request->set(REQUEST_REASON_AWAKE); + self->_awake_cond->wait(lock, [self]() { return self->_awake_served; }); + self->_awake_waiting = false; + self->_awake_cond->notify_all(); + } + } - self->_served->wait(); + if (was_stopped) { + PyErr_SetString(PyExc_RuntimeError, "Periodic thread is stopped"); + return NULL; } Py_RETURN_NONE; @@ -720,8 +768,18 @@ PeriodicThread_stop(PeriodicThread* self, PyObject* Py_UNUSED(args)) return NULL; } - self->_stopping = true; - self->_request->set(REQUEST_REASON_STOP); + // Synchronize with awake() setup via _awake_mutex. Unlike the older + // Event-based handshake, awake() waits on _awake_cond, whose wait() + // releases this mutex while blocked. That preserves ordering with the + // awake() request publication without deadlocking a worker callback that + // calls stop() on itself. + { + AllowThreads _(self->_state); + std::lock_guard lock(*self->_awake_mutex); + + self->_stopping = true; + self->_request->set(REQUEST_REASON_STOP); + } Py_RETURN_NONE; } @@ -827,7 +885,6 @@ PeriodicThread__after_fork(PeriodicThread* self, PyObject* args, PyObject* kwarg self->_started->clear(); self->_stopped->clear(); - self->_served->clear(); // Use _PeriodicThread_do_start instead of PeriodicThread_start to // preserve _next_call_time from before the fork. This ensures that @@ -934,9 +991,9 @@ PeriodicThread_dealloc(PeriodicThread* self) self->_started = nullptr; self->_stopped = nullptr; self->_request = nullptr; - self->_served = nullptr; self->_awake_mutex = nullptr; + self->_awake_cond = nullptr; Py_TYPE(self)->tp_free((PyObject*)self); } diff --git a/tests/internal/test_periodic.py b/tests/internal/test_periodic.py index 94459a9c4f4..4d7ccb31f0f 100644 --- a/tests/internal/test_periodic.py +++ b/tests/internal/test_periodic.py @@ -48,6 +48,80 @@ def _run_periodic(): t.join() +def test_periodic_awake_after_stop_raises_not_hangs(): + """Regression: awake() after a completed stop() used to block forever. + + Once a worker has fully stopped, there is nothing left that can serve a + new awake request. The native awake() path must therefore reject the call + instead of waiting forever for a completion signal that will never come. + """ + t = periodic.PeriodicThread(60.0, lambda: None) + t.start() + t.stop() + t.join() # fully drained + + with pytest.raises(RuntimeError): + t.awake() + + +def test_periodic_awake_does_not_deadlock_with_stop_from_callback(): + """Regression: stop() called from inside a periodic callback must not + deadlock against a concurrent awake() holding the awake mutex. + + Timer._periodic uses the pattern: + + def _periodic(self): + self.timeout() + self.stop() + + A naive fix that has stop() acquire _awake_mutex creates this deadlock: + + 1. Thread A calls t.awake() — takes _awake_mutex, publishes AWAKE, + then waits for completion. + 2. The worker consumes AWAKE and runs the callback. + 3. The callback calls t.stop() on the worker thread. + 4. stop() blocks on _awake_mutex if awake() keeps it locked while + waiting. + 5. The worker cannot finish the callback and cannot reach + the wake-completion path — both threads wait forever. + + After the fix, awake() waits on a condition variable that releases + _awake_mutex while blocked, so stop() can synchronize with the awake + request publication without deadlocking the callback thread. + """ + from threading import Thread + + def _target(): + # Stop ourselves from the worker thread — this is exactly the + # Timer._periodic pattern. + t.stop() + + t = periodic.PeriodicThread(60.0, _target) + t.start() + + awake_done = Event() + + def _do_awake(): + try: + t.awake() + finally: + awake_done.set() + + awaker = Thread(target=_do_awake) + awaker.start() + + # The awake() call should return well within a second. Before the fix this + # deadlocked indefinitely because the worker-thread stop() tried to take + # _awake_mutex while awake() was blocked waiting for completion. + assert awake_done.wait(timeout=5.0), ( + "awake() did not return within 5s — stop()-from-callback deadlocked " + "against a concurrent awake() holding the awake mutex" + ) + + awaker.join(timeout=1.0) + t.join(timeout=1.0) + + def test_periodic_error(): x = {"OK": False} @@ -351,7 +425,11 @@ def _get_native_thread_name(): # Use pthread_getname_np pthread = ctypes.CDLL("/usr/lib/libpthread.dylib") pthread_getname_np = pthread.pthread_getname_np - pthread_getname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_size_t] + pthread_getname_np.argtypes = [ + ctypes.c_void_p, + ctypes.c_char_p, + ctypes.c_size_t, + ] pthread_getname_np.restype = ctypes.c_int # Get current thread handle @@ -471,7 +549,11 @@ def _capture_native_name(): # On macOS (63 char limit), should keep the full class name "StackCollectorThread" thread_started.clear() native_name[0] = None - t2 = periodic.PeriodicThread(0.1, _capture_native_name, name="ddtrace.profiling.collector:StackCollectorThread") + t2 = periodic.PeriodicThread( + 0.1, + _capture_native_name, + name="ddtrace.profiling.collector:StackCollectorThread", + ) t2.start() thread_started.wait() t2.stop()