|
26 | 26 | _utcnow_iso, |
27 | 27 | ExecutionPlan, |
28 | 28 | ) |
| 29 | +from ..providers.base import is_terminal_billing_error |
29 | 30 | from ..prompts.test_b_bundles import ( |
30 | 31 | TEST_B_BUNDLE_CONDITIONS, |
31 | 32 | build_test_b_bundle_messages, |
@@ -118,14 +119,32 @@ def _log(msg: str) -> None: |
118 | 119 | sys.stderr.write(msg + "\n") |
119 | 120 | sys.stderr.flush() |
120 | 121 |
|
121 | | - counts = {"ok": 0, "error": 0, "skipped_resumed": len(already_done)} |
| 122 | + counts = { |
| 123 | + "ok": 0, |
| 124 | + "error": 0, |
| 125 | + "skipped_resumed": len(already_done), |
| 126 | + "skipped_terminal_abort": 0, |
| 127 | + } |
122 | 128 | latencies: list[int] = [] |
123 | 129 | by_condition: dict[str, int] = {} |
124 | 130 | by_phase: dict[str, int] = {} |
125 | 131 | by_bundle: dict[str, int] = {} |
126 | 132 | tokens = {"input": 0, "output": 0} |
127 | 133 |
|
| 134 | + # Set once a terminal billing / spend-cap condition is observed. Every |
| 135 | + # subsequent call to the same provider is guaranteed to fail with the |
| 136 | + # same error, so we stop dispatching new work and record the rest as |
| 137 | + # ``skipped_terminal_abort`` instead of burning the full 120-min job. |
| 138 | + abort_reason: dict[str, Any] = {} |
| 139 | + abort_lock = threading.Lock() |
| 140 | + |
128 | 141 | def _do_call(call: dict[str, Any]) -> None: |
| 142 | + if abort_reason: |
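| 143 | + # A previous call already tripped the terminal-billing trap. |
| 144 | + # Skip without contacting the provider; the batch loop will |
| 145 | + # break out cleanly after the current batch drains. NOTE(review): the counter increment on this path is not guarded by a lock, so concurrent workers could lose updates -- confirm. |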
| 146 | + counts["skipped_terminal_abort"] += 1 |
| 147 | + return |
129 | 148 | t_start = _utcnow_iso() |
130 | 149 | with progress_lock: |
131 | 150 | progress["started"] += 1 |
@@ -187,6 +206,22 @@ def _do_call(call: dict[str, Any]) -> None: |
187 | 206 | f"attempts={retried_attempts} " |
188 | 207 | f"wall_ms={dt_ms}" |
189 | 208 | ) |
| 209 | + if final_class == "terminal_billing": |
| 210 | + with abort_lock: |
| 211 | + if not abort_reason: |
| 212 | + last_err = attempts[-1] if attempts else {} |
| 213 | + abort_reason.update({ |
| 214 | + "first_run_id": call["run_id"], |
| 215 | + "error_type": last_err.get("error_type"), |
| 216 | + "error": last_err.get("error"), |
| 217 | + }) |
| 218 | + _log( |
| 219 | + "[bb] TERMINAL-BILLING tripped: provider has " |
| 220 | + "hit a hard billing/spend cap. Aborting the " |
| 221 | + "run; remaining calls will be recorded as " |
| 222 | + f"skipped_terminal_abort. run_id={call['run_id']} " |
| 223 | + f"err={last_err.get('error')}" |
| 224 | + ) |
190 | 225 | return |
191 | 226 | raw_writer.write({ |
192 | 227 | "run_id": call["run_id"], |
@@ -286,6 +321,17 @@ def _record_wall_timeout(c: dict[str, Any]) -> None: |
286 | 321 |
|
287 | 322 | try: |
288 | 323 | for batch_start in range(0, len(pending), plan.batch_size): |
| 324 | + if abort_reason: |
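| 325 | + # A prior batch tripped the terminal-billing trap. Stop |
| 326 | + # dispatching new work entirely; the remaining ``pending`` |
| 327 | + # are intentionally not contacted. NOTE(review): these calls are only logged here and are not added to counts["skipped_terminal_abort"], so that counter appears to cover only calls skipped inside the in-flight batch -- confirm. |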
| 328 | + _log( |
| 329 | + f"[bb] abort: skipping remaining " |
| 330 | + f"{len(pending) - batch_start} call(s) after " |
| 331 | + f"terminal_billing first observed in " |
| 332 | + f"run_id={abort_reason.get('first_run_id')}" |
| 333 | + ) |
| 334 | + break |
289 | 335 | batch = pending[batch_start:batch_start + plan.batch_size] |
290 | 336 | if plan.concurrency <= 1: |
291 | 337 | for call in batch: |
@@ -357,6 +403,8 @@ def _worker() -> None: |
357 | 403 | "test": "test_b_bundles", |
358 | 404 | "counts": counts, |
359 | 405 | "total_attempted": len(pending), |
| 406 | + "aborted": bool(abort_reason), |
| 407 | + "abort_reason": dict(abort_reason) if abort_reason else None, |
360 | 408 | "by_condition": by_condition, |
361 | 409 | "by_phase": by_phase, |
362 | 410 | "by_bundle": by_bundle, |
@@ -402,6 +450,8 @@ def _worker() -> None: |
402 | 450 | "fixtures": fixtures_manifest, |
403 | 451 | "repo_commit": repo_commit, |
404 | 452 | "counts": counts, |
| 453 | + "aborted": bool(abort_reason), |
| 454 | + "abort_reason": dict(abort_reason) if abort_reason else None, |
405 | 455 | "n_run_specs_total": len(pending) + counts["skipped_resumed"], |
406 | 456 | "n_already_done_on_start": counts["skipped_resumed"], |
407 | 457 | "conditions": list(conditions), |
|
0 commit comments