Skip to content

Commit 149d953

Browse files
committed
fix(bench): replace ProcessPoolExecutor with pebble.ProcessPool
1 parent a524630 commit 149d953

2 files changed

Lines changed: 66 additions & 102 deletions

File tree

benchmarks/adapters/calibrate.py

Lines changed: 65 additions & 102 deletions
Original file line number | Diff line number | Diff line change
@@ -169,9 +169,9 @@ def evaluate_grid_cached( # noqa: C901 — pool teardown + per-cell demux + ret
169169
layout produced by `evaluate_grid` so the existing aggregator,
170170
`top_k_trials`, and `render_grid_report` work unchanged.
171171
"""
172-
import multiprocessing as mp
173-
from concurrent.futures import ProcessPoolExecutor, as_completed
174-
from concurrent.futures.process import BrokenProcessPool
172+
from concurrent.futures import TimeoutError as FuturesTimeoutError
173+
174+
from pebble import ProcessExpired, ProcessPool
175175

176176
from benchmarks.adapters.runner import (
177177
_load_existing_results,
@@ -202,115 +202,78 @@ def evaluate_grid_cached( # noqa: C901 — pool teardown + per-cell demux + ret
202202

203203
from benchmarks.common import _init_worker
204204

205-
def _make_pool() -> ProcessPoolExecutor:
206-
ctx = mp.get_context("spawn")
207-
p = ProcessPoolExecutor(
208-
max_workers=workers,
209-
mp_context=ctx,
210-
max_tasks_per_child=40,
211-
initializer=_init_worker,
212-
)
213-
list(p.map(int, range(workers)))
214-
return p
215-
216205
def _record_per_cell(per_cell_results: list[tuple[RunParams, EvalResult]]) -> None:
217206
for params, result in per_cell_results:
218207
lbl = params.label()
219208
ckpt = ckpts.get(lbl)
220209
if ckpt is not None:
221-
status = (result.extra or {}).get("status", "")
222-
err = str((result.extra or {}).get("error", ""))
223-
# Persist timeouts even though they surface via BPP — they
224-
# are deterministic per instance and must not retry forever.
225-
if status == "timeout" or "BrokenProcessPool" not in err:
226-
append_checkpoint(ckpt, result)
210+
append_checkpoint(ckpt, result)
227211
results_by_cell[lbl].append(result)
228212

229-
def _drain(pool: ProcessPoolExecutor) -> None:
230-
import time as _time
231-
213+
def _drain(pool: ProcessPool) -> None:
214+
# Pebble enforces per-task timeout via SIGTERM/SIGKILL on the
215+
# individual worker, then spawns a replacement — the pool itself
216+
# stays healthy. This replaces the previous threading.Timer +
217+
# os._exit(137) approach that permanently broke
218+
# ProcessPoolExecutor and cascaded one slow instance into pool-
219+
# wide failure (every queued task got BPP'd before it ran).
232220
futures: dict = {}
233-
submit_times: dict[str, float] = {}
234-
submit_failed: list[tuple[BenchmarkInstance, list[RunParams]]] = []
235221
for inst, params_list in pending:
236-
for attempt in range(3):
237-
try:
238-
submit_times[inst.instance_id] = _time.monotonic()
239-
futures[pool.submit(eval_all_cells_fn, inst, params_list)] = (inst, params_list)
240-
break
241-
except BrokenProcessPool:
242-
# The kill switch from a previous task left a worker
243-
# mid-respawn; ProcessPoolExecutor itself spawns a
244-
# replacement on the next submit. Brief sleep + retry
245-
# avoids forcing a full pool rebuild for one dead worker.
246-
if attempt == 2:
247-
submit_failed.append((inst, params_list))
248-
break
249-
_time.sleep(0.1)
250-
outer_deadline = _time.monotonic() + timeout_per_instance * len(points) * max(1, (len(pending) + workers - 1) // workers)
251-
completed: set[str] = set()
252-
try:
253-
for future in as_completed(
254-
futures,
255-
timeout=max(0.0, outer_deadline - _time.monotonic()),
256-
):
257-
inst, params_list = futures[future]
258-
try:
259-
per_cell = future.result(timeout=0)
260-
except BrokenProcessPool:
261-
# Worker death (typically from our os._exit kill switch).
262-
# Treat elapsed-near-deadline as timeout so a pathological
263-
# instance is checkpointed and not retried forever.
264-
# Shorter elapsed => transient (submitted just before
265-
# an earlier task killed the worker); record as error.
266-
elapsed = _time.monotonic() - submit_times.get(inst.instance_id, 0.0)
267-
if elapsed >= timeout_per_instance * 0.9:
268-
per_cell = [
269-
(
270-
p,
271-
_failure_eval(
272-
inst,
273-
p,
274-
"timeout",
275-
f"killed after {timeout_per_instance}s (elapsed {elapsed:.1f}s)",
276-
),
277-
)
278-
for p in params_list
279-
]
280-
else:
281-
per_cell = [(p, _failure_eval(inst, p, "error", "BrokenProcessPool: worker died")) for p in params_list]
282-
except Exception as e:
283-
per_cell = [(p, _failure_eval(inst, p, "error", f"{type(e).__name__}: {e}")) for p in params_list]
284-
completed.add(inst.instance_id)
285-
_record_per_cell(per_cell)
286-
except BrokenProcessPool:
287-
# Self-healing pool — log nothing here; per-future BPP handler
288-
# above already recorded the dead instance, and ProcessPoolExecutor
289-
# spawns replacement workers on the next submission cycle.
290-
pass
291-
for inst, params_list in submit_failed:
292-
_record_per_cell([(p, _failure_eval(inst, p, "error", "BrokenProcessPool: submit failed")) for p in params_list])
293-
294-
pool: ProcessPoolExecutor | None = _make_pool() if workers > 1 else None
295-
try:
296-
if pending and pool is not None:
297-
# Single drain pass; ProcessPoolExecutor self-heals dead
298-
# workers via spawn-on-next-submit, so we no longer rebuild
299-
# the whole pool every time the kill switch fires (that
300-
# rebuild was 30-60s of dead time per BPP and turned every
301-
# legitimate per-instance timeout into a cell-wide cascade).
222+
future = pool.schedule(
223+
eval_all_cells_fn,
224+
args=(inst, params_list),
225+
# Pebble's timeout is wall-clock for the whole task. The
226+
# task encompasses ensure_repo + apply_as_commit + heavy
227+
# diffctx + N selections; budget generously since git
228+
# ops on big repos (vscode, mui) eat 10-30s of pure I/O.
229+
timeout=timeout_per_instance + 30.0,
230+
)
231+
futures[future] = (inst, params_list)
232+
233+
for future, (inst, params_list) in futures.items():
234+
try:
235+
per_cell = future.result()
236+
except FuturesTimeoutError:
237+
# Pebble's overall task deadline (rare safety net — covers
238+
# genuinely hung git ops past timeout_per_instance + 30s).
239+
per_cell = [
240+
(p, _failure_eval(inst, p, "timeout", f"pebble killed after {timeout_per_instance + 30.0:.0f}s"))
241+
for p in params_list
242+
]
243+
except ProcessExpired as e:
244+
# exitcode 137 == os._exit(137) from our narrow algorithm
245+
# kill switch (timer in diffctx_eval_fn around build_diff_context).
246+
# Anything else is a genuine crash.
247+
if e.exitcode == 137:
248+
per_cell = [
249+
(p, _failure_eval(inst, p, "timeout", f"diffctx exceeded {timeout_per_instance:.0f}s budget"))
250+
for p in params_list
251+
]
252+
else:
253+
per_cell = [
254+
(p, _failure_eval(inst, p, "error", f"ProcessExpired exitcode={e.exitcode}")) for p in params_list
255+
]
256+
except Exception as e:
257+
per_cell = [(p, _failure_eval(inst, p, "error", f"{type(e).__name__}: {e}")) for p in params_list]
258+
_record_per_cell(per_cell)
259+
260+
if pending and workers > 1:
261+
# Pebble pool tolerates per-task SIGKILL on timeout — replacement
262+
# workers spawn automatically, the pool itself never breaks.
263+
with ProcessPool(
264+
max_workers=workers,
265+
max_tasks=40,
266+
initializer=_init_worker,
267+
) as pool:
302268
_drain(pool)
303-
elif pending and pool is None:
304-
# workers == 1: serial fallback
305-
for inst, params_list in pending:
306-
try:
307-
per_cell = eval_all_cells_fn(inst, params_list)
308-
except Exception as e:
309-
per_cell = [(p, _failure_eval(inst, p, "error", f"{type(e).__name__}: {e}")) for p in params_list]
310-
_record_per_cell(per_cell)
311-
finally:
312-
if pool is not None:
313-
pool.shutdown(wait=False, cancel_futures=True)
269+
elif pending:
270+
# workers == 1: serial fallback (no timeout enforcement)
271+
for inst, params_list in pending:
272+
try:
273+
per_cell = eval_all_cells_fn(inst, params_list)
274+
except Exception as e:
275+
per_cell = [(p, _failure_eval(inst, p, "error", f"{type(e).__name__}: {e}")) for p in params_list]
276+
_record_per_cell(per_cell)
314277

315278
out: list[TrialResult] = []
316279
for i, params in enumerate(points):

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,7 @@ optional-dependencies.dev = [
6969
"mistune>=3.0,<4.0",
7070
"mutmut>=3.5,<4.0",
7171
"mypy>=1.0,<2.0",
72+
"pebble>=5.0,<6.0",
7273
"pre-commit>=3.0,<5.0",
7374
"pygit2>=1.12,<2.0",
7475
"pyinstaller>=5.0,<7.0",

0 commit comments

Comments
 (0)