|
49 | 49 | NodePosition, |
50 | 50 | ) |
51 | 51 | from openarmature.graph import ( |
| 52 | + FailureIsolationMiddleware, |
52 | 53 | RuntimeGraphError, |
53 | 54 | State, |
54 | 55 | ) |
|
68 | 69 | # rather than relying on the test runner's file-glob to filter the |
69 | 70 | # missing fixture out. 067 (crash-injection fan-out resume, proposal |
70 | 71 | # 0070) is a crash/resume fixture this runner owns; it joined at v0.58.0. |
| 72 | +# 069 (fan-out degrade refinements, proposal 0069, v0.59.0) is a mixed |
| 73 | +# fixture: this runner drives its crash_injection/resume case and skips the |
| 74 | +# plain FI-degrade cases (owned by test_pipeline_utilities.py). |
71 | 75 | _CHECKPOINT_FIXTURE_NUMBERS: frozenset[int] = frozenset( |
72 | | - (set(range(24, 32)) - {28}) | set(range(48, 57)) | {67} |
| 76 | + (set(range(24, 32)) - {28}) | set(range(48, 57)) | {67, 69} |
73 | 77 | ) |
74 | 78 |
|
75 | 79 | # Fixtures that need resume-aware test seams the conformance adapter |
@@ -277,12 +281,31 @@ async def test_checkpoint_fixture(fixture_path: Path) -> None: |
277 | 281 | ) |
278 | 282 | spec = _load(fixture_path) |
279 | 283 | if "cases" in spec: |
| 284 | + cases_run = 0 |
280 | 285 | for case in cast("list[dict[str, Any]]", spec["cases"]): |
281 | 286 | case_name = case.get("name", "<unnamed>") |
| 287 | + # This runner drives the checkpoint cases. A mixed fixture (069) |
| 288 | + # interleaves plain FI-degrade cases owned by |
| 289 | + # test_pipeline_utilities.py; skip a case with no checkpoint |
| 290 | + # concern. The marker is checkpointer / resume / crash_injection — |
| 291 | + # NOT resume alone: fixtures like 024 / 026 / 030 / 055 assert |
| 292 | + # checkpoint behavior (saves, record shape, not-found, |
| 293 | + # schema_version) with a checkpointer but no resume. |
| 294 | + if not any(k in case for k in ("checkpointer", "resume", "crash_injection")): |
| 295 | + continue |
| 296 | + cases_run += 1 |
282 | 297 | try: |
283 | 298 | await _run_one_case(case, top_level=spec) |
284 | 299 | except AssertionError as e: |
285 | 300 | raise AssertionError(f"case {case_name!r}: {e}") from e |
| 301 | + # A cases-shaped fixture in this runner's set that drives zero cases |
| 302 | + # (all skipped as non-checkpoint) would pass vacuously; fail loudly |
| 303 | + # instead so a routing mistake surfaces. |
| 304 | + assert cases_run > 0, ( |
| 305 | + f"{fixture_id}: cases-shaped fixture drove zero cases in this runner " |
| 306 | + f"(all skipped as non-checkpoint). Fix the routing or remove it from " |
| 307 | + f"_CHECKPOINT_FIXTURE_NUMBERS." |
| 308 | + ) |
286 | 309 | return |
287 | 310 | await _run_one_case(spec, top_level=spec) |
288 | 311 |
|
@@ -367,6 +390,56 @@ def _find_crash_injection(spec: Mapping[str, Any]) -> tuple[int | None, str | No |
367 | 390 | return None, None, None |
368 | 391 |
|
369 | 392 |
|
| 393 | +def _translate_fi_instance_middleware( |
| 394 | + spec: Mapping[str, Any], |
| 395 | +) -> dict[str, list[FailureIsolationMiddleware]]: |
| 396 | + """Translate a fan-out node's ``instance_middleware: [failure_isolation]`` |
| 397 | + into FailureIsolationMiddleware instances keyed by node name, for |
| 398 | + build_graph's ``fan_out_instance_middleware``. Scoped to the static |
| 399 | + ``degraded_update`` mapping form (the only shape the checkpoint fixtures |
| 400 | + use, e.g. fixture 069 Case 3's degrade-survives-resume); the callable |
| 401 | + forms are owned by test_pipeline_utilities.py, which drives the plain |
| 402 | + FI-degrade cases.""" |
| 403 | + out: dict[str, list[FailureIsolationMiddleware]] = {} |
| 404 | + nodes = cast("dict[str, dict[str, Any]]", spec.get("nodes") or {}) |
| 405 | + for node_name, node_spec in nodes.items(): |
| 406 | + fan_out = node_spec.get("fan_out") |
| 407 | + if not isinstance(fan_out, dict): |
| 408 | + continue |
| 409 | + entries = cast( |
| 410 | + "list[dict[str, Any]]", |
| 411 | + cast("Mapping[str, Any]", fan_out).get("instance_middleware") or [], |
| 412 | + ) |
| 413 | + mws: list[FailureIsolationMiddleware] = [] |
| 414 | + for entry in entries: |
| 415 | + # Only failure_isolation is translated here. Other instance |
| 416 | + # middleware (e.g. fixture 053's retry) is left unwired, as this |
| 417 | + # runner did before — those fixtures drive their behavior via |
| 418 | + # flaky_per_index seams, not a wired middleware. |
| 419 | + if entry.get("type") != "failure_isolation": |
| 420 | + continue |
| 421 | + if "degraded_update" not in entry: |
| 422 | + raise ValueError( |
| 423 | + f"fan-out node {node_name!r}: failure_isolation instance middleware " |
| 424 | + f"entry is missing the required 'degraded_update'" |
| 425 | + ) |
| 426 | + degraded = entry["degraded_update"] |
| 427 | + if not isinstance(degraded, dict): |
| 428 | + raise ValueError( |
| 429 | + f"fan-out node {node_name!r}: checkpoint runner supports only the static " |
| 430 | + f"degraded_update form for instance middleware" |
| 431 | + ) |
| 432 | + mws.append( |
| 433 | + FailureIsolationMiddleware( |
| 434 | + degraded_update=dict(cast("Mapping[str, Any]", degraded)), |
| 435 | + event_name=entry.get("event_name", "degraded"), |
| 436 | + ) |
| 437 | + ) |
| 438 | + if mws: |
| 439 | + out[node_name] = mws |
| 440 | + return out |
| 441 | + |
| 442 | + |
370 | 443 | def _strip_abort_directive(spec: Mapping[str, Any]) -> Mapping[str, Any]: |
371 | 444 | """Return a fresh spec dict with any ``abort_after_instance`` |
372 | 445 | directive removed from fan-out nodes. The engine doesn't recognize |
@@ -421,6 +494,7 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] |
421 | 494 | trace=trace, |
422 | 495 | flaky_per_index_attempt_recorders=flaky_per_index_recorders, |
423 | 496 | instance_execution_recorders=instance_execution_recorders, |
| 497 | + fan_out_instance_middleware=_translate_fi_instance_middleware(sanitized_spec), |
424 | 498 | ) |
425 | 499 | builder = built.builder |
426 | 500 |
|
|
0 commit comments