Skip to content

Commit 4586038

Browse files
r1viollet and claude committed
chore(internal): raise instead of hanging on awake() after stop()
PeriodicThread.awake() cleared _served and waited on it. When called after stop() had completed, the worker had already set _served one last time during its cleanup and exited — no future set() would satisfy the wait, so the call blocked forever. Since the wait ran with the GIL released, even Python-level signal handlers could not interrupt it; the only escape was SIGKILL. Serialize stop() and awake() via the existing _awake_mutex, and in awake() re-check under the mutex: if _stopping is set and we are not in the _before_fork()/_after_fork() window (tracked by _skip_shutdown), raise RuntimeError rather than risk blocking on a _served event nothing will set. Internal API; no release note. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 56e77c7 commit 4586038

2 files changed

Lines changed: 80 additions & 12 deletions

File tree

ddtrace/internal/_threads.cpp

Lines changed: 33 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -698,14 +698,33 @@ PeriodicThread_awake(PeriodicThread* self, PyObject* Py_UNUSED(args))
698698
return NULL;
699699
}
700700

701+
bool was_stopped = false;
701702
{
702703
AllowThreads _(self->_state);
703704
std::lock_guard<std::mutex> lock(*self->_awake_mutex);
704705

705-
self->_served->clear();
706-
self->_request->set(REQUEST_REASON_AWAKE);
706+
// If stop() has fired and we are not in the fork-paused window, the
707+
// worker has either already exited or is about to exit and will not
708+
// be restarted. In that case a future _served->set() is not
709+
// guaranteed, and clearing _served first would strand us in wait()
710+
// forever. _skip_shutdown is set only by _before_fork() and cleared
711+
// by _after_fork(), so (_stopping && !_skip_shutdown) captures the
712+
// permanently-stopped case without blocking the legitimate
713+
// _before_fork()/awake()/_after_fork() sequence (the new worker
714+
// started by _after_fork() will consume the queued request).
715+
if (self->_stopping && !self->_skip_shutdown) {
716+
was_stopped = true;
717+
} else {
718+
self->_served->clear();
719+
self->_request->set(REQUEST_REASON_AWAKE);
720+
721+
self->_served->wait();
722+
}
723+
}
707724

708-
self->_served->wait();
725+
if (was_stopped) {
726+
PyErr_SetString(PyExc_RuntimeError, "Periodic thread is stopped");
727+
return NULL;
709728
}
710729

711730
Py_RETURN_NONE;
@@ -720,8 +739,17 @@ PeriodicThread_stop(PeriodicThread* self, PyObject* Py_UNUSED(args))
720739
return NULL;
721740
}
722741

723-
self->_stopping = true;
724-
self->_request->set(REQUEST_REASON_STOP);
742+
// Synchronize with awake() via _awake_mutex: without this, stop() can
743+
// flip _stopping (and the worker can observe it, exit, and set _served
744+
// one last time) in between awake()'s recheck and its _served->clear(),
745+
// leaving awake() blocked on a _served that nothing will set again.
746+
{
747+
AllowThreads _(self->_state);
748+
std::lock_guard<std::mutex> lock(*self->_awake_mutex);
749+
750+
self->_stopping = true;
751+
self->_request->set(REQUEST_REASON_STOP);
752+
}
725753

726754
Py_RETURN_NONE;
727755
}

tests/internal/test_periodic.py

Lines changed: 47 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,28 @@ def _run_periodic():
4848
t.join()
4949

5050

51+
def test_periodic_awake_after_stop_raises_not_hangs():
52+
"""Regression: awake() after a completed stop() used to block forever.
53+
54+
The native awake() cleared _served and then waited on it, but the worker
55+
thread had already set _served one last time during its cleanup and was
56+
gone — nothing would set _served again. The wait happened with the GIL
57+
released, so not even a Python-level SIGALRM handler could interrupt it;
58+
the only escape was SIGKILL.
59+
60+
After the fix, awake() rechecks _stopping under the awake mutex (with
61+
stop() acquiring the same mutex) and raises RuntimeError instead of
62+
blocking.
63+
"""
64+
t = periodic.PeriodicThread(60.0, lambda: None)
65+
t.start()
66+
t.stop()
67+
t.join() # fully drained
68+
69+
with pytest.raises(RuntimeError):
70+
t.awake()
71+
72+
5173
def test_periodic_error():
5274
x = {"OK": False}
5375

@@ -351,7 +373,11 @@ def _get_native_thread_name():
351373
# Use pthread_getname_np
352374
pthread = ctypes.CDLL("/usr/lib/libpthread.dylib")
353375
pthread_getname_np = pthread.pthread_getname_np
354-
pthread_getname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_size_t]
376+
pthread_getname_np.argtypes = [
377+
ctypes.c_void_p,
378+
ctypes.c_char_p,
379+
ctypes.c_size_t,
380+
]
355381
pthread_getname_np.restype = ctypes.c_int
356382

357383
# Get current thread handle
@@ -464,14 +490,20 @@ def _capture_native_name():
464490
t1.join()
465491

466492
if native_name[0] is not None:
467-
assert native_name[0] == "ShortName", f"Expected 'ShortName', got '{native_name[0]}'"
493+
assert native_name[0] == "ShortName", (
494+
f"Expected 'ShortName', got '{native_name[0]}'"
495+
)
468496

469497
# Test 2: Long name with module:class format
470498
# On Linux (15 char limit), should keep "StackCollectorThread" -> truncated to "StackCollectorT"
471499
# On macOS (63 char limit), should keep the full class name "StackCollectorThread"
472500
thread_started.clear()
473501
native_name[0] = None
474-
t2 = periodic.PeriodicThread(0.1, _capture_native_name, name="ddtrace.profiling.collector:StackCollectorThread")
502+
t2 = periodic.PeriodicThread(
503+
0.1,
504+
_capture_native_name,
505+
name="ddtrace.profiling.collector:StackCollectorThread",
506+
)
475507
t2.start()
476508
thread_started.wait()
477509
t2.stop()
@@ -480,7 +512,9 @@ def _capture_native_name():
480512
if native_name[0] is not None:
481513
if system == "Linux":
482514
# Linux truncates to 15 characters
483-
assert native_name[0] == "StackCollectorT", f"Expected 'StackCollectorT' on Linux, got '{native_name[0]}'"
515+
assert native_name[0] == "StackCollectorT", (
516+
f"Expected 'StackCollectorT' on Linux, got '{native_name[0]}'"
517+
)
484518
elif system == "Darwin":
485519
# macOS can fit the full class name
486520
assert native_name[0].endswith("StackCollectorThread"), (
@@ -490,7 +524,9 @@ def _capture_native_name():
490524
# Test 3: Long name without colon (should truncate from start)
491525
thread_started.clear()
492526
native_name[0] = None
493-
t3 = periodic.PeriodicThread(0.1, _capture_native_name, name="VeryLongThreadNameWithoutColonSeparator")
527+
t3 = periodic.PeriodicThread(
528+
0.1, _capture_native_name, name="VeryLongThreadNameWithoutColonSeparator"
529+
)
494530
t3.start()
495531
thread_started.wait()
496532
t3.stop()
@@ -499,7 +535,9 @@ def _capture_native_name():
499535
if native_name[0] is not None:
500536
if system == "Linux":
501537
# Should truncate from the start to 15 characters
502-
assert native_name[0] == "VeryLongThreadN", f"Expected 'VeryLongThreadN' on Linux, got '{native_name[0]}'"
538+
assert native_name[0] == "VeryLongThreadN", (
539+
f"Expected 'VeryLongThreadN' on Linux, got '{native_name[0]}'"
540+
)
503541
elif system == "Darwin":
504542
# macOS limit is 63, name is 39 chars, should fit fully
505543
assert native_name[0] == "VeryLongThreadNameWithoutColonSeparator", (
@@ -510,7 +548,9 @@ def _capture_native_name():
510548
# "module:ClassNameFit" on Linux should become "ClassNameFit" (12 chars, fits)
511549
thread_started.clear()
512550
native_name[0] = None
513-
t4 = periodic.PeriodicThread(0.1, _capture_native_name, name="some.module:ClassNameFit")
551+
t4 = periodic.PeriodicThread(
552+
0.1, _capture_native_name, name="some.module:ClassNameFit"
553+
)
514554
t4.start()
515555
thread_started.wait()
516556
t4.stop()

0 commit comments

Comments
 (0)