Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 75 additions & 18 deletions ddtrace/internal/_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,15 @@ typedef struct periodic_thread
// stopped_event->set() completes.
std::shared_ptr<Event> _stopped;
std::unique_ptr<Event> _request;
std::unique_ptr<Event> _served;

std::unique_ptr<std::mutex> _awake_mutex;
std::unique_ptr<std::condition_variable> _awake_cond;
// AIDEV-NOTE: awake() serializes on _awake_mutex but waits via
// condition_variable, not Event. condition_variable::wait releases the
// mutex while blocked, so stop() and _before_fork() can synchronize with
// awake() setup without deadlocking a worker callback that calls stop().
bool _awake_waiting;
bool _awake_served;

std::unique_ptr<std::thread> _thread;
} PeriodicThread;
Expand Down Expand Up @@ -461,9 +467,11 @@ PeriodicThread_init(PeriodicThread* self, PyObject* args, PyObject* kwargs)
self->_started = std::make_unique<Event>();
self->_stopped = std::make_shared<Event>();
self->_request = std::make_unique<Event>();
self->_served = std::make_unique<Event>();

self->_awake_mutex = std::make_unique<std::mutex>();
self->_awake_cond = std::make_unique<std::condition_variable>();
self->_awake_waiting = false;
self->_awake_served = false;

return 0;
}
Expand Down Expand Up @@ -496,6 +504,19 @@ PeriodicThread__on_shutdown(PeriodicThread* self)
Py_XDECREF(result);
}

// ----------------------------------------------------------------------------
static inline void
PeriodicThread__notify_awake_waiter(PeriodicThread* self)
{
std::lock_guard<std::mutex> lock(*self->_awake_mutex);

if (!self->_awake_waiting)
return;

self->_awake_served = true;
self->_awake_cond->notify_all();
}

// ----------------------------------------------------------------------------
// Internal helper: launches the thread after ensuring preconditions.
// If reset_next_call_time is true (normal start), _next_call_time is initialised
Expand Down Expand Up @@ -577,10 +598,12 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false
self->_started->set();

bool error = false;
bool stopped_by_fork = false;
if (self->_no_wait_at_start)
self->_request->set(REQUEST_REASON_AWAKE);

while (!self->_stopping) {
bool served_awake = false;
{
AllowThreads _(state);

Expand All @@ -592,15 +615,16 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false
// 2. regular stop(): consume all pending reasons.
const unsigned char stop_reasons =
self->_request->consume(REQUEST_REASON_FORK_STOP | REQUEST_REASON_STOP);
const bool has_fork_stop = (stop_reasons & REQUEST_REASON_FORK_STOP) != 0;
if (!has_fork_stop)
stopped_by_fork = (stop_reasons & REQUEST_REASON_FORK_STOP) != 0;
if (!stopped_by_fork)
self->_request->consume_all();
break;
}

// Request wakeup while running (awake/no_wait_at_start).
// Timer wakeups are the wait(...) == false branch.
self->_request->consume_all();
const unsigned char request_reasons = self->_request->consume_all();
served_awake = (request_reasons & REQUEST_REASON_AWAKE) != 0;
}
}

Expand All @@ -616,13 +640,15 @@ _PeriodicThread_do_start(PeriodicThread* self, bool reset_next_call_time = false
self->_next_call_time =
std::chrono::steady_clock::now() + std::chrono::milliseconds((long long)(self->interval * 1000));

// If this came from a request mark it as served
self->_served->set();
if (served_awake)
PeriodicThread__notify_awake_waiter(self);
}

// Set request served in case any threads are waiting while a thread is
// stopping.
self->_served->set();
// Permanent stop/error/finalization completes any in-flight
// awake(). A fork-stop does not: that request must survive for
// the worker restarted by _after_fork().
if (!stopped_by_fork)
PeriodicThread__notify_awake_waiter(self);

if (!state->is_finalizing()) {
// Run the shutdown callback if there was no error and we are not
Expand Down Expand Up @@ -698,14 +724,36 @@ PeriodicThread_awake(PeriodicThread* self, PyObject* Py_UNUSED(args))
return NULL;
}

bool was_stopped = false;
{
AllowThreads _(self->_state);
std::lock_guard<std::mutex> lock(*self->_awake_mutex);
std::unique_lock<std::mutex> lock(*self->_awake_mutex);

while (self->_awake_waiting)
self->_awake_cond->wait(lock);

// If stop() has been observed and we are not in the fork-paused
// window, the worker has either already exited or will exit without
// being restarted. _skip_shutdown is set only by _before_fork() and
// cleared by _after_fork(), so (_stopping && !_skip_shutdown)
// captures the permanently-stopped case while preserving the
// legitimate _before_fork()/awake()/_after_fork() flow.
if (self->_stopping && !self->_skip_shutdown) {
was_stopped = true;
} else {
self->_awake_waiting = true;
self->_awake_served = false;
self->_request->set(REQUEST_REASON_AWAKE);

self->_served->clear();
self->_request->set(REQUEST_REASON_AWAKE);
self->_awake_cond->wait(lock, [self]() { return self->_awake_served; });
self->_awake_waiting = false;
self->_awake_cond->notify_all();
}
}

self->_served->wait();
if (was_stopped) {
PyErr_SetString(PyExc_RuntimeError, "Periodic thread is stopped");
return NULL;
}

Py_RETURN_NONE;
Expand All @@ -720,8 +768,18 @@ PeriodicThread_stop(PeriodicThread* self, PyObject* Py_UNUSED(args))
return NULL;
}

self->_stopping = true;
self->_request->set(REQUEST_REASON_STOP);
// Synchronize with awake() setup via _awake_mutex. Unlike the older
// Event-based handshake, awake() waits on _awake_cond, whose wait()
// releases this mutex while blocked. That preserves ordering with the
// awake() request publication without deadlocking a worker callback that
// calls stop() on itself.
{
AllowThreads _(self->_state);
std::lock_guard<std::mutex> lock(*self->_awake_mutex);

Comment thread
r1viollet marked this conversation as resolved.
self->_stopping = true;
self->_request->set(REQUEST_REASON_STOP);
}

Py_RETURN_NONE;
}
Expand Down Expand Up @@ -827,7 +885,6 @@ PeriodicThread__after_fork(PeriodicThread* self, PyObject* args, PyObject* kwarg

self->_started->clear();
self->_stopped->clear();
self->_served->clear();

// Use _PeriodicThread_do_start instead of PeriodicThread_start to
// preserve _next_call_time from before the fork. This ensures that
Expand Down Expand Up @@ -934,9 +991,9 @@ PeriodicThread_dealloc(PeriodicThread* self)
self->_started = nullptr;
self->_stopped = nullptr;
self->_request = nullptr;
self->_served = nullptr;

self->_awake_mutex = nullptr;
self->_awake_cond = nullptr;

Py_TYPE(self)->tp_free((PyObject*)self);
}
Expand Down
86 changes: 84 additions & 2 deletions tests/internal/test_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,80 @@ def _run_periodic():
t.join()


def test_periodic_awake_after_stop_raises_not_hangs():
"""Regression: awake() after a completed stop() used to block forever.

Once a worker has fully stopped, there is nothing left that can serve a
new awake request. The native awake() path must therefore reject the call
instead of waiting forever for a completion signal that will never come.
"""
t = periodic.PeriodicThread(60.0, lambda: None)
t.start()
t.stop()
t.join() # fully drained

with pytest.raises(RuntimeError):
t.awake()


def test_periodic_awake_does_not_deadlock_with_stop_from_callback():
"""Regression: stop() called from inside a periodic callback must not
deadlock against a concurrent awake() holding the awake mutex.

Timer._periodic uses the pattern:

def _periodic(self):
self.timeout()
self.stop()

A naive fix that has stop() acquire _awake_mutex creates this deadlock:

1. Thread A calls t.awake() — takes _awake_mutex, publishes AWAKE,
then waits for completion.
2. The worker consumes AWAKE and runs the callback.
3. The callback calls t.stop() on the worker thread.
4. stop() blocks on _awake_mutex if awake() keeps it locked while
waiting.
5. The worker cannot finish the callback and cannot reach
the wake-completion path — both threads wait forever.

After the fix, awake() waits on a condition variable that releases
_awake_mutex while blocked, so stop() can synchronize with the awake
request publication without deadlocking the callback thread.
"""
from threading import Thread

def _target():
# Stop ourselves from the worker thread — this is exactly the
# Timer._periodic pattern.
t.stop()

t = periodic.PeriodicThread(60.0, _target)
t.start()

awake_done = Event()

def _do_awake():
try:
t.awake()
finally:
awake_done.set()

awaker = Thread(target=_do_awake)
awaker.start()

# The awake() call should return well within a second. Before the fix this
# deadlocked indefinitely because the worker-thread stop() tried to take
# _awake_mutex while awake() was blocked waiting for completion.
assert awake_done.wait(timeout=5.0), (
"awake() did not return within 5s — stop()-from-callback deadlocked "
"against a concurrent awake() holding the awake mutex"
)

awaker.join(timeout=1.0)
t.join(timeout=1.0)


def test_periodic_error():
x = {"OK": False}

Expand Down Expand Up @@ -351,7 +425,11 @@ def _get_native_thread_name():
# Use pthread_getname_np
pthread = ctypes.CDLL("/usr/lib/libpthread.dylib")
pthread_getname_np = pthread.pthread_getname_np
pthread_getname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_size_t]
pthread_getname_np.argtypes = [
ctypes.c_void_p,
ctypes.c_char_p,
ctypes.c_size_t,
]
pthread_getname_np.restype = ctypes.c_int

# Get current thread handle
Expand Down Expand Up @@ -471,7 +549,11 @@ def _capture_native_name():
# On macOS (63 char limit), should keep the full class name "StackCollectorThread"
thread_started.clear()
native_name[0] = None
t2 = periodic.PeriodicThread(0.1, _capture_native_name, name="ddtrace.profiling.collector:StackCollectorThread")
t2 = periodic.PeriodicThread(
0.1,
_capture_native_name,
name="ddtrace.profiling.collector:StackCollectorThread",
)
t2.start()
thread_started.wait()
t2.stop()
Expand Down
Loading