|
10 | 10 | import uuid |
11 | 11 | from pathlib import Path |
12 | 12 |
|
13 | | -WORKERS = 11 |
| 13 | +WORKERS = int(os.environ.get("BENCH_WORKERS", "11")) |
14 | 14 | RESULTS_DIR = Path("results") |
15 | 15 |
|
16 | 16 | LINES_RE = re.compile(r"^(\d+)-(\d+)$") |
@@ -240,8 +240,10 @@ def parse_lines_field(lines_str: str) -> tuple[int, int] | None: |
240 | 240 | def load_results(path: Path) -> list[dict]: |
241 | 241 | data = json.loads(path.read_text()) |
242 | 242 | if isinstance(data, dict) and "results" in data: |
243 | | - return data["results"] |
244 | | - return data |
| 243 | + return list(data["results"]) |
| 244 | + if isinstance(data, list): |
| 245 | + return data |
| 246 | + raise ValueError(f"unexpected results shape in {path}: {type(data).__name__}") |
245 | 247 |
|
246 | 248 |
|
247 | 249 | def _git_commit_sha() -> str: |
@@ -314,15 +316,24 @@ def _run_serial(worker_fn, run_args: list, collect: str) -> list: |
314 | 316 |
|
315 | 317 | def _run_pool(worker_fn, run_args: list, workers: int, collect: str) -> list: |
316 | 318 | from concurrent.futures import ProcessPoolExecutor, as_completed |
| 319 | + from concurrent.futures.process import BrokenProcessPool |
317 | 320 |
|
| 321 | + batch_size = int(os.environ.get("BENCH_BATCH_SIZE", str(max(workers * 4, 20)))) |
318 | 322 | results: list = [] |
319 | | - with ProcessPoolExecutor(max_workers=workers, initializer=_init_worker) as pool: |
320 | | - futures = {pool.submit(worker_fn, a): a[0] for a in run_args} |
321 | | - for future in as_completed(futures): |
322 | | - try: |
323 | | - _collect_result(results, future.result(), collect) |
324 | | - except Exception as e: |
325 | | - print(f" WORKER CRASH [{futures[future]}]: {type(e).__name__}: {e}", flush=True) |
| 323 | + for batch_start in range(0, len(run_args), batch_size): |
| 324 | + batch = run_args[batch_start : batch_start + batch_size] |
| 325 | + try: |
| 326 | + with ProcessPoolExecutor(max_workers=workers, initializer=_init_worker) as pool: |
| 327 | + futures = {pool.submit(worker_fn, a): a[0] for a in batch} |
| 328 | + for future in as_completed(futures): |
| 329 | + try: |
| 330 | + _collect_result(results, future.result(), collect) |
| 331 | + except BrokenProcessPool as e: |
| 332 | + print(f" WORKER CRASH [{futures[future]}]: {type(e).__name__}", flush=True) |
| 333 | + except Exception as e: |
| 334 | + print(f" WORKER CRASH [{futures[future]}]: {type(e).__name__}: {e}", flush=True) |
| 335 | + except BrokenProcessPool as e: |
| 336 | + print(f" POOL CRASH batch {batch_start}-{batch_start+len(batch)}: {e}", flush=True) |
326 | 337 | return results |
327 | 338 |
|
328 | 339 |
|
|
0 commit comments