Skip to content

Commit 3a7df63

Browse files
gpshead and itamaro authored
pythongh-146313: Fix multiprocessing ResourceTracker deadlock after os.fork() (pythonGH-146316)
`ResourceTracker.__del__` (added in pythongh-88887 circa Python 3.12) calls os.waitpid(pid, 0), which blocks indefinitely if a process created via os.fork() still holds the tracker pipe's write end. The tracker never sees EOF, never exits, and the parent hangs at interpreter shutdown.

Fix with two layers:

- **At-fork handler.** An os.register_at_fork(after_in_child=...) handler closes the inherited pipe fd in the child unless a preserve flag is set. popen_fork.Popen._launch() sets the flag before its fork so mp.Process(fork) children keep the fd and reuse the parent's tracker (preserving pythongh-80849). Raw os.fork() children close the fd, letting the parent reap promptly.
- **Timeout safety-net.** _stop_locked() gains a wait_timeout parameter. When called from `__del__`, it polls with WNOHANG using exponential backoff for up to 1 second instead of blocking indefinitely. The at-fork handler makes this unreachable in well-behaved paths; it remains for abnormal shutdowns.

Co-authored-by: Itamar Oren <itamarost@gmail.com>
1 parent cbd81d5 commit 3a7df63

File tree

4 files changed

+279
-8
lines changed

4 files changed

+279
-8
lines changed

Lib/multiprocessing/popen_fork.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,17 @@ def _launch(self, process_obj):
6767
code = 1
6868
parent_r, child_w = os.pipe()
6969
child_r, parent_w = os.pipe()
70-
self.pid = os.fork()
70+
# gh-146313: Tell the resource tracker's at-fork handler to keep
71+
# the inherited pipe fd so this child reuses the parent's tracker
72+
# (gh-80849) rather than closing it and launching its own.
73+
from .resource_tracker import _fork_intent
74+
_fork_intent.preserve_fd = True
75+
try:
76+
self.pid = os.fork()
77+
finally:
78+
# Reset in both parent and child so the flag does not leak
79+
# into a subsequent raw os.fork() or nested Process launch.
80+
_fork_intent.preserve_fd = False
7181
if self.pid == 0:
7282
try:
7383
atexit._clear()

Lib/multiprocessing/resource_tracker.py

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import signal
2121
import sys
2222
import threading
23+
import time
2324
import warnings
2425
from collections import deque
2526

@@ -75,6 +76,10 @@ def __init__(self):
7576
# The reader should understand all formats.
7677
self._use_simple_format = False
7778

79+
# Set to True by _stop_locked() if the waitpid polling loop ran to
80+
# its timeout without reaping the tracker. Exposed for tests.
81+
self._waitpid_timed_out = False
82+
7883
def _reentrant_call_error(self):
7984
# gh-109629: this happens if an explicit call to the ResourceTracker
8085
# gets interrupted by a garbage collection, invoking a finalizer (*)
@@ -87,16 +92,51 @@ def __del__(self):
8792
# making sure child processes are cleaned before ResourceTracker
8893
# gets destructed.
8994
# see https://github.com/python/cpython/issues/88887
90-
self._stop(use_blocking_lock=False)
95+
# gh-146313: use a timeout to avoid deadlocking if a forked child
96+
# still holds the pipe's write end open.
97+
self._stop(use_blocking_lock=False, wait_timeout=1.0)
98+
99+
def _after_fork_in_child(self):
100+
# gh-146313: Called in the child right after os.fork().
101+
#
102+
# The tracker process is a child of the *parent*, not of us, so we
103+
# could never waitpid() it anyway. Clearing _pid means our __del__
104+
# becomes a no-op (the early return for _pid is None).
105+
#
106+
# Whether we keep the inherited _fd depends on who forked us:
107+
#
108+
# - multiprocessing.Process with the 'fork' start method sets
109+
# _fork_intent.preserve_fd before forking. The child keeps the
110+
# fd and reuses the parent's tracker (gh-80849). This is safe
111+
# because multiprocessing's atexit handler joins all children
112+
# before the parent's __del__ runs, so by then the fd copies
113+
# are gone and the parent can reap the tracker promptly.
114+
#
115+
# - A raw os.fork() leaves the flag unset. We close the fd in the child after forking so
116+
# the parent's __del__ can reap the tracker without waiting
117+
# for the child to exit. If we later need a tracker, ensure_running()
118+
# will launch a fresh one.
119+
self._lock._at_fork_reinit()
120+
self._reentrant_messages.clear()
121+
self._pid = None
122+
self._exitcode = None
123+
if (self._fd is not None and
124+
not getattr(_fork_intent, 'preserve_fd', False)):
125+
fd = self._fd
126+
self._fd = None
127+
try:
128+
os.close(fd)
129+
except OSError:
130+
pass
91131

92-
def _stop(self, use_blocking_lock=True):
132+
def _stop(self, use_blocking_lock=True, wait_timeout=None):
93133
if use_blocking_lock:
94134
with self._lock:
95-
self._stop_locked()
135+
self._stop_locked(wait_timeout=wait_timeout)
96136
else:
97137
acquired = self._lock.acquire(blocking=False)
98138
try:
99-
self._stop_locked()
139+
self._stop_locked(wait_timeout=wait_timeout)
100140
finally:
101141
if acquired:
102142
self._lock.release()
@@ -106,6 +146,10 @@ def _stop_locked(
106146
close=os.close,
107147
waitpid=os.waitpid,
108148
waitstatus_to_exitcode=os.waitstatus_to_exitcode,
149+
monotonic=time.monotonic,
150+
sleep=time.sleep,
151+
WNOHANG=getattr(os, 'WNOHANG', None),
152+
wait_timeout=None,
109153
):
110154
# This shouldn't happen (it might when called by a finalizer)
111155
# so we check for it anyway.
@@ -122,7 +166,30 @@ def _stop_locked(
122166
self._fd = None
123167

124168
try:
125-
_, status = waitpid(self._pid, 0)
169+
if wait_timeout is None:
170+
_, status = waitpid(self._pid, 0)
171+
else:
172+
# gh-146313: A forked child may still hold the pipe's write
173+
# end open, preventing the tracker from seeing EOF and
174+
# exiting. Poll with WNOHANG to avoid blocking forever.
175+
deadline = monotonic() + wait_timeout
176+
delay = 0.001
177+
while True:
178+
result_pid, status = waitpid(self._pid, WNOHANG)
179+
if result_pid != 0:
180+
break
181+
remaining = deadline - monotonic()
182+
if remaining <= 0:
183+
# The tracker is still running; it will be
184+
# reparented to PID 1 (or the nearest subreaper)
185+
# when we exit, and reaped there once all pipe
186+
# holders release their fd.
187+
self._pid = None
188+
self._exitcode = None
189+
self._waitpid_timed_out = True
190+
return
191+
delay = min(delay * 2, remaining, 0.1)
192+
sleep(delay)
126193
except ChildProcessError:
127194
self._pid = None
128195
self._exitcode = None
@@ -308,12 +375,24 @@ def _send(self, cmd, name, rtype):
308375

309376
self._ensure_running_and_write(msg)
310377

378+
# gh-146313: Per-thread flag set by .popen_fork.Popen._launch() just before
379+
# os.fork(), telling _after_fork_in_child() to keep the inherited pipe fd so
380+
# the child can reuse this tracker (gh-80849). Unset for raw os.fork() calls,
381+
# where the child instead closes the fd so the parent's __del__ can reap the
382+
# tracker. Using threading.local() keeps multiple threads calling
383+
# popen_fork.Popen._launch() at once from clobbering each other's intent.
384+
_fork_intent = threading.local()
385+
311386
_resource_tracker = ResourceTracker()
312387
ensure_running = _resource_tracker.ensure_running
313388
register = _resource_tracker.register
314389
unregister = _resource_tracker.unregister
315390
getfd = _resource_tracker.getfd
316391

392+
# gh-146313: See the comment block in ResourceTracker._after_fork_in_child().
393+
if hasattr(os, 'register_at_fork'):
394+
os.register_at_fork(after_in_child=_resource_tracker._after_fork_in_child)
395+
317396

318397
def _decode_message(line):
319398
if line.startswith(b'{'):

Lib/test/_test_multiprocessing.py

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6321,8 +6321,9 @@ def test_resource_tracker_sigkill(self):
63216321
def _is_resource_tracker_reused(conn, pid):
63226322
from multiprocessing.resource_tracker import _resource_tracker
63236323
_resource_tracker.ensure_running()
6324-
# The pid should be None in the child process, expect for the fork
6325-
# context. It should not be a new value.
6324+
# The pid should be None in the child (the at-fork handler clears
6325+
# it for fork; spawn/forkserver children never had it set). It
6326+
# should not be a new value.
63266327
reused = _resource_tracker._pid in (None, pid)
63276328
reused &= _resource_tracker._check_alive()
63286329
conn.send(reused)
@@ -6408,6 +6409,183 @@ def test_resource_tracker_blocked_signals(self):
64086409
# restore sigmask to what it was before executing test
64096410
signal.pthread_sigmask(signal.SIG_SETMASK, orig_sigmask)
64106411

6412+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6413+
def test_resource_tracker_fork_deadlock(self):
6414+
# gh-146313: ResourceTracker.__del__ used to deadlock if a forked
6415+
# child still held the pipe's write end open when the parent
6416+
# exited, because the parent would block in waitpid() waiting for
6417+
# the tracker to exit, but the tracker would never see EOF.
6418+
cmd = '''if 1:
6419+
import os, signal
6420+
from multiprocessing.resource_tracker import ensure_running
6421+
ensure_running()
6422+
if os.fork() == 0:
6423+
signal.pause()
6424+
os._exit(0)
6425+
# parent falls through and exits, triggering __del__
6426+
'''
6427+
proc = subprocess.Popen([sys.executable, '-c', cmd],
6428+
start_new_session=True)
6429+
try:
6430+
try:
6431+
proc.wait(timeout=support.SHORT_TIMEOUT)
6432+
except subprocess.TimeoutExpired:
6433+
self.fail(
6434+
"Parent process deadlocked in ResourceTracker.__del__"
6435+
)
6436+
self.assertEqual(proc.returncode, 0)
6437+
finally:
6438+
try:
6439+
os.killpg(proc.pid, signal.SIGKILL)
6440+
except ProcessLookupError:
6441+
pass
6442+
proc.wait()
6443+
6444+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6445+
def test_resource_tracker_mp_fork_reuse_and_prompt_reap(self):
6446+
# gh-146313 / gh-80849: A child started via multiprocessing.Process
6447+
# with the 'fork' start method should reuse the parent's resource
6448+
# tracker (the at-fork handler preserves the inherited pipe fd),
6449+
# *and* the parent should be able to reap the tracker promptly
6450+
# after joining the child, without hitting the waitpid timeout.
6451+
cmd = textwrap.dedent('''
6452+
import multiprocessing as mp
6453+
from multiprocessing.resource_tracker import _resource_tracker
6454+
6455+
def child(conn):
6456+
# Prove we can talk to the parent's tracker by registering
6457+
# and unregistering a dummy resource over the inherited fd.
6458+
# If the fd were closed, ensure_running would launch a new
6459+
# tracker and _pid would be non-None.
6460+
_resource_tracker.register("x", "dummy")
6461+
_resource_tracker.unregister("x", "dummy")
6462+
conn.send((_resource_tracker._fd is not None,
6463+
_resource_tracker._pid is None,
6464+
_resource_tracker._check_alive()))
6465+
6466+
if __name__ == "__main__":
6467+
mp.set_start_method("fork")
6468+
_resource_tracker.ensure_running()
6469+
r, w = mp.Pipe(duplex=False)
6470+
p = mp.Process(target=child, args=(w,))
6471+
p.start()
6472+
child_has_fd, child_pid_none, child_alive = r.recv()
6473+
p.join()
6474+
w.close(); r.close()
6475+
6476+
# Now simulate __del__: the child has exited and released
6477+
# its fd copy, so the tracker should see EOF and exit
6478+
# promptly -- no timeout.
6479+
_resource_tracker._stop(wait_timeout=5.0)
6480+
print(child_has_fd, child_pid_none, child_alive,
6481+
_resource_tracker._waitpid_timed_out,
6482+
_resource_tracker._exitcode)
6483+
''')
6484+
rc, out, err = script_helper.assert_python_ok('-c', cmd)
6485+
parts = out.decode().split()
6486+
self.assertEqual(parts, ['True', 'True', 'True', 'False', '0'],
6487+
f"unexpected: {parts!r} stderr={err!r}")
6488+
6489+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6490+
def test_resource_tracker_raw_fork_prompt_reap(self):
6491+
# gh-146313: After a raw os.fork() the at-fork handler closes the
6492+
# child's inherited fd, so the parent can reap the tracker
6493+
# immediately -- even while the child is still alive -- rather
6494+
# than waiting out the 1s timeout.
6495+
cmd = textwrap.dedent('''
6496+
import os, signal
6497+
from multiprocessing.resource_tracker import _resource_tracker
6498+
6499+
_resource_tracker.ensure_running()
6500+
r, w = os.pipe()
6501+
pid = os.fork()
6502+
if pid == 0:
6503+
os.close(r)
6504+
# Report whether our fd was closed by the at-fork handler.
6505+
os.write(w, b"1" if _resource_tracker._fd is None else b"0")
6506+
os.close(w)
6507+
signal.pause() # stay alive so parent's reap is meaningful
6508+
os._exit(0)
6509+
os.close(w)
6510+
child_fd_closed = os.read(r, 1) == b"1"
6511+
os.close(r)
6512+
6513+
# Child is still alive and paused. Because it closed its fd
6514+
# copy, our close below is the last one and the tracker exits.
6515+
_resource_tracker._stop(wait_timeout=5.0)
6516+
6517+
os.kill(pid, signal.SIGKILL)
6518+
os.waitpid(pid, 0)
6519+
print(child_fd_closed,
6520+
_resource_tracker._waitpid_timed_out,
6521+
_resource_tracker._exitcode)
6522+
''')
6523+
rc, out, err = script_helper.assert_python_ok('-c', cmd)
6524+
parts = out.decode().split()
6525+
self.assertEqual(parts, ['True', 'False', '0'],
6526+
f"unexpected: {parts!r} stderr={err!r}")
6527+
6528+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6529+
def test_resource_tracker_lock_reinit_after_fork(self):
6530+
# gh-146313: If a parent thread held the tracker's lock at fork
6531+
# time, the child would inherit the held lock and deadlock on
6532+
# its next ensure_running(). The at-fork handler reinits it.
6533+
cmd = textwrap.dedent('''
6534+
import os, threading
6535+
from multiprocessing.resource_tracker import _resource_tracker
6536+
6537+
held = threading.Event()
6538+
release = threading.Event()
6539+
def hold():
6540+
with _resource_tracker._lock:
6541+
held.set()
6542+
release.wait()
6543+
t = threading.Thread(target=hold)
6544+
t.start()
6545+
held.wait()
6546+
6547+
pid = os.fork()
6548+
if pid == 0:
6549+
ok = _resource_tracker._lock.acquire(timeout=5.0)
6550+
os._exit(0 if ok else 1)
6551+
6552+
release.set()
6553+
t.join()
6554+
_, status = os.waitpid(pid, 0)
6555+
print(os.waitstatus_to_exitcode(status))
6556+
''')
6557+
rc, out, err = script_helper.assert_python_ok(
6558+
'-W', 'ignore::DeprecationWarning', '-c', cmd)
6559+
self.assertEqual(out.strip(), b'0',
6560+
f"child failed to acquire lock: stderr={err!r}")
6561+
6562+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6563+
def test_resource_tracker_safety_net_timeout(self):
6564+
# gh-146313: When an mp.Process(fork) child holds the preserved
6565+
# fd and the parent calls _stop() without joining (simulating
6566+
# abnormal shutdown), the safety-net timeout should fire rather
6567+
# than deadlocking.
6568+
cmd = textwrap.dedent('''
6569+
import multiprocessing as mp
6570+
import signal
6571+
from multiprocessing.resource_tracker import _resource_tracker
6572+
6573+
if __name__ == "__main__":
6574+
mp.set_start_method("fork")
6575+
_resource_tracker.ensure_running()
6576+
p = mp.Process(target=signal.pause)
6577+
p.start()
6578+
# Stop WITHOUT joining -- child still holds preserved fd
6579+
_resource_tracker._stop(wait_timeout=0.5)
6580+
print(_resource_tracker._waitpid_timed_out)
6581+
p.terminate()
6582+
p.join()
6583+
''')
6584+
rc, out, err = script_helper.assert_python_ok('-c', cmd)
6585+
self.assertEqual(out.strip(), b'True',
6586+
f"safety-net timeout did not fire: stderr={err!r}")
6587+
6588+
64116589
class TestSimpleQueue(unittest.TestCase):
64126590

64136591
@classmethod
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Fix a deadlock in :mod:`multiprocessing`'s resource tracker
2+
where the parent process could hang indefinitely in :func:`os.waitpid`
3+
during interpreter shutdown if a child created via :func:`os.fork` still
4+
held the resource tracker's pipe open.

0 commit comments

Comments
 (0)