Skip to content

Commit a524630

Browse files
committed
fix(bench): self-heal pool on dead worker, drop forced rebuild
1 parent c94c0a7 commit a524630

2 files changed

Lines changed: 51 additions & 50 deletions

File tree

benchmarks/adapters/calibrate.py

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -232,16 +232,21 @@ def _drain(pool: ProcessPoolExecutor) -> None:
232232
futures: dict = {}
233233
submit_times: dict[str, float] = {}
234234
submit_failed: list[tuple[BenchmarkInstance, list[RunParams]]] = []
235-
pool_broken = False
236235
for inst, params_list in pending:
237-
try:
238-
submit_times[inst.instance_id] = _time.monotonic()
239-
futures[pool.submit(eval_all_cells_fn, inst, params_list)] = (inst, params_list)
240-
except BrokenProcessPool:
241-
idx = pending.index((inst, params_list))
242-
submit_failed.extend(pending[idx:])
243-
pool_broken = True
244-
break
236+
for attempt in range(3):
237+
try:
238+
submit_times[inst.instance_id] = _time.monotonic()
239+
futures[pool.submit(eval_all_cells_fn, inst, params_list)] = (inst, params_list)
240+
break
241+
except BrokenProcessPool:
242+
# The kill switch from a previous task left a worker
243+
# mid-respawn; ProcessPoolExecutor itself spawns a
244+
# replacement on the next submit. Brief sleep + retry
245+
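# avoids forcing a full pool rebuild for one dead worker. NOTE(review): CPython flags the executor as broken after an abrupt worker exit and later submits keep raising BrokenProcessPool — confirm this retry can ever succeed.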
246+
if attempt == 2:
247+
submit_failed.append((inst, params_list))
248+
break
249+
_time.sleep(0.1)
245250
outer_deadline = _time.monotonic() + timeout_per_instance * len(points) * max(1, (len(pending) + workers - 1) // workers)
246251
completed: set[str] = set()
247252
try:
@@ -253,6 +258,11 @@ def _drain(pool: ProcessPoolExecutor) -> None:
253258
try:
254259
per_cell = future.result(timeout=0)
255260
except BrokenProcessPool:
261+
# Worker death (typically from our os._exit kill switch).
262+
# Treat elapsed-near-deadline as timeout so a pathological
263+
# instance is checkpointed and not retried forever.
264+
# Shorter elapsed => transient (submitted just before
265+
# an earlier task killed the worker); record as error.
256266
elapsed = _time.monotonic() - submit_times.get(inst.instance_id, 0.0)
257267
if elapsed >= timeout_per_instance * 0.9:
258268
per_cell = [
@@ -268,41 +278,28 @@ def _drain(pool: ProcessPoolExecutor) -> None:
268278
for p in params_list
269279
]
270280
else:
271-
pool_broken = True
272281
per_cell = [(p, _failure_eval(inst, p, "error", "BrokenProcessPool: worker died")) for p in params_list]
273282
except Exception as e:
274283
per_cell = [(p, _failure_eval(inst, p, "error", f"{type(e).__name__}: {e}")) for p in params_list]
275284
completed.add(inst.instance_id)
276285
_record_per_cell(per_cell)
277286
except BrokenProcessPool:
278-
pool_broken = True
287+
# Self-healing pool — log nothing here; the per-future BrokenProcessPool handler
288+
# above already recorded the dead instance, and ProcessPoolExecutor
289+
# spawns replacement workers on the next submission cycle.
290+
pass
279291
for inst, params_list in submit_failed:
280292
_record_per_cell([(p, _failure_eval(inst, p, "error", "BrokenProcessPool: submit failed")) for p in params_list])
281-
if pool_broken:
282-
raise BrokenProcessPool("pool degraded mid-grid")
283293

284294
pool: ProcessPoolExecutor | None = _make_pool() if workers > 1 else None
285295
try:
286296
if pending and pool is not None:
287-
while True:
288-
try:
289-
_drain(pool)
290-
break
291-
except BrokenProcessPool:
292-
try:
293-
pool.shutdown(wait=False, cancel_futures=True)
294-
except Exception:
295-
pass
296-
pool = _make_pool()
297-
# Recompute pending for the rebuild from current
298-
# checkpoint state — instances completed since last
299-
# rebuild should be skipped.
300-
done_ids_now = {lbl: read_checkpoint(c) if c is not None else set() for lbl, c in ckpts.items()}
301-
pending[:] = [
302-
(inst, [p for p in points if inst.instance_id not in done_ids_now[p.label()]])
303-
for inst, _ in pending
304-
if any(inst.instance_id not in done_ids_now[p.label()] for p in points)
305-
]
297+
# Single drain pass; ProcessPoolExecutor self-heals dead
298+
# workers via spawn-on-next-submit, so we no longer rebuild
299+
# the whole pool every time the kill switch fires (that
300+
# rebuild was 30-60s of dead time per BrokenProcessPool and turned every
301+
# legitimate per-instance timeout into a cell-wide cascade).
302+
_drain(pool)
306303
elif pending and pool is None:
307304
# workers == 1: serial fallback
308305
for inst, params_list in pending:

benchmarks/adapters/runner.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -237,15 +237,21 @@ def _drain(active_pool: ProcessPoolExecutor) -> None: # noqa: C901
237237
futures: dict = {}
238238
submit_times: dict[str, float] = {}
239239
submit_failed: list[BenchmarkInstance] = []
240-
pool_broken = False
241240
for inst in pending:
242-
try:
243-
submit_times[inst.instance_id] = time.monotonic()
244-
futures[active_pool.submit(eval_fn, inst, params)] = inst
245-
except BrokenProcessPool:
246-
submit_failed.extend([inst, *pending[pending.index(inst) + 1 :]])
247-
pool_broken = True
248-
break
241+
for attempt in range(3):
242+
try:
243+
submit_times[inst.instance_id] = time.monotonic()
244+
futures[active_pool.submit(eval_fn, inst, params)] = inst
245+
break
246+
except BrokenProcessPool:
247+
# A previous task's kill switch left the worker
248+
# mid-respawn; ProcessPoolExecutor spawns a
249+
# replacement on the next submit cycle. Brief sleep +
250+
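# retry beats forcing a full pool rebuild. NOTE(review): CPython flags the executor as broken after an abrupt worker exit and later submits keep raising BrokenProcessPool — confirm this retry can ever succeed.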
251+
if attempt == 2:
252+
submit_failed.append(inst)
253+
break
254+
time.sleep(0.1)
249255
outer_deadline = time.monotonic() + timeout_per_instance * max(1, (len(pending) + workers - 1) // workers)
250256
completed: set[str] = set()
251257
try:
@@ -258,12 +264,12 @@ def _drain(active_pool: ProcessPoolExecutor) -> None: # noqa: C901
258264
except BrokenProcessPool:
259265
# The kill switch arms `os._exit(137)` at the deadline,
260266
# which surfaces here as BrokenProcessPool. Distinguish
261-
# timeout-induced death from a genuine pool crash by
262-
# elapsed wall-clock — within 90% of the deadline the
263-
# likely cause is our timer. Persist as "timeout" (not
264-
# "error") so the checkpoint records it and a retry
265-
# loop does not re-evaluate the same pathological
266-
# instance forever.
267+
# timeout-induced death from a transient pool failure
268+
# by elapsed wall-clock — within 90% of the deadline
269+
# the likely cause is our timer; persist as "timeout"
270+
# so the checkpoint records it. Otherwise mark as a
271+
# transient error (NOT persisted) so the resume loop
272+
# gets another shot.
267273
elapsed = time.monotonic() - submit_times.get(inst.instance_id, 0.0)
268274
if elapsed >= timeout_per_instance * 0.9:
269275
r = _failure_result(
@@ -274,7 +280,6 @@ def _drain(active_pool: ProcessPoolExecutor) -> None: # noqa: C901
274280
)
275281
else:
276282
r = _failure_result(inst, params, "error", "BrokenProcessPool: worker died")
277-
pool_broken = True
278283
except Exception as e:
279284
r = _failure_result(inst, params, "error", f"{type(e).__name__}: {e}")
280285
completed.add(inst.instance_id)
@@ -284,14 +289,13 @@ def _drain(active_pool: ProcessPoolExecutor) -> None: # noqa: C901
284289
if inst.instance_id not in completed:
285290
record(_failure_result(inst, params, "timeout", "exceeded global deadline"))
286291
except BrokenProcessPool:
287-
pool_broken = True
292+
# Self-healing pool — replacement workers spawn on next submit;
293+
# the per-future BrokenProcessPool handler above already recorded dead instances.
288294
for inst in futures.values():
289295
if inst.instance_id not in completed:
290296
record(_failure_result(inst, params, "error", "BrokenProcessPool: worker died"))
291297
for inst in submit_failed:
292298
record(_failure_result(inst, params, "error", "BrokenProcessPool: submit failed"))
293-
if pool_broken:
294-
raise BrokenProcessPool("pool degraded mid-cell")
295299

296300
if pool is not None:
297301
_drain(pool) # type: ignore[arg-type]

0 commit comments

Comments
 (0)