From 6a4fedf3958c747385431a3345d609f30ad5d338 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 15 Jun 2026 08:38:49 -0700 Subject: [PATCH] Surface failure-isolation cause in fan-out example The fan-out-with-retry example already runs FailureIsolation wrapping Retry at a fan-out instance, but it only showed the degraded state, never the FailureIsolatedEvent. Add a failure_isolation_observer that captures the events and, in degrade mode, print each one's event_name, caught_exception.category, and attempt_index. The category is the point: at an instance placement it resolves to the originating cause (provider_unavailable) rather than the node_exception the engine wrapped it in, so the demo now shows the failure telemetry naming what actually failed. Update the example's docstring and doc page to describe the new block, and add a MODE=degrade line to the Run with section. --- docs/examples/fan-out-with-retry.md | 19 +++++++++++++ examples/fan-out-with-retry/main.py | 44 ++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/docs/examples/fan-out-with-retry.md b/docs/examples/fan-out-with-retry.md index 77ad84f..08d64d6 100644 --- a/docs/examples/fan-out-with-retry.md +++ b/docs/examples/fan-out-with-retry.md @@ -56,6 +56,12 @@ intact. recording the resolved `item_count` / `concurrency` / `error_policy` at runtime. Inner-instance events carry `fan_out_index` but not the config. +- In `degrade` mode, a `failure_isolation_observer` captures each + `FailureIsolatedEvent` and the demo prints its + `caught_exception.category`. At a fan-out instance placement the + category resolves to the originating cause (`provider_unavailable`) + rather than the masking `node_exception`, so the telemetry names what + actually failed. ## Composing with checkpointing @@ -182,3 +188,16 @@ exhausted-retry failure and returned the degraded partial, so the fan-out recorded the instance as a (degraded) success. The per-instance timings still show the sentinel's failed attempts, so you can see the retries happened before the instance was degraded. + +The degrade run also prints a `Failure-isolation events` block from the +`failure_isolation_observer`: + +``` +Failure-isolation events (1): + event='headline_degraded' cause=provider_unavailable attempt_index=2 +``` + +`cause` is the resolved originating category, `provider_unavailable`, +not the masking `node_exception` the engine wraps the failure in before +isolation catches it. `attempt_index` is the final, exhausting attempt +of the three the retry middleware made. diff --git a/examples/fan-out-with-retry/main.py b/examples/fan-out-with-retry/main.py index 37843fb..e1133f9 100644 --- a/examples/fan-out-with-retry/main.py +++ b/examples/fan-out-with-retry/main.py @@ -54,6 +54,11 @@ the fan-out node's own started / completed pair and gives observers a record of the resolved item_count, concurrency, and error_policy at dispatch time. +- In ``degrade`` mode a ``failure_isolation_observer`` captures each + ``FailureIsolatedEvent``; the demo prints its ``event_name``, the + resolved ``caught_exception.category`` (the originating cause, e.g. + ``provider_unavailable``, not the masking ``node_exception``), and the + exhausting ``attempt_index``. **Configuration** (env vars; OpenAI defaults shown): @@ -68,6 +73,10 @@ uv sync --group examples cd examples/fan-out-with-retry LLM_API_KEY=sk-... uv run python main.py + + # exercise the degrade failure-path: prepends a synthetic failing + # headline and prints the Failure-isolation events block + MODE=degrade LLM_API_KEY=sk-... uv run python main.py """ from __future__ import annotations @@ -83,6 +92,7 @@ from openarmature.graph import ( END, CompiledGraph, + FailureIsolatedEvent, GraphBuilder, NodeEvent, ObserverEvent, @@ -238,6 +248,28 @@ async def _record_timing(record: TimingRecord) -> None: _timings.append(record) +# Captured failure-isolation events, populated by the observer below. +# Only fires in ``degrade`` mode, where FailureIsolationMiddleware catches +# an exhausted instance and emits one FailureIsolatedEvent per degraded +# instance. +_isolated: list[FailureIsolatedEvent] = [] + + +async def failure_isolation_observer(event: ObserverEvent) -> None: + """Capture each FailureIsolatedEvent so the demo can surface the + resolved failure cause. + + When FailureIsolation wraps Retry at a fan-out instance, the engine + has already wrapped the originating error as a node_exception carrier + by the time isolation catches it. ``caught_exception.category`` + resolves through that carrier to the originating cause, so a degraded + instance reports ``provider_unavailable`` (what actually failed) + rather than the masking ``node_exception``. + """ + if isinstance(event, FailureIsolatedEvent): + _isolated.append(event) + + # --------------------------------------------------------------------------- # Outer graph # --------------------------------------------------------------------------- @@ -371,8 +403,9 @@ async def fan_out_config_observer(event: ObserverEvent) -> None: async def main() -> None: # Reset module-level capture so a REPL or repeated-main() driver - # doesn't accumulate timings across invocations. + # doesn't accumulate timings / isolation events across invocations. _timings.clear() + _isolated.clear() # MODE selects the per-instance failure posture: fail_fast (default, # abort on the first exhausted-retry failure), collect (record @@ -382,6 +415,7 @@ async def main() -> None: mode = os.environ.get("MODE", "fail_fast") graph = build_graph(mode=mode) graph.attach_observer(fan_out_config_observer) + graph.attach_observer(failure_isolation_observer) # collect and degrade both need a failure to demonstrate, so prepend # a deliberately-failing headline that summarize() always raises on. @@ -430,6 +464,14 @@ async def main() -> None: for err in final.instance_errors: print(f" {err}") print() + if _isolated: + print(f"Failure-isolation events ({len(_isolated)}):") + for ev in _isolated: + print( + f" event={ev.event_name!r} cause={ev.caught_exception.category} " + f"attempt_index={ev.attempt_index}" + ) + print() print("Per-instance timings (in completion order):") for nth, record in enumerate(_timings): print(f" #{nth} {record.duration_ms:7.1f} ms outcome={record.outcome}")