
Commit 9571043

Fix parallel-coordinator hang when summary pipe saturates (#248)
* Fix parallel-coordinator hang when summary pipe saturates

  The coordinator drained the 'summary' queue only after joining all worker
  processes. With enough queued data (or a single large testsFailed dict), the
  summary-pipe buffer (~64 KiB on Linux) saturates and worker feeder threads
  block in pipe_write, both inside on_timeout's join_thread() and during
  Python's end-of-process queue finalization. This in turn hangs the
  coordinator's p.join() indefinitely.

  Introduce a module-level helper _join_workers_with_summary_drain that joins
  workers while continuously draining 'summary' from a background thread, and
  use it in execute(). Also correct the stale comment in the on_timeout closure
  to describe the actual watcher-thread os._exit(1) flow.

* Use SimpleQueue for summary in parallel coordinator

  SimpleQueue.put is synchronous (no feeder thread, no internal buffer), so a
  successful put() implies the bytes are already in the kernel pipe (see the
  put() sketch after these bullets). That removes the need for the
  summary.close() + summary.join_thread() dance in on_timeout before the
  watcher's os._exit(1), and the comment that explained it. The
  coordinator-side drain thread is updated to a blocking get() driven by a
  sentinel on shutdown, eliminating its busy-loop timeout too.

  results stays a Queue because the progressbar liveness loop relies on
  get(timeout=...), which SimpleQueue does not expose publicly.

* Address review: reorder puts in on_timeout, fix stale comment

  In the on_timeout closure, put 'results' before 'summary'. The summary queue
  is a SimpleQueue with a synchronous put(), and the coordinator only starts
  draining it after every result is in. Putting summary first risked blocking
  on a full summary pipe while the coordinator was still waiting on this
  worker's result, which would have stalled the whole results-collection loop.
  Putting results first guarantees the worker's output reaches the coordinator
  unconditionally; the subsequent summary put may briefly block but always
  unblocks once the coordinator moves to the drain phase.

  Also drop the stale 'feeder threads' wording near the call site: the summary
  queue no longer has a feeder thread.

* Collapse parallel results+summary into a single queue

  The parallel coordinator used two queues: 'results' for per-test output
  (read by the progressbar loop) and 'summary' for per-worker aggregates
  ('done' count and the worker's full testsFailed dict, read after the loop).
  Workers' summary.put could block on a full pipe because the coordinator only
  drained summary in a second phase, after every results message had been
  received.

  Collapse to a single 'results' queue carrying one self-contained message per
  test: { test_name, output, done, failures }. The worker resets
  self.testsFailed = {} before each test so addFailure() writes into a fresh
  dict that ships verbatim; the worker keeps no cumulative state. The
  coordinator owns the canonical testsFailed via update() per message.

  This eliminates the deadlock by construction: the only queue is drained
  continuously by the coordinator's progressbar loop for the entire lifetime
  of the workers, so worker put()s can never block on a full pipe.

  Removes the SimpleQueue import, the _SUMMARY_DRAIN_STOP sentinel, and the
  _join_workers_with_summary_drain helper. on_timeout shrinks to a single
  put + close + join_thread. The unit test is updated to exercise the new
  pattern: workers push many large per-test messages while the main thread
  drains them live.
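The bullets above lean on the difference between multiprocessing.Queue.put (buffered, flushed to the pipe by a feeder thread) and multiprocessing.SimpleQueue.put (synchronous). Below is a minimal standalone sketch of that difference using only the stdlib multiprocessing API; it is not RLTest code, and the two helper function names are made up for illustration.

```python
# Minimal sketch (not RLTest code) of the two put() semantics discussed above.
import multiprocessing as mp
import threading


def queue_put_is_buffered():
    # multiprocessing.Queue.put hands the item to an in-process buffer and
    # returns; a background 'QueueFeederThread' pickles it and writes it to the
    # pipe later. The write can therefore still be pending, and block on a full
    # pipe, long after put() returned, e.g. during process shutdown.
    q = mp.Queue()
    q.put({'done': 1, 'failures': {}})
    print([t.name for t in threading.enumerate()])  # includes 'QueueFeederThread'
    print(q.get())
    q.close()
    q.join_thread()  # wait for the feeder thread to flush and exit


def simplequeue_put_is_synchronous():
    # multiprocessing.SimpleQueue.put pickles and writes to the pipe before
    # returning: a successful put() means the bytes are already in the kernel
    # buffer, so there is no feeder thread to flush (and nothing to close/join).
    sq = mp.SimpleQueue()
    sq.put({'done': 1, 'failures': {}})
    print(sq.get())


if __name__ == '__main__':
    queue_put_is_buffered()
    simplequeue_put_is_synchronous()
```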
* Enhance parallel test execution: add shutdown messages and improve result handling

* Address review: ship shutdown sentinel from on_timeout; misc fixups

  - on_timeout now ships both the per-test result (with shutdown=False) and
    the shutdown sentinel before the watcher thread calls os._exit(1), keeping
    the coordinator's bounded count of n_jobs + parallelism accurate when a
    worker dies on timeout (see the protocol sketch below). Also fixes a
    KeyError on the missing 'shutdown' key in the timeout payload.
  - Grammar: 'no more processors is alive' -> 'are alive'.
  - test_parallel_drain: use time.monotonic() for elapsed measurement.
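A condensed sketch of the single-queue protocol the later commits converge on: one self-contained message per test plus exactly one shutdown sentinel per worker, drained in a single bounded loop of n_jobs + parallelism messages. The worker/coordinator names and the toy job strings here are illustrative only; the actual implementation is the RLTest/__main__.py diff below.

```python
# Sketch (not the RLTest code itself) of the bounded single-queue drain.
import multiprocessing as mp


def worker(jobs, results):
    while True:
        try:
            test = jobs.get(timeout=1)
        except Exception:
            break
        # One self-contained per-test message: output plus this test's failures.
        results.put({'test_name': test, 'output': '%s ok\n' % test,
                     'done': 1, 'failures': {}, 'shutdown': False})
    # Always exactly one shutdown sentinel per worker, even on early exit paths.
    results.put({'test_name': '<worker shutdown>', 'output': '',
                 'done': 0, 'failures': {}, 'shutdown': True})


if __name__ == '__main__':
    parallelism, n_jobs = 2, 6
    jobs, results = mp.Queue(), mp.Queue()
    for i in range(n_jobs):
        jobs.put('test_%d' % i)
    procs = [mp.Process(target=worker, args=(jobs, results)) for _ in range(parallelism)]
    for p in procs:
        p.start()

    done, failures = 0, {}
    for _ in range(n_jobs + parallelism):   # bounded total: no second drain phase
        msg = results.get(timeout=30)
        done += msg['done']
        failures.update(msg['failures'])
        # A real coordinator ticks its progress bar only when not msg['shutdown'].

    for p in procs:                         # queue already drained, so join cannot hang
        p.join()
    assert done == n_jobs
```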
1 parent b2c5fd3 commit 9571043

2 files changed

Lines changed: 142 additions & 39 deletions

File tree

RLTest/__main__.py

Lines changed: 56 additions & 39 deletions
@@ -941,44 +941,60 @@ def on_timeout():
 
             self.takeEnvDown(fullShutDown=True)
 
-        def run_jobs(jobs, results, summary, port):
+        def run_jobs(jobs, results, port):
             Defaults.port = port
-            done = 0
             while True:
                 try:
                     test = jobs.get(timeout=0.1)
                 except Exception as e:
                     break
 
+                # Reset per-test: addFailure() in this worker writes into this
+                # dict, which is shipped as-is. The coordinator owns the
+                # cumulative testsFailed; the worker keeps no state across tests.
+                self.testsFailed = {}
                 output = io.StringIO()
                 with redirect_stdout(output):
                     def on_timeout():
-                        nonlocal done
                         try:
-                            done += 1
                             self.killEnvWithSegFault()
                             self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Test timeout'))
                         except Exception as e:
                             self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Exception on timeout function %s' % str(e)))
                         finally:
-                            results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
-                            summary.put({'done': done, 'failures': self.testsFailed}, block=False)
-                            # After we return the processes will be killed, so we must make sure the queues are drained properly.
+                            # The watcher thread calls os._exit(1) right after
+                            # this returns, bypassing Python finalization and
+                            # the normal post-loop shutdown put. Ship both the
+                            # per-test result and the shutdown sentinel here so
+                            # the coordinator's bounded count of
+                            # n_jobs + parallelism remains accurate. close() +
+                            # join_thread() flushes both puts to the pipe first.
+                            results.put({'test_name': test.name, 'output': output.getvalue(),
+                                         'done': 1, 'failures': self.testsFailed,
+                                         'shutdown': False}, block=False)
+                            results.put({'test_name': '<worker shutdown>', 'output': '',
+                                         'done': 0, 'failures': {},
+                                         'shutdown': True}, block=False)
                             results.close()
-                            summary.close()
-                            summary.join_thread()
                             results.join_thread()
-                    done += self.run_single_test(test, on_timeout)
 
-                results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
+                    done_delta = self.run_single_test(test, on_timeout)
 
-            self.takeEnvDown(fullShutDown=True)
+                results.put({'test_name': test.name, 'output': output.getvalue(),
+                             'done': done_delta, 'failures': self.testsFailed,
+                             'shutdown': False}, block=False)
 
-            # serialized the results back
-            summary.put({'done': done, 'failures': self.testsFailed}, block=False)
+            # Always ship one shutdown message per worker so the coordinator
+            # reads a known total of n_jobs + parallelism messages. Captures
+            # failures raised during final shutdown (e.g. "redis did not exit
+            # cleanly" when env_reuse=True).
+            self.testsFailed = {}
+            self.takeEnvDown(fullShutDown=True)
+            results.put({'test_name': '<worker shutdown>', 'output': '',
+                         'done': 0, 'failures': self.testsFailed,
+                         'shutdown': True}, block=False)
 
         results = Queue()
-        summary = Queue()
         # Open group for all tests at the start (parallel execution)
         self._openGitHubActionsTestsGroup()
         if self.parallelism == 1:
@@ -987,39 +1003,40 @@ def on_timeout():
         processes = []
         currPort = Defaults.port
         for i in range(self.parallelism):
-            p = Process(target=run_jobs, args=(jobs,results,summary,currPort))
+            p = Process(target=run_jobs, args=(jobs,results,currPort))
             currPort += 30 # safe distance for cluster and replicas
             processes.append(p)
             p.start()
-        for _ in self.progressbar(n_jobs):
+        # Workers send exactly n_jobs per-test messages plus one shutdown
+        # message each, for a known total. The single shared queue does
+        # not preserve per-worker ordering, so a fast worker's shutdown
+        # may arrive before a slow worker's last test. We read every
+        # message in one bounded loop and tick the progressbar only on
+        # per-test ones. The has_live_processor guard turns a worker
+        # crash before it ships its shutdown message into a clean error
+        # instead of an indefinite hang.
+        def _get_result():
             while True:
-                # check if we have some lives executors
-                has_live_processor = False
-                for p in processes:
-                    if p.is_alive():
-                        has_live_processor = True
-                        break
                 try:
-                    res = results.get(timeout=1)
-                    break
-                except Exception as e:
-                    if not has_live_processor:
-                        raise Exception('Failed to get job result and no more processors is alive')
-            output = res['output']
-            print('%s' % output, end="")
+                    return results.get(timeout=1)
+                except Exception:
+                    if not any(p.is_alive() for p in processes):
+                        raise Exception('Failed to get job result and no more processors are alive')
+
+        bar_iter = iter(self.progressbar(n_jobs))
+        for _ in range(n_jobs + self.parallelism):
+            res = _get_result()
+            if res['output']:
+                print('%s' % res['output'], end="")
+            done += res['done']
+            self.testsFailed.update(res['failures'])
+            if not res['shutdown']:
+                next(bar_iter, None)
+        next(bar_iter, None)  # finalize bar.update(n_jobs)
 
         for p in processes:
             p.join()
 
-        # join results
-        while True:
-            try:
-                res = summary.get(timeout=1)
-            except Exception as e:
-                break
-            done += res['done']
-            self.testsFailed.update(res['failures'])
-
         endTime = time.time()
 
         # Close group after all tests complete (parallel execution)

tests/unit/test_parallel_drain.py

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+"""Regression test for a hang in the parallel test coordinator.
+
+The parallel coordinator uses a single ``results`` queue carrying one message
+per test. Workers push these messages while the coordinator drains them in its
+progressbar loop. If the coordinator ever stops draining before workers stop
+pushing, large per-test outputs saturate the pipe (~64 KiB on Linux), worker
+``put()`` calls block in ``pipe_write``, and ``p.join()`` hangs indefinitely.
+
+This test reproduces the saturation scenario by spawning workers that each
+push many large messages, then asserts that a coordinator that drains
+continuously throughout the workers' lifetime finishes promptly with every
+message accounted for and every worker cleanly exited.
+"""
+
+import multiprocessing as mp
+import sys
+import time
+from unittest import TestCase
+
+
+# ~32 KiB per message × 8 workers × 8 messages = 2 MiB total, well over the
+# typical 64 KiB pipe buffer on Linux, so writers will block on ``pipe_write``
+# unless the parent is actively reading throughout.
+_PAYLOAD_BYTES = 32 * 1024
+_NUM_WORKERS = 8
+_MSGS_PER_WORKER = 8
+_JOIN_TIMEOUT_SECS = 30.0
+
+
+def _worker_puts_many_results(results, n_msgs, payload_bytes):
+    # Queue.put is async (via a feeder thread), but at process exit the feeder
+    # must flush the buffered items to the pipe before the worker can exit. If
+    # the parent is not draining, that flush blocks forever.
+    payload = 'x' * payload_bytes
+    for i in range(n_msgs):
+        results.put({'test_name': 't%d' % i, 'output': payload,
+                     'done': 1, 'failures': {}})
+
+
+class TestParallelResultsDrain(TestCase):
+
+    def setUp(self):
+        if sys.platform == 'win32':
+            self.skipTest('fork start method is unavailable on Windows')
+        self._ctx = mp.get_context('fork')
+        self._procs = []
+
+    def tearDown(self):
+        # Safety net: if the test ever hangs despite the fix, make sure the
+        # pytest session can still exit cleanly.
+        for p in self._procs:
+            if p.is_alive():
+                p.kill()
+            p.join(timeout=5)
+
+    def test_continuous_drain_does_not_hang(self):
+        results = self._ctx.Queue()
+        self._procs = [
+            self._ctx.Process(
+                target=_worker_puts_many_results,
+                args=(results, _MSGS_PER_WORKER, _PAYLOAD_BYTES),
+            )
+            for _ in range(_NUM_WORKERS)
+        ]
+        for p in self._procs:
+            p.start()
+
+        # Mimic the coordinator's progressbar loop: drain every per-test
+        # message live, in the same thread, while workers are still running.
+        expected = _NUM_WORKERS * _MSGS_PER_WORKER
+        start = time.monotonic()
+        collected = [results.get(timeout=_JOIN_TIMEOUT_SECS) for _ in range(expected)]
+        for p in self._procs:
+            p.join(timeout=_JOIN_TIMEOUT_SECS)
+        elapsed = time.monotonic() - start
+
+        for p in self._procs:
+            self.assertFalse(
+                p.is_alive(),
+                'worker still alive after join; results pipe likely saturated',
+            )
+            self.assertEqual(p.exitcode, 0)
+        self.assertEqual(len(collected), expected)
+        # The drain should return well under its own timeout; we only assert a
+        # loose upper bound to avoid flakiness on slow machines.
+        self.assertLess(elapsed, _JOIN_TIMEOUT_SECS)
