diff --git a/docs/examples/fan-out-with-retry.md b/docs/examples/fan-out-with-retry.md index 77ad84f..08d64d6 100644 --- a/docs/examples/fan-out-with-retry.md +++ b/docs/examples/fan-out-with-retry.md @@ -56,6 +56,12 @@ intact. recording the resolved `item_count` / `concurrency` / `error_policy` at runtime. Inner-instance events carry `fan_out_index` but not the config. +- In `degrade` mode, a `failure_isolation_observer` captures each + `FailureIsolatedEvent` and the demo prints its + `caught_exception.category`. At a fan-out instance placement the + category resolves to the originating cause (`provider_unavailable`) + rather than the masking `node_exception`, so the telemetry names what + actually failed. ## Composing with checkpointing @@ -182,3 +188,16 @@ exhausted-retry failure and returned the degraded partial, so the fan-out recorded the instance as a (degraded) success. The per-instance timings still show the sentinel's failed attempts, so you can see the retries happened before the instance was degraded. + +The degrade run also prints a `Failure-isolation events` block from the +`failure_isolation_observer`: + +``` +Failure-isolation events (1): + event='headline_degraded' cause=provider_unavailable attempt_index=2 +``` + +`cause` is the resolved originating category, `provider_unavailable`, +not the masking `node_exception` the engine wraps the failure in before +isolation catches it. `attempt_index` is the final, exhausting attempt +of the three the retry middleware made. diff --git a/examples/fan-out-with-retry/main.py b/examples/fan-out-with-retry/main.py index 37843fb..e1133f9 100644 --- a/examples/fan-out-with-retry/main.py +++ b/examples/fan-out-with-retry/main.py @@ -54,6 +54,11 @@ the fan-out node's own started / completed pair and gives observers a record of the resolved item_count, concurrency, and error_policy at dispatch time. +- In ``degrade`` mode a ``failure_isolation_observer`` captures each + ``FailureIsolatedEvent``; the demo prints its ``event_name``, the + resolved ``caught_exception.category`` (the originating cause, e.g. + ``provider_unavailable``, not the masking ``node_exception``), and the + exhausting ``attempt_index``. **Configuration** (env vars; OpenAI defaults shown): @@ -68,6 +73,10 @@ uv sync --group examples cd examples/fan-out-with-retry LLM_API_KEY=sk-... uv run python main.py + + # exercise the degrade failure-path: prepends a synthetic failing + # headline and prints the Failure-isolation events block + MODE=degrade LLM_API_KEY=sk-... uv run python main.py """ from __future__ import annotations @@ -83,6 +92,7 @@ from openarmature.graph import ( END, CompiledGraph, + FailureIsolatedEvent, GraphBuilder, NodeEvent, ObserverEvent, @@ -238,6 +248,28 @@ async def _record_timing(record: TimingRecord) -> None: _timings.append(record) +# Captured failure-isolation events, populated by the observer below. +# Only fires in ``degrade`` mode, where FailureIsolationMiddleware catches +# an exhausted instance and emits one FailureIsolatedEvent per degraded +# instance. +_isolated: list[FailureIsolatedEvent] = [] + + +async def failure_isolation_observer(event: ObserverEvent) -> None: + """Capture each FailureIsolatedEvent so the demo can surface the + resolved failure cause. + + When FailureIsolation wraps Retry at a fan-out instance, the engine + has already wrapped the originating error as a node_exception carrier + by the time isolation catches it. ``caught_exception.category`` + resolves through that carrier to the originating cause, so a degraded + instance reports ``provider_unavailable`` (what actually failed) + rather than the masking ``node_exception``. + """ + if isinstance(event, FailureIsolatedEvent): + _isolated.append(event) + + # --------------------------------------------------------------------------- # Outer graph # --------------------------------------------------------------------------- @@ -371,8 +403,9 @@ async def fan_out_config_observer(event: ObserverEvent) -> None: async def main() -> None: # Reset module-level capture so a REPL or repeated-main() driver - # doesn't accumulate timings across invocations. + # doesn't accumulate timings / isolation events across invocations. _timings.clear() + _isolated.clear() # MODE selects the per-instance failure posture: fail_fast (default, # abort on the first exhausted-retry failure), collect (record @@ -382,6 +415,7 @@ async def main() -> None: mode = os.environ.get("MODE", "fail_fast") graph = build_graph(mode=mode) graph.attach_observer(fan_out_config_observer) + graph.attach_observer(failure_isolation_observer) # collect and degrade both need a failure to demonstrate, so prepend # a deliberately-failing headline that summarize() always raises on. @@ -430,6 +464,14 @@ async def main() -> None: for err in final.instance_errors: print(f" {err}") print() + if _isolated: + print(f"Failure-isolation events ({len(_isolated)}):") + for ev in _isolated: + print( + f" event={ev.event_name!r} cause={ev.caught_exception.category} " + f"attempt_index={ev.attempt_index}" + ) + print() print("Per-instance timings (in completion order):") for nth, record in enumerate(_timings): print(f" #{nth} {record.duration_ms:7.1f} ms outcome={record.outcome}")