From 17cfaacdac2edd4ed21b869cd571c86485ed6c48 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 17 Jun 2026 13:03:38 -0700 Subject: [PATCH 1/2] Add adapter crash-injection and mock cause (0070) Implement proposal 0070's two conformance-adapter capabilities: a `crash_injection` directive (after_fan_out_instance / after_node) that simulates a checkpoint-boundary crash with no asserted first-run outcome, and a recursive mock `cause` that chains a flaky failure to an originating error. Wire conformance fixtures 067 (crash-injection fan-out resume) and 068 (failure-mock cause chain). A general per-instance execution recorder lets plain-node fan-outs report executed / skipped on resume, consulted as a fallback so the existing flaky_per_index fixtures are unchanged. The after_node boundary has a unit test (no fixture exercises it). Advance the pinned spec to v0.58.0; test-vocabulary only, no library behavior change. --- CHANGELOG.md | 2 +- conformance.toml | 17 +- openarmature-spec | 2 +- pyproject.toml | 2 +- src/openarmature/AGENTS.md | 4 +- src/openarmature/__init__.py | 2 +- tests/conformance/adapter.py | 67 +++++- tests/conformance/test_checkpoint.py | 241 ++++++++++++++----- tests/conformance/test_pipeline_utilities.py | 13 +- tests/test_smoke.py | 2 +- 10 files changed, 278 insertions(+), 74 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83f810a..edea648 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The - **Observer privacy flag `disable_llm_payload` renamed to `disable_provider_payload`** (proposal 0059, observability §5.5.4, spec v0.54.0). The observer-level flag on both bundled observers (`OTelObserver` and `LangfuseObserver`) is renamed, and its scope broadens from LLM-completion payload to any provider-call payload (LLM completion today; embedding and rerank when those land). This is a breaking change to both observer constructors: config passing `disable_llm_payload=True` (or `False`) updates to `disable_provider_payload=...` with no other change. The default stays `True` (payload suppressed), and the gating behavior for `LlmCompletionEvent` / `LlmFailedEvent` rendering is unchanged at every existing site. The rename is the only part of proposal 0059 adopted this cycle: the retrieval-provider capability itself (the `EmbeddingProvider` protocol, the `EmbeddingEvent` / `EmbeddingFailedEvent` typed variants, and the embedding span / observation mapping) is not yet implemented and rides as `not-yet` in `conformance.toml`. The §5.5.4 rename touches existing LLM-payload gating, so it lands with the pin. - **Fan-out failure-isolation degrade contribution implemented** (proposal 0066, pipeline-utilities §9.3 / §9.8 / §11.7, spec v0.56.0). When `FailureIsolationMiddleware` degrades a fan-out instance, that instance is a success whose contribution is its `degraded_update`, read in subgraph-field-name space and never merged onto the failed instance's pre-failure state. This also fixes a latent bug: an instance `degraded_update`'s `extra_outputs` values were previously looked up by the parent field name and silently dropped (`collect_field` was unaffected). A static `degraded_update` that omits the node's `collect_field` is now a compile-time error (`FanOutDegradedUpdateMissingCollectField`); a callable `degraded_update` that omits it yields a graceful null slot rather than raising, preserving one collection slot per item. The parallel-branches counterpart (a branch `degraded_update` omitting a projected `outputs` field skips that field) was already correct as of the parallel-branches fix above and is now pinned by fixture 065. Success-path and resume behavior for correctly-configured fan-outs is unchanged. - **Failure-isolation events carry the full structured cause chain** (proposal 0068, pipeline-utilities §6.3, spec v0.57.0). `FailureIsolatedEvent.caught_exception` gains a `chain`: an ordered list of `CauseLink` records (each carrying `category`, `message`, and a `carrier` flag), from the caught exception (outermost) to the originating raise (innermost), with graph-engine `node_exception` carrier wrappers flagged `carrier=True`. The existing `category` and `message` are retained and redefined as a derivation over the chain: the category of the outermost non-carrier link whose category is a non-empty string (else `category` is `null` and `message` is the outermost non-carrier link's message). This supersedes proposal 0065's single "originating cause" representation, which was ambiguous once the post-carrier chain held more than one non-carrier link; the derivation reproduces 0065's single-carrier values, so fixture 064 is unchanged. A new `CauseLink` type is exported from `openarmature.graph`. The bundled OTel and Langfuse observers continue to render the derived `category`; surfacing the full chain is left to custom observers. The change is additive to the event shape, and catch/degrade behavior is unchanged. Conformance fixture 066 (three cases: an instance-site carrier chain, a node-level single non-carrier link, and an uncategorized null-category cause) passes. -- **Pinned spec advances v0.53.0 → v0.57.0 across the v0.14.0 cycle**, in four steps: v0.54.0 (proposal 0059, the observer-flag rename above), v0.55.1 (proposal 0065 above; the v0.55.1 patch also carries an observability §11 span-links text reconciliation that narrows an *Out of scope* bullet, with no python-observable change), v0.56.0 (proposal 0066, the fan-out degrade contribution above), and v0.57.0 (proposal 0068, the failure-isolation cause chain above). `conformance.toml` records 0065, 0066, and 0068 as `implemented` and 0059 as `not-yet` (only its cross-spec flag rename was adopted). +- **Pinned spec advances v0.53.0 → v0.58.0 across the v0.14.0 cycle**, in five steps: v0.54.0 (proposal 0059, the observer-flag rename above), v0.55.1 (proposal 0065 above; the v0.55.1 patch also carries an observability §11 span-links text reconciliation that narrows an *Out of scope* bullet, with no python-observable change), v0.56.0 (proposal 0066, the fan-out degrade contribution above), v0.57.0 (proposal 0068, the failure-isolation cause chain above), and v0.58.0 (proposal 0070, conformance-adapter crash-injection and cause-chaining test vocabulary: a `crash_injection` directive and a recursive mock `cause`, with conformance fixtures 067 and 068, no library behavior change). `conformance.toml` records 0065, 0066, 0068, and 0070 as `implemented` and 0059 as `not-yet` (only its cross-spec flag rename was adopted). ### Fixed diff --git a/conformance.toml b/conformance.toml index 1b02c5d..be63bf1 100644 --- a/conformance.toml +++ b/conformance.toml @@ -32,7 +32,7 @@ [manifest] implementation = "openarmature-python" -spec_pin = "v0.57.0" +spec_pin = "v0.58.0" # Status values: # implemented — shipped behavior matches the proposal's contract @@ -648,3 +648,18 @@ since = "0.14.0" [proposals."0068"] status = "implemented" since = "0.14.0" + +# Spec v0.58.0 (proposal 0070). Conformance-adapter crash/resume vocabulary, +# crash-injection, and cause-chaining (conformance-adapter §5.1 / §5.6 / §5.8). +# Two new adapter capabilities: ``crash_injection`` (``after_fan_out_instance`` +# + ``after_node``) simulates a crash at a checkpoint boundary independent of +# an instance failure, and a recursive mock ``cause`` chains a failure mock's +# raised error to an originating cause. The crash/resume + saved-record + +# resume-outcome directives the proposal formalizes were already implemented. +# Fixture 067 (crash-injection fan-out resume) drives after_fan_out_instance +# end-to-end; after_node has a unit test (no fixture exercises it). Fixture +# 068 (failure-mock cause chain) pins 0068's outermost-wins derivation via the +# mock ``cause``. +[proposals."0070"] +status = "implemented" +since = "0.14.0" diff --git a/openarmature-spec b/openarmature-spec index f14d615..e9b2bcc 160000 --- a/openarmature-spec +++ b/openarmature-spec @@ -1 +1 @@ -Subproject commit f14d6158584999318a351358909e3b96e8addece +Subproject commit e9b2bcc0ba6906fa441a6411973b2b7bef0f7152 diff --git a/pyproject.toml b/pyproject.toml index 9bf65d3..5af8dc7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec" openarmature = "openarmature.cli:main" [tool.openarmature] -spec_version = "0.57.0" +spec_version = "0.58.0" [dependency-groups] dev = [ diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index 80c0079..8d8ad7e 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1,6 +1,6 @@ # OpenArmature — Agent documentation -*This is the agent guide bundled with the openarmature Python package, version 0.13.0 (spec v0.57.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* +*This is the agent guide bundled with the openarmature Python package, version 0.13.0 (spec v0.58.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* ## TL;DR @@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents: ## Capability contracts -_Sourced from openarmature-spec v0.57.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ +_Sourced from openarmature-spec v0.58.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ ### Capability: `graph-engine` diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py index 4498c6d..6e48503 100644 --- a/src/openarmature/__init__.py +++ b/src/openarmature/__init__.py @@ -25,7 +25,7 @@ """ __version__ = "0.13.0" -__spec_version__ = "0.57.0" +__spec_version__ = "0.58.0" # Proposal 0052 (spec observability §5.1 / §8.4.1): canonical # package-registry name for this implementation. Surfaces on every # OTel invocation span as ``openarmature.implementation.name`` and on diff --git a/tests/conformance/adapter.py b/tests/conformance/adapter.py index 787bd2e..cb83258 100644 --- a/tests/conformance/adapter.py +++ b/tests/conformance/adapter.py @@ -210,6 +210,31 @@ def __init__(self, message: str, category: str) -> None: self.category = category +# Conformance-adapter §5.1 ``cause`` (proposal 0070): a failure mock's raised +# error MAY chain to an originating cause, recursively, so a consumer walking +# the cause chain (pipeline-utilities §6.3 failure isolation) observes each +# link's category / message. +def _build_mock_cause(cause_spec: Mapping[str, Any] | None) -> Exception | None: + """Build the chained originating exception from a failure mock's ``cause`` + directive. ``cause: {category, message, cause: {...}}`` nests recursively; + each link becomes a ``_CategorizedException`` (or a bare ``Exception`` when + its category is null) linked via ``__cause__``. Returns ``None`` when no + cause is configured.""" + if cause_spec is None: + return None + inner = _build_mock_cause(cause_spec.get("cause")) + message = str(cause_spec.get("message", "")) + category = cause_spec.get("category") + exc: Exception = ( + _CategorizedException(message=message, category=category) + if isinstance(category, str) and category + else Exception(message) + ) + if inner is not None: + exc.__cause__ = inner + return exc + + def _make_pure_update_fn( node_name: str, update: Mapping[str, Any], @@ -512,10 +537,16 @@ async def fn(_state: Any) -> Mapping[str, Any]: entry = sequence[idx] if entry is None: return copy.deepcopy(success_update) - raise _CategorizedException( + # An entry MAY carry a recursive ``cause`` (proposal 0070 §5.1) + # that chains the raised error to an originating cause. + cause_exc = _build_mock_cause(entry.get("cause")) + exc = _CategorizedException( message=entry.get("message", "flaky"), category=entry.get("category", "provider_unavailable"), ) + if cause_exc is not None: + raise exc from cause_exc + raise exc return copy.deepcopy(success_update) return fn @@ -538,6 +569,32 @@ async def fn_with_sleep(state: Any) -> Mapping[str, Any]: return fn_with_sleep +def _wrap_with_execution_recorder( + fn: Callable[[Any], Awaitable[Mapping[str, Any]]], + node_name: str, + recorders: dict[str, dict[int, list[int]]], +) -> Callable[[Any], Awaitable[Mapping[str, Any]]]: + """Wrap a node body so that, when it runs inside a fan-out instance, it + records the executing instance's ``current_fan_out_index()`` into + ``recorders`` (keyed by node name then index). Lets the checkpoint resume + driver tell which fan-out instances executed vs. rolled forward for a + plain-node fan-out (e.g. the crash_injection fixture 067), where no + ``flaky_per_index`` body records execution. Records at body entry, so an + instance whose body ran counts as executed even if it then fails.""" + from openarmature.observability.correlation import ( # noqa: PLC0415 + current_attempt_index, + current_fan_out_index, + ) + + async def fn_recording(state: Any) -> Mapping[str, Any]: + idx = current_fan_out_index() + if idx is not None: + recorders.setdefault(node_name, {}).setdefault(idx, []).append(current_attempt_index()) + return await fn(state) + + return fn_recording + + @dataclass(frozen=True) class _TracingFanOutNode(FanOutNode[State, State]): """Conformance helper: a FanOutNode that appends its name to a shared @@ -658,6 +715,7 @@ def build_graph( fan_out_instance_middleware: Mapping[str, Sequence[Any]] | None = None, parallel_branches_branch_middleware: Mapping[str, Mapping[str, Sequence[Any]]] | None = None, flaky_per_index_attempt_recorders: dict[str, dict[int, list[int]]] | None = None, + instance_execution_recorders: dict[str, dict[int, list[int]]] | None = None, ) -> BuiltGraph: """Translate a graph-shaped fixture block into a `BuiltGraph`. @@ -767,6 +825,13 @@ def build_graph( if sleep_ms is not None: body = _wrap_with_sleep(body, int(sleep_ms)) + # Record per-instance execution for plain-node fan-outs so the + # checkpoint resume driver can tell executed from rolled-forward + # instances. flaky_per_index records its own per-instance attempts, + # so it is skipped here; this covers the rest. + if instance_execution_recorders is not None and "flaky_per_index" not in node_spec: + body = _wrap_with_execution_recorder(body, node_name, instance_execution_recorders) + builder.add_node(node_name, body, middleware=per_node_mw) for edge_spec in spec.get("edges", []): diff --git a/tests/conformance/test_checkpoint.py b/tests/conformance/test_checkpoint.py index e2868ac..99f0a46 100644 --- a/tests/conformance/test_checkpoint.py +++ b/tests/conformance/test_checkpoint.py @@ -46,6 +46,7 @@ FanOutInternalSaveBatching, FanOutProgress, InMemoryCheckpointer, + NodePosition, ) from openarmature.graph import ( RuntimeGraphError, @@ -65,8 +66,11 @@ # atomic-restart) was REMOVED in spec v0.18.0 when proposal 0009 # superseded its contract, so it is explicitly excluded from the set # rather than relying on the test runner's file-glob to filter the -# missing fixture out. -_CHECKPOINT_FIXTURE_NUMBERS: frozenset[int] = frozenset((set(range(24, 32)) - {28}) | set(range(48, 57))) +# missing fixture out. 067 (crash-injection fan-out resume, proposal +# 0070) is a crash/resume fixture this runner owns; it joined at v0.58.0. +_CHECKPOINT_FIXTURE_NUMBERS: frozenset[int] = frozenset( + (set(range(24, 32)) - {28}) | set(range(48, 57)) | {67} +) # Fixtures that need resume-aware test seams the conformance adapter # doesn't yet translate. Skipped here with a clear reason — the engine @@ -129,7 +133,11 @@ class _CapturingCheckpointer: :class:`_AbortAfterInstance` AFTER the save that just transitioned the named instance index from ``not_started`` / ``in_flight`` to ``completed``. Simulates a crash at that exact point — used by - fixture 052 to test collect-mode error-record rollforward. + fixture 052 to test collect-mode error-record rollforward, and by + the ``crash_injection: {after_fan_out_instance}`` directive (proposal + 0070). ``abort_after_node``: the same simulated crash AFTER the save + that records the named node in ``completed_positions`` — the + ``crash_injection: {after_node}`` boundary. """ def __init__( @@ -137,12 +145,14 @@ def __init__( *, fan_out_internal_save_batching: FanOutInternalSaveBatching | None = None, abort_after_instance: int | None = None, + abort_after_node: str | None = None, ) -> None: self._inner = InMemoryCheckpointer( fan_out_internal_save_batching=fan_out_internal_save_batching, ) self.saves: list[CheckpointRecord] = [] self._abort_after_instance = abort_after_instance + self._abort_after_node = abort_after_node self._aborted = False # Per proposal 0029 (fixture 056): mutating the saved record's # outer state on ``load`` simulates "user shrank/grew the input @@ -184,20 +194,30 @@ def _raise_if_post_abort(self) -> None: raise _AbortAfterInstance("post-abort save call") def _maybe_abort(self, record: CheckpointRecord) -> None: - """Check whether this save was the one transitioning the - configured ``abort_after_instance`` to ``completed``. If so, - raise the sentinel after the save has been recorded (so the - record is durably persisted before the simulated crash).""" - if self._abort_after_instance is None or self._aborted: + """Check whether this save is the configured crash boundary. If + so, raise the sentinel after the save has been recorded (so the + record is durably persisted before the simulated crash). + ``abort_after_instance`` fires on the save transitioning that + instance index to ``completed``; ``abort_after_node`` fires on + the save recording that node in ``completed_positions``.""" + if self._aborted: return - target_idx = self._abort_after_instance - for fp in record.fan_out_progress: - if target_idx < len(fp.instances) and fp.instances[target_idx].state == "completed": - # Subsequent instances must NOT be completed — otherwise - # we'd abort after a later instance's save instead. - if all(inst.state != "completed" for inst in fp.instances[target_idx + 1 :]): - self._aborted = True - raise _AbortAfterInstance(f"simulated crash after instance {target_idx} completed save") + if self._abort_after_instance is not None: + target_idx = self._abort_after_instance + for fp in record.fan_out_progress: + if target_idx < len(fp.instances) and fp.instances[target_idx].state == "completed": + # Subsequent instances must NOT be completed — otherwise + # we'd abort after a later instance's save instead. + if all(inst.state != "completed" for inst in fp.instances[target_idx + 1 :]): + self._aborted = True + raise _AbortAfterInstance( + f"simulated crash after instance {target_idx} completed save" + ) + if self._abort_after_node is not None and any( + p.node_name == self._abort_after_node for p in record.completed_positions + ): + self._aborted = True + raise _AbortAfterInstance(f"simulated crash after node {self._abort_after_node} save") async def load(self, invocation_id: str) -> CheckpointRecord | None: record = await self._inner.load(invocation_id) @@ -280,9 +300,13 @@ def _build_capturing(spec: Mapping[str, Any]) -> _CapturingCheckpointer: flush_every = int(batching_cfg.get("flush_every", 0)) batching = FanOutInternalSaveBatching(flush_every=flush_every) abort_after = _find_abort_after_instance(spec) + ci_instance, ci_node = _find_crash_injection(spec) + if ci_instance is not None: + abort_after = ci_instance return _CapturingCheckpointer( fan_out_internal_save_batching=batching, abort_after_instance=abort_after, + abort_after_node=ci_node, ) @@ -299,6 +323,26 @@ def _find_abort_after_instance(spec: Mapping[str, Any]) -> int | None: return None +# Conformance-adapter §5.6 ``crash_injection`` (proposal 0070): a simulated +# crash at a checkpoint boundary, independent of an instance failure. +def _find_crash_injection(spec: Mapping[str, Any]) -> tuple[int | None, str | None]: + """Parse the top-level ``crash_injection`` directive. Returns + ``(after_fan_out_instance_index, after_node_name)`` with at most one set. + Pairs with ``resume:`` the way ``first_run_expected_error`` does, but the + first run has no asserted outcome (it "crashed").""" + ci = spec.get("crash_injection") + if not isinstance(ci, dict): + return None, None + ci_dict = cast("Mapping[str, Any]", ci) + after_instance = ci_dict.get("after_fan_out_instance") + if isinstance(after_instance, dict): + return int(cast("Mapping[str, Any]", after_instance)["index"]), None + after_node = ci_dict.get("after_node") + if after_node is not None: + return None, str(after_node) + return None, None + + def _strip_abort_directive(spec: Mapping[str, Any]) -> Mapping[str, Any]: """Return a fresh spec dict with any ``abort_after_instance`` directive removed from fan-out nodes. The engine doesn't recognize @@ -333,10 +377,16 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] # the same per-instance attempt table the resume assertions consult. # Subgraphs and the outer graph both contribute keyed by node name. flaky_per_index_recorders: dict[str, dict[int, list[int]]] = {} + # General per-instance execution recorder — populated for plain-node + # fan-outs (e.g. the crash_injection fixture 067) where no flaky_per_index + # body records which instances ran. Consulted as a fallback for the + # instances_executed / skipped assertions when no flaky_per_index node did. + instance_execution_recorders: dict[str, dict[int, list[int]]] = {} subgraphs = _build_subgraphs_for( spec, top_level, flaky_per_index_recorders=flaky_per_index_recorders, + instance_execution_recorders=instance_execution_recorders, ) trace: list[str] = [] @@ -346,6 +396,7 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] subgraphs=subgraphs, trace=trace, flaky_per_index_attempt_recorders=flaky_per_index_recorders, + instance_execution_recorders=instance_execution_recorders, ) builder = built.builder @@ -391,6 +442,10 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] # Run #1 — first invocation. May succeed or fail per fixture. first_run_expected_error = spec.get("first_run_expected_error") + # crash_injection (proposal 0070): a simulated crash at a checkpoint + # boundary with NO asserted first-run outcome (it "crashed"). When set, + # the abort is expected and swallowed without a first_run_expected_error. + crash_injection = spec.get("crash_injection") invocation_id_first_run: str | None = None final_first_run: State | None = None trace.clear() @@ -407,50 +462,66 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] # node_exception per the fixture's first_run_expected_error # contract. if capturing._aborted: # noqa: SLF001 — test driver intentional - if first_run_expected_error is None: + if crash_injection is not None: + # The simulated crash fired; no first-run outcome is asserted. + pass + elif first_run_expected_error is None: raise AssertionError("abort_after_instance fired but no first_run_expected_error declared") - expected_category = first_run_expected_error["category"] - assert expected_category == "node_exception", ( - f"abort_after_instance simulates node_exception; fixture asserts {expected_category!r}" - ) + else: + expected_category = first_run_expected_error["category"] + assert expected_category == "node_exception", ( + f"abort_after_instance simulates node_exception; fixture asserts {expected_category!r}" + ) + elif crash_injection is not None: + raise AssertionError("crash_injection configured but no crash fired during the first run") elif first_run_expected_error is not None: raise AssertionError( f"expected first run to fail with category " f"{first_run_expected_error!r} but it returned successfully" ) except _AbortAfterInstance: - # Simulated crash from the abort_after_instance directive - # for fail_fast-style flows where the sentinel propagates - # out of the engine. Treat as a node_exception at the - # fan-out level — that's the fixture's + # Simulated crash sentinel propagated out of the engine (serial / + # fail_fast flows). For crash_injection it is the expected crash with + # no asserted outcome; otherwise it pairs with the fixture's # ``first_run_expected_error: node_exception`` shape. - if first_run_expected_error is None: - raise - expected_category = first_run_expected_error["category"] - assert expected_category == "node_exception", ( - f"abort_after_instance simulates node_exception; fixture asserts {expected_category!r}" - ) + if crash_injection is None: + if first_run_expected_error is None: + raise + expected_category = first_run_expected_error["category"] + assert expected_category == "node_exception", ( + f"abort_after_instance simulates node_exception; fixture asserts {expected_category!r}" + ) except CheckpointError: - # When abort_after_instance fires during a subsequent - # post-abort save (instance dispatched after the target's - # save), the engine wraps the abort sentinel as - # ``CheckpointSaveFailed`` and propagates it out. Treat - # the wrapped abort the same way as a direct sentinel - # propagation when the fixture declares it as a + # When the abort fires during a subsequent post-abort save + # (instance dispatched after the target's save), the engine wraps + # the abort sentinel as ``CheckpointSaveFailed`` and propagates it + # out. Treat the wrapped abort like a direct sentinel propagation: + # expected-and-swallowed under crash_injection, else paired with a # ``node_exception`` first-run failure. - if first_run_expected_error is None or not capturing._aborted: # noqa: SLF001 + if crash_injection is not None and capturing._aborted: # noqa: SLF001 + pass + elif first_run_expected_error is None or not capturing._aborted: # noqa: SLF001 raise - expected_category = first_run_expected_error["category"] - assert expected_category == "node_exception", ( - f"abort_after_instance simulates node_exception; fixture asserts {expected_category!r}" - ) + else: + expected_category = first_run_expected_error["category"] + assert expected_category == "node_exception", ( + f"abort_after_instance simulates node_exception; fixture asserts {expected_category!r}" + ) except RuntimeGraphError as e: - if first_run_expected_error is None: + # crash_injection's simulated crash can surface here too: under + # serial fail_fast the abort sentinel is wrapped as + # ``CheckpointSaveFailed`` and re-wrapped as a ``NodeException``. + # When the abort fired (``_aborted``), swallow it as the expected + # crash with no asserted outcome. + if crash_injection is not None and capturing._aborted: # noqa: SLF001 + pass + elif first_run_expected_error is None: raise - expected_category = first_run_expected_error["category"] - assert e.category == expected_category, ( - f"first-run error category mismatch: actual={e.category!r}, expected={expected_category!r}" - ) + else: + expected_category = first_run_expected_error["category"] + assert e.category == expected_category, ( + f"first-run error category mismatch: actual={e.category!r}, expected={expected_category!r}" + ) # Capture invocation_id from the latest save (we attached a # capturing checkpointer; every save record carries the engine's @@ -534,12 +605,15 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] # and ``instance_N_resume_attempt_count`` assertions. for recorder in flaky_per_index_recorders.values(): recorder.clear() + for recorder in instance_execution_recorders.values(): + recorder.clear() # Reset the abort gate so the resume run completes normally. # ``_aborted`` being False disables the ``_raise_if_post_abort`` - # pre-flight check; clearing ``_abort_after_instance`` ensures + # pre-flight check; clearing the abort targets ensures # ``_maybe_abort`` is also a no-op on the resume path. capturing._aborted = False # noqa: SLF001 — test driver intentional capturing._abort_after_instance = None # noqa: SLF001 + capturing._abort_after_node = None # noqa: SLF001 # Clear the trace so post-resume execution capture is isolated. trace.clear() @@ -601,11 +675,19 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] # instances_skipped_during_resume — assert against the # per-instance attempt recorders (each instance whose body ran # appears in the recorder). + # flaky_per_index recorders capture execution for retry-resume fixtures; + # for a plain-node fan-out (the crash_injection fixture 067) no + # flaky_per_index body records, so fall back to the general execution + # recorder. The flaky_per_index path stays primary, so existing fixtures + # are unchanged. + executed_set = set(_flatten_executed_instances(flaky_per_index_recorders)) + if not executed_set: + executed_set = set(_flatten_executed_instances(instance_execution_recorders)) if "instances_executed_during_resume" in resume_expected: expected_executed = sorted( int(i) for i in cast(Iterable[Any], resume_expected["instances_executed_during_resume"]) ) - actual_executed = sorted(_flatten_executed_instances(flaky_per_index_recorders)) + actual_executed = sorted(executed_set) assert actual_executed == expected_executed, ( f"instances_executed_during_resume mismatch: " f"actual={actual_executed}, expected={expected_executed}" @@ -614,14 +696,11 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] expected_skipped = sorted( int(i) for i in cast(Iterable[Any], resume_expected["instances_skipped_during_resume"]) ) - actual_executed_set = set(_flatten_executed_instances(flaky_per_index_recorders)) # An instance is "skipped" if its body did NOT run during resume. - # We can validate by asserting it's not in the executed set — - # the fixtures specify the disjoint partitioning explicitly. for skipped_idx in expected_skipped: - assert skipped_idx not in actual_executed_set, ( + assert skipped_idx not in executed_set, ( f"instance {skipped_idx} expected to be skipped on resume " - f"but its body ran (recorded attempts: {actual_executed_set})" + f"but its body ran (recorded attempts: {executed_set})" ) if "invariants" in resume_expected or "invariants" in resume_block: @@ -667,19 +746,22 @@ def _build_subgraphs_for( top_level: Mapping[str, Any], *, flaky_per_index_recorders: dict[str, dict[int, list[int]]] | None = None, + instance_execution_recorders: dict[str, dict[int, list[int]]] | None = None, ) -> dict[str, Any]: """Build subgraphs from either the case's own ``subgraph`` / ``subgraphs`` block or the cases-fixture's top-level shared ``subgraph`` block. Each case may declare local subgraphs OR inherit from the top level. - ``flaky_per_index_recorders`` (when supplied) threads through to - inner-subgraph build so per-instance flaky bodies inside subgraphs - populate the same recorder map the resume assertions read. + ``flaky_per_index_recorders`` and ``instance_execution_recorders`` + (when supplied) thread through to inner-subgraph build so per-instance + bodies inside subgraphs populate the recorder maps the resume + assertions read. """ return _build_subgraphs( {**dict(top_level), **dict(spec)}, flaky_per_index_recorders=flaky_per_index_recorders, + instance_execution_recorders=instance_execution_recorders, ) @@ -687,14 +769,16 @@ def _build_subgraphs( spec: Mapping[str, Any], *, flaky_per_index_recorders: dict[str, dict[int, list[int]]] | None = None, + instance_execution_recorders: dict[str, dict[int, list[int]]] | None = None, ) -> dict[str, Any]: """Build any subgraphs (`subgraph:` or `subgraphs:`) the fixture declares. Returns a registry the adapter consumes by name. Inner subgraphs may declare flaky_per_index nodes (fixture 048+: the failing/succeeding scorer node lives in the inner subgraph, - not the outer graph). Thread the recorders through so those - flaky bodies populate the same per-instance attempt table. + not the outer graph) or plain nodes (fixture 067's crash_injection + scorer). Thread both recorder maps through so those bodies populate + the per-instance attempt / execution tables the resume assertions read. """ subgraph_specs: dict[str, Any] = {} if "subgraph" in spec: @@ -711,6 +795,7 @@ def _build_subgraphs( sub_spec, trace=sub_trace, flaky_per_index_attempt_recorders=flaky_per_index_recorders, + instance_execution_recorders=instance_execution_recorders, ) compiled_subgraphs[name] = sub_built.builder.compile() return compiled_subgraphs @@ -1068,3 +1153,41 @@ def _assert_invariants( for s in saves: for p in s.completed_positions: assert p.fan_out_index is None, f"unexpected fan-out internal save: {p}" + + +# --------------------------------------------------------------------------- +# crash_injection: after_node boundary (proposal 0070) +# --------------------------------------------------------------------------- + + +async def test_capturing_checkpointer_aborts_after_node() -> None: + # The ``crash_injection: {after_node}`` boundary fires the simulated-crash + # sentinel after the save whose record records the named node in + # ``completed_positions``. Exercised directly because no v0.58.0 fixture + # uses after_node (fixture 067 uses after_fan_out_instance, which covers + # the shared abort path end-to-end); this pins the after_node branch. + cp = _CapturingCheckpointer(abort_after_node="target") + record = CheckpointRecord( + invocation_id="inv", + correlation_id="c", + state={}, + completed_positions=(NodePosition(namespace=(), node_name="target", step=0),), + parent_states=(), + last_saved_at=0.0, + ) + with pytest.raises(_AbortAfterInstance): + await cp.save("inv", record) + assert cp._aborted is True # noqa: SLF001 — test driver intentional + + # A save whose record does not record the target node does not abort. + cp_other = _CapturingCheckpointer(abort_after_node="target") + other_record = CheckpointRecord( + invocation_id="inv", + correlation_id="c", + state={}, + completed_positions=(NodePosition(namespace=(), node_name="other", step=0),), + parent_states=(), + last_saved_at=0.0, + ) + await cp_other.save("inv", other_record) + assert cp_other._aborted is False # noqa: SLF001 — test driver intentional diff --git a/tests/conformance/test_pipeline_utilities.py b/tests/conformance/test_pipeline_utilities.py index c22fab8..c755fdc 100644 --- a/tests/conformance/test_pipeline_utilities.py +++ b/tests/conformance/test_pipeline_utilities.py @@ -84,14 +84,15 @@ def _load(path: Path) -> dict[str, Any]: # the `cases:` shape carries seeded-record + migrations + resume blocks. _LAST_DRIVEN_FIXTURE = 38 -# Failure-isolation fixtures (058-065, proposals 0050 §6.3 + 0065 + 0066) -# are middleware fixtures this runner handles. They sit past -# _LAST_DRIVEN_FIXTURE only because the 039-057 range (state migration / +# Failure-isolation fixtures (058-066 + 068, proposals 0050 §6.3 / 0065 / +# 0066 / 0068 / 0070) are middleware fixtures this runner handles. They sit +# past _LAST_DRIVEN_FIXTURE only because the 039-057 range (state migration / # checkpoint fan-out) is owned by dedicated runners (test_state_migration.py # / test_checkpoint.py), not because this runner can't drive them. Fixture -# 065 (fan-out degrade contribution, proposal 0066) joined when the spec pin -# advanced to v0.56.0. -_FAILURE_ISOLATION_FIXTURES = frozenset(range(58, 67)) +# 066 (cause chain, 0068) joined at v0.57.0; 068 (failure-mock cause chain, +# 0070) at v0.58.0. Fixture 067 (crash-injection fan-out resume) is a +# checkpoint fixture owned by test_checkpoint.py, hence the gap at 67. +_FAILURE_ISOLATION_FIXTURES = frozenset(range(58, 67)) | {68} def _fixture_paths() -> list[Path]: diff --git a/tests/test_smoke.py b/tests/test_smoke.py index dc9449e..a121eed 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -9,7 +9,7 @@ def test_package_versions() -> None: assert openarmature.__version__ == "0.13.0" - assert openarmature.__spec_version__ == "0.57.0" + assert openarmature.__spec_version__ == "0.58.0" def test_spec_version_matches_pyproject() -> None: From 4d202bd9b8033b1525c71ef21a626a1b9fdd56d5 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 17 Jun 2026 13:34:31 -0700 Subject: [PATCH 2/2] Harden crash-injection directive handling (0070) Address PR review on the crash_injection plumbing: - crash_injection now defines the crash boundary exclusively; when set, the legacy fan_out.abort_after_instance directive is ignored, so an instance-boundary and a node-boundary abort cannot both be active. - A non-dict crash_injection is coerced to None in the first-run handler, matching _find_crash_injection's mapping check, so a malformed directive no longer triggers the swallow path. - after_fan_out_instance now honors its documented node field: the parser threads the node name and _maybe_abort filters fan_out_progress by it, so a multi-fan-out graph aborts after the named node. Single-fan-out fixtures and the legacy path are unchanged. --- tests/conformance/test_checkpoint.py | 54 +++++++++++++++++++++------- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/tests/conformance/test_checkpoint.py b/tests/conformance/test_checkpoint.py index 99f0a46..5094b86 100644 --- a/tests/conformance/test_checkpoint.py +++ b/tests/conformance/test_checkpoint.py @@ -145,6 +145,7 @@ def __init__( *, fan_out_internal_save_batching: FanOutInternalSaveBatching | None = None, abort_after_instance: int | None = None, + abort_after_instance_node: str | None = None, abort_after_node: str | None = None, ) -> None: self._inner = InMemoryCheckpointer( @@ -152,6 +153,11 @@ def __init__( ) self.saves: list[CheckpointRecord] = [] self._abort_after_instance = abort_after_instance + # The fan-out node ``abort_after_instance`` targets. ``None`` (the + # legacy fan_out.abort_after_instance path) matches any fan-out; + # crash_injection.after_fan_out_instance sets it to scope the abort to + # the named node in a multi-fan-out graph. + self._abort_after_instance_node = abort_after_instance_node self._abort_after_node = abort_after_node self._aborted = False # Per proposal 0029 (fixture 056): mutating the saved record's @@ -205,6 +211,14 @@ def _maybe_abort(self, record: CheckpointRecord) -> None: if self._abort_after_instance is not None: target_idx = self._abort_after_instance for fp in record.fan_out_progress: + # Scope to the targeted fan-out node when one is named + # (crash_injection.after_fan_out_instance); the legacy path + # leaves it None and matches any fan-out. + if ( + self._abort_after_instance_node is not None + and fp.fan_out_node_name != self._abort_after_instance_node + ): + continue if target_idx < len(fp.instances) and fp.instances[target_idx].state == "completed": # Subsequent instances must NOT be completed — otherwise # we'd abort after a later instance's save instead. @@ -299,13 +313,18 @@ def _build_capturing(spec: Mapping[str, Any]) -> _CapturingCheckpointer: ) flush_every = int(batching_cfg.get("flush_every", 0)) batching = FanOutInternalSaveBatching(flush_every=flush_every) - abort_after = _find_abort_after_instance(spec) - ci_instance, ci_node = _find_crash_injection(spec) - if ci_instance is not None: + ci_instance, ci_instance_node, ci_node = _find_crash_injection(spec) + if ci_instance is not None or ci_node is not None: + # crash_injection defines the crash boundary exclusively; the legacy + # fan_out.abort_after_instance directive is ignored when it is set, so + # an instance-boundary and a node-boundary abort can't both be active. abort_after = ci_instance + else: + abort_after = _find_abort_after_instance(spec) return _CapturingCheckpointer( fan_out_internal_save_batching=batching, abort_after_instance=abort_after, + abort_after_instance_node=ci_instance_node, abort_after_node=ci_node, ) @@ -325,22 +344,27 @@ def _find_abort_after_instance(spec: Mapping[str, Any]) -> int | None: # Conformance-adapter §5.6 ``crash_injection`` (proposal 0070): a simulated # crash at a checkpoint boundary, independent of an instance failure. -def _find_crash_injection(spec: Mapping[str, Any]) -> tuple[int | None, str | None]: +def _find_crash_injection(spec: Mapping[str, Any]) -> tuple[int | None, str | None, str | None]: """Parse the top-level ``crash_injection`` directive. Returns - ``(after_fan_out_instance_index, after_node_name)`` with at most one set. - Pairs with ``resume:`` the way ``first_run_expected_error`` does, but the - first run has no asserted outcome (it "crashed").""" + ``(after_fan_out_instance_index, after_fan_out_instance_node, + after_node_name)``: the index + node identify the + ``after_fan_out_instance`` boundary, ``after_node_name`` the ``after_node`` + boundary; at most one boundary is set. Pairs with ``resume:`` the way + ``first_run_expected_error`` does, but the first run has no asserted + outcome (it "crashed").""" ci = spec.get("crash_injection") if not isinstance(ci, dict): - return None, None + return None, None, None ci_dict = cast("Mapping[str, Any]", ci) after_instance = ci_dict.get("after_fan_out_instance") if isinstance(after_instance, dict): - return int(cast("Mapping[str, Any]", after_instance)["index"]), None + ai = cast("Mapping[str, Any]", after_instance) + node = ai.get("node") + return int(ai["index"]), (str(node) if node is not None else None), None after_node = ci_dict.get("after_node") if after_node is not None: - return None, str(after_node) - return None, None + return None, None, str(after_node) + return None, None, None def _strip_abort_directive(spec: Mapping[str, Any]) -> Mapping[str, Any]: @@ -445,7 +469,12 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] # crash_injection (proposal 0070): a simulated crash at a checkpoint # boundary with NO asserted first-run outcome (it "crashed"). When set, # the abort is expected and swallowed without a first_run_expected_error. - crash_injection = spec.get("crash_injection") + # Coerced to None when not a mapping, matching _find_crash_injection, so a + # malformed directive parses to no boundary rather than swallowing aborts + # or tripping the "configured but no crash fired" assertion. + crash_injection: Any = spec.get("crash_injection") + if not isinstance(crash_injection, dict): + crash_injection = None invocation_id_first_run: str | None = None final_first_run: State | None = None trace.clear() @@ -613,6 +642,7 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any] # ``_maybe_abort`` is also a no-op on the resume path. capturing._aborted = False # noqa: SLF001 — test driver intentional capturing._abort_after_instance = None # noqa: SLF001 + capturing._abort_after_instance_node = None # noqa: SLF001 capturing._abort_after_node = None # noqa: SLF001 # Clear the trace so post-resume execution capture is isolated. trace.clear()