Merged
19 changes: 19 additions & 0 deletions docs/examples/fan-out-with-retry.md
@@ -56,6 +56,12 @@ intact.
recording the resolved `item_count` / `concurrency` /
`error_policy` at runtime. Inner-instance events carry
`fan_out_index` but not the config.
- In `degrade` mode, a `failure_isolation_observer` captures each
`FailureIsolatedEvent` and the demo prints its
`caught_exception.category`. At a fan-out instance placement the
category resolves to the originating cause (`provider_unavailable`)
rather than the masking `node_exception`, so the telemetry names what
actually failed.
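
The resolution described in the last bullet can be pictured as a walk down the exception's cause chain. The following is a minimal, self-contained sketch and not openarmature's implementation: `CategorizedError`, `NodeException`, `ProviderUnavailable`, and `resolve_category` are hypothetical stand-ins, and it assumes the carrier links to the originating error through Python's standard `__cause__` attribute.

```python
from __future__ import annotations


class CategorizedError(Exception):
    """Hypothetical base class: an exception that carries a failure category."""

    category = "unknown"


class ProviderUnavailable(CategorizedError):
    """Hypothetical originating error raised by the provider call."""

    category = "provider_unavailable"


class NodeException(CategorizedError):
    """Hypothetical carrier wrapped around whatever a node raised."""

    category = "node_exception"


def resolve_category(exc: BaseException) -> str:
    """Follow __cause__ links and return the innermost category found."""
    resolved = getattr(exc, "category", "unknown")
    seen: set[int] = set()  # guards against a cyclic cause chain
    current = exc.__cause__
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        resolved = getattr(current, "category", resolved)
        current = current.__cause__
    return resolved


def run_node() -> None:
    """Simulate the wrapping: the originating error becomes the carrier's cause."""
    try:
        raise ProviderUnavailable("upstream returned 503")
    except ProviderUnavailable as cause:
        raise NodeException("node 'summarize' failed") from cause


try:
    run_node()
except NodeException as carrier:
    print(carrier.category)           # node_exception
    print(resolve_category(carrier))  # provider_unavailable
```

Reading the carrier's own `category` gives the masking value; resolving through the chain gives the cause that the telemetry is meant to name.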

## Composing with checkpointing

@@ -182,3 +188,16 @@ exhausted-retry failure and returned the degraded partial, so the
fan-out recorded the instance as a (degraded) success. The
per-instance timings still show the sentinel's failed attempts, so
you can see the retries happened before the instance was degraded.

The degrade run also prints a `Failure-isolation events` block from the
`failure_isolation_observer`:

```
Failure-isolation events (1):
event='headline_degraded' cause=provider_unavailable attempt_index=2
```

`cause` is the resolved originating category, `provider_unavailable`,
not the masking `node_exception` the engine wraps the failure in before
isolation catches it. `attempt_index` is the final, exhausting attempt
of the three the retry middleware made.
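
To make the relationship between the retry attempts and the reported `attempt_index` concrete, here is a small standalone sketch of an isolation wrapper around a retry wrapper. It does not use openarmature: `with_retry`, `with_isolation`, `RetriesExhausted`, and `IsolatedEvent` are hypothetical names, and it assumes attempts are indexed from zero, which is consistent with three attempts ending at `attempt_index=2` in the output above.

```python
import asyncio
from dataclasses import dataclass


class ProviderUnavailable(Exception):
    """Hypothetical originating error."""

    category = "provider_unavailable"


class RetriesExhausted(Exception):
    """Hypothetical error raised once the last attempt has failed."""

    def __init__(self, attempt_index: int) -> None:
        super().__init__(f"exhausted at attempt {attempt_index}")
        self.attempt_index = attempt_index


@dataclass
class IsolatedEvent:
    """Hypothetical stand-in for a failure-isolation event."""

    event_name: str
    cause: str
    attempt_index: int


async def with_retry(call, max_attempts: int):
    """Run call up to max_attempts times; attempts are indexed from zero."""
    for attempt_index in range(max_attempts):
        try:
            return await call()
        except ProviderUnavailable as exc:
            if attempt_index == max_attempts - 1:
                raise RetriesExhausted(attempt_index) from exc


async def with_isolation(call, event_name: str, fallback, events: list):
    """Catch an exhausted call, record one event, and return the fallback."""
    try:
        return await call()
    except RetriesExhausted as exc:
        cause = getattr(exc.__cause__, "category", "unknown")
        events.append(IsolatedEvent(event_name, cause, exc.attempt_index))
        return fallback


async def always_fails():
    raise ProviderUnavailable("upstream returned 503")


async def demo():
    events: list[IsolatedEvent] = []
    result = await with_isolation(
        lambda: with_retry(always_fails, max_attempts=3),
        "headline_degraded",
        {"summary": None, "degraded": True},
        events,
    )
    return result, events


result, events = asyncio.run(demo())
for ev in events:
    print(f"event={ev.event_name!r} cause={ev.cause} attempt_index={ev.attempt_index}")
# prints: event='headline_degraded' cause=provider_unavailable attempt_index=2
```

Because isolation sits outside retry, it only sees the failure after every attempt has been spent, which is why a single event is recorded per degraded instance.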
44 changes: 43 additions & 1 deletion examples/fan-out-with-retry/main.py
@@ -54,6 +54,11 @@
the fan-out node's own started / completed pair and gives observers
a record of the resolved item_count, concurrency, and error_policy
at dispatch time.
- In ``degrade`` mode a ``failure_isolation_observer`` captures each
``FailureIsolatedEvent``; the demo prints its ``event_name``, the
resolved ``caught_exception.category`` (the originating cause, e.g.
``provider_unavailable``, not the masking ``node_exception``), and the
exhausting ``attempt_index``.

**Configuration** (env vars; OpenAI defaults shown):

@@ -68,6 +73,10 @@
uv sync --group examples
cd examples/fan-out-with-retry
LLM_API_KEY=sk-... uv run python main.py

# exercise the degrade failure-path: prepends a synthetic failing
# headline and prints the Failure-isolation events block
MODE=degrade LLM_API_KEY=sk-... uv run python main.py
"""

from __future__ import annotations
@@ -83,6 +92,7 @@
from openarmature.graph import (
    END,
    CompiledGraph,
    FailureIsolatedEvent,
    GraphBuilder,
    NodeEvent,
    ObserverEvent,
@@ -238,6 +248,28 @@ async def _record_timing(record: TimingRecord) -> None:
    _timings.append(record)


# Captured failure-isolation events, populated by the observer below.
# Only fires in ``degrade`` mode, where FailureIsolationMiddleware catches
# an exhausted instance and emits one FailureIsolatedEvent per degraded
# instance.
_isolated: list[FailureIsolatedEvent] = []


async def failure_isolation_observer(event: ObserverEvent) -> None:
    """Capture each FailureIsolatedEvent so the demo can surface the
    resolved failure cause.

    When FailureIsolation wraps Retry at a fan-out instance, the engine
    has already wrapped the originating error as a node_exception carrier
    by the time isolation catches it. ``caught_exception.category``
    resolves through that carrier to the originating cause, so a degraded
    instance reports ``provider_unavailable`` (what actually failed)
    rather than the masking ``node_exception``.
    """
    if isinstance(event, FailureIsolatedEvent):
        _isolated.append(event)


# ---------------------------------------------------------------------------
# Outer graph
# ---------------------------------------------------------------------------
@@ -371,8 +403,9 @@ async def fan_out_config_observer(event: ObserverEvent) -> None:

async def main() -> None:
    # Reset module-level capture so a REPL or repeated-main() driver
    # doesn't accumulate timings across invocations.
    # doesn't accumulate timings / isolation events across invocations.
    _timings.clear()
    _isolated.clear()

# MODE selects the per-instance failure posture: fail_fast (default,
# abort on the first exhausted-retry failure), collect (record
@@ -382,6 +415,7 @@ async def main() -> None:
    mode = os.environ.get("MODE", "fail_fast")
    graph = build_graph(mode=mode)
    graph.attach_observer(fan_out_config_observer)
    graph.attach_observer(failure_isolation_observer)

    # collect and degrade both need a failure to demonstrate, so prepend
    # a deliberately-failing headline that summarize() always raises on.
@@ -430,6 +464,14 @@ async def main() -> None:
for err in final.instance_errors:
print(f" {err}")
print()
    if _isolated:
        print(f"Failure-isolation events ({len(_isolated)}):")
        for ev in _isolated:
            print(
                f" event={ev.event_name!r} cause={ev.caught_exception.category} "
                f"attempt_index={ev.attempt_index}"
            )
        print()
    print("Per-instance timings (in completion order):")
    for nth, record in enumerate(_timings):
        print(f" #{nth} {record.duration_ms:7.1f} ms outcome={record.outcome}")