Middleware wraps the dispatch of a single node. The shape is an async
callable (state, next) -> partial_update. Anything you want to happen
around a node, without changing the node itself, lives here: retries,
timing, structured logging, request enrichment, error transformation,
circuit-breaking.
from collections.abc import Mapping
from typing import Any
from openarmature.graph import Middleware, NextCall
class LogAround:
    async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]:
        print("before")  # pre-node phase
        partial = await next_(state)
        print("after")  # post-node phase
        return partial
_: Middleware = LogAround()  # structural conformance check
next_ (spelled with a trailing underscore so it does not shadow Python's
built-in next) invokes the next layer of the chain (or the wrapped node,
at the innermost end) and returns the partial update from that layer.
Code before await next_(state) is the pre-node phase (runs on the way
in); code after it is the post-node phase (runs on the way out).
You can attach middleware at four places. The same Middleware shape
works in all of them.
Per-node, on a single function node:
builder.add_node("fetch", fetch_fn, middleware=[RetryMiddleware()])
Per-graph, applied to every node in the graph:
builder.add_middleware(TimingMiddleware(node_name="...", on_complete=record))
Per-branch, on a single branch of a parallel-branches node:
from openarmature.graph import BranchSpec
branches = {
    "sentiment": BranchSpec(
        subgraph=sentiment_subgraph,
        middleware=(RetryMiddleware(),),
    ),
    "topic": BranchSpec(subgraph=topic_subgraph),
}
builder.add_parallel_branches_node("classify", branches=branches)
The branch middleware wraps the whole branch dispatch as one call. A retry on a branch retries the entire branch from scratch, not an individual node inside it.
Per-fan-out-instance, on the instance dispatch inside a fan-out node:
builder.add_fan_out_node(
    "summarize",
    subgraph=summarize_subgraph,
    items_field="articles",
    item_field="article",
    collect_field="article",
    target_field="summaries",
    instance_middleware=[RetryMiddleware()],
)
A retry here retries one instance, not the whole fan-out.
When a node has middleware from multiple sites, per-graph composes outside per-node. The runtime chain at a single function node is:
[per_graph_outer_to_inner...] → [per_node_outer_to_inner...] → node
The middleware registered by the first builder.add_middleware() call
is the outermost layer; the last one registered is closest to the node.
The same rule holds per-node: list order is outer-to-inner.
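To make the ordering rule concrete, here is a small self-contained sketch. It does not use openarmature. The functions compose and tagged, along with the local State/NextCall aliases, are hypothetical stand-ins. The sketch assumes only that middleware is an async (state, next) callable and that the first entry in the combined list is the outermost layer.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping, Sequence
from typing import Any

# Local stand-ins for the shapes described above (not openarmature's types).
State = Mapping[str, Any]
Update = Mapping[str, Any]
NextCall = Callable[[State], Awaitable[Update]]
Mw = Callable[[State, NextCall], Awaitable[Update]]


def compose(middleware: Sequence[Mw], node: NextCall) -> NextCall:
    """Wrap `node` so that middleware[0] becomes the outermost layer."""
    call = node
    # Build from the innermost (last) layer outward.
    for mw in reversed(middleware):
        def layer(state: State, mw: Mw = mw, inner: NextCall = call) -> Awaitable[Update]:
            return mw(state, inner)
        call = layer
    return call


trace: list[str] = []


def tagged(name: str) -> Mw:
    async def mw(state: State, next_: NextCall) -> Update:
        trace.append(f"{name}:pre")
        partial = await next_(state)
        trace.append(f"{name}:post")
        return partial
    return mw


async def node(state: State) -> Update:
    trace.append("node")
    return {"done": True}


per_graph = [tagged("g1"), tagged("g2")]  # registration order
per_node = [tagged("n1")]  # list order
chain = compose([*per_graph, *per_node], node)
result = asyncio.run(chain({}))
print(trace)
# ['g1:pre', 'g2:pre', 'n1:pre', 'node', 'n1:post', 'g2:post', 'g1:post']
```

Pre-node phases run outer-to-inner and post-node phases run inner-to-outer, which is the nesting the chain diagram describes.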
Middleware does not cross into a subgraph. The parent's middleware
wraps the SubgraphNode dispatch as a single atomic call, and the
subgraph's own middleware (configured on the child builder) wraps the
child's internal nodes independently.
In practical terms: a RetryMiddleware on a subgraph-as-node retries
the whole child graph from its entry. A RetryMiddleware inside the
child retries one of its individual nodes.
An exception raised inside the chain (by the node or an inner middleware) propagates up through await next_(state). Middleware may:
- Re-raise: the simplest case. Don't catch; let it bubble.
- Catch and recover: catch the exception and return a partial update of your own. Outer layers and the graph see that partial update as if the node had returned it normally.
- Catch and transform: catch one exception type and raise a different one. The new exception propagates up.
- Call next_ more than once: this is what retry middleware does.
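The following is a minimal sketch of the catch-and-recover and catch-and-transform patterns. It is written against a local NextCall alias rather than openarmature's. Recover, Transform, UpstreamUnavailable and the fallback update shape are illustrative names, not library API.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping
from typing import Any

NextCall = Callable[[Mapping[str, Any]], Awaitable[Mapping[str, Any]]]


class UpstreamUnavailable(Exception):
    """Hypothetical domain error, used only for this sketch."""


class Recover:
    """Catch and recover: swap a failure for a fallback partial update."""

    async def __call__(self, state: Mapping[str, Any], next_: NextCall) -> Mapping[str, Any]:
        try:
            return await next_(state)
        except TimeoutError:
            return {"result": None, "timed_out": True}


class Transform:
    """Catch and transform: re-raise as a different exception type."""

    async def __call__(self, state: Mapping[str, Any], next_: NextCall) -> Mapping[str, Any]:
        try:
            return await next_(state)
        except ConnectionError as exc:
            # `from exc` keeps the original exception on __cause__.
            raise UpstreamUnavailable("upstream unreachable") from exc


async def times_out(state: Mapping[str, Any]) -> Mapping[str, Any]:
    raise TimeoutError


async def refuses(state: Mapping[str, Any]) -> Mapping[str, Any]:
    raise ConnectionError("refused")


print(asyncio.run(Recover()({}, times_out)))  # {'result': None, 'timed_out': True}

try:
    asyncio.run(Transform()({}, refuses))
except UpstreamUnavailable as exc:
    print(type(exc.__cause__).__name__)  # ConnectionError
```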
A middleware MUST NOT mutate the input state object in place. To
hand a transformed state down the chain, pass a new state instance to
next(...).
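Here is a request-enrichment sketch that follows the no-mutation rule. It assumes a mapping-shaped state for illustration only; the real state type may differ. AddRequestId is a hypothetical name. The input is wrapped in a read-only MappingProxyType, so any in-place write would raise TypeError. That makes the rule easy to check in a test.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping
from types import MappingProxyType
from typing import Any

NextCall = Callable[[Mapping[str, Any]], Awaitable[Mapping[str, Any]]]


class AddRequestId:
    """Request enrichment: hand the inner chain a *new* state object."""

    def __init__(self, request_id: str) -> None:
        self.request_id = request_id

    async def __call__(self, state: Mapping[str, Any], next_: NextCall) -> Mapping[str, Any]:
        enriched = {**state, "request_id": self.request_id}  # copy, never mutate
        return await next_(enriched)


async def node(state: Mapping[str, Any]) -> Mapping[str, Any]:
    return {"seen_request_id": state["request_id"]}


original = MappingProxyType({"query": "hi"})  # read-only view of the input state
print(asyncio.run(AddRequestId("req-1")(original, node)))  # {'seen_request_id': 'req-1'}
print(dict(original))  # {'query': 'hi'}
```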
import logging
from openarmature.graph import RetryConfig, RetryMiddleware, exponential_jitter_backoff
log = logging.getLogger(__name__)
async def on_retry(exc: Exception, attempt: int) -> None:
    log.warning("retrying after %r (attempt %d)", exc, attempt)
retry = RetryMiddleware(
    RetryConfig(
        max_attempts=3,
        backoff=exponential_jitter_backoff,
        on_retry=on_retry,
    )
)
Configured with a RetryConfig; four fields, all optional:
- max_attempts is the total attempt count including the first call. 1 disables retry. Default 3.
- classifier is a predicate (exception, state) -> bool. The default (default_classifier, importable from openarmature.graph) treats any exception with a category attribute matching the project's TRANSIENT_CATEGORIES set as transient. To retry on additional types, write a classifier that delegates to default_classifier and falls back to your own check.
- backoff is a callable (attempt_index) -> seconds. The default is exponential with jitter (base 1s, cap 30s, full jitter). deterministic_backoff(seconds) is provided for tests.
- on_retry is an optional async callback (exception, attempt) -> None. Fires before each sleep. Useful for emitting a structured "about to retry" event.
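The four fields can be sketched as a plain retry loop. This illustrates the semantics under stated assumptions; it is not RetryMiddleware's source. The names retry_call, default_classifier_stand_in, its category names and fixed_backoff are all hypothetical. The sketch also assumes that the backoff's attempt_index is zero-based and that on_retry receives the number of the attempt that just failed. full_jitter_backoff follows the description above (base 1s, cap 30s, full jitter) by drawing uniformly from zero up to the capped exponential delay.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable, Mapping
from typing import Any, Optional

NextCall = Callable[[Any], Awaitable[Mapping[str, Any]]]
Classifier = Callable[[Exception, Any], bool]
OnRetry = Callable[[Exception, int], Awaitable[None]]

# Hypothetical category names; the real TRANSIENT_CATEGORIES set may differ.
TRANSIENT_CATEGORIES_STAND_IN = frozenset({"rate_limited", "timeout"})


def default_classifier_stand_in(exc: Exception, state: Any) -> bool:
    return getattr(exc, "category", None) in TRANSIENT_CATEGORIES_STAND_IN


def my_classifier(exc: Exception, state: Any) -> bool:
    # Delegate to the default first, then fall back to a project-specific check.
    return default_classifier_stand_in(exc, state) or isinstance(exc, ConnectionError)


def full_jitter_backoff(attempt_index: int, base: float = 1.0, cap: float = 30.0) -> float:
    # Full jitter: a uniform draw from [0, min(cap, base * 2**attempt_index)].
    return random.uniform(0.0, min(cap, base * 2 ** attempt_index))


def fixed_backoff(seconds: float) -> Callable[[int], float]:
    # Deterministic stand-in for tests: always the same delay.
    return lambda attempt_index: seconds


async def retry_call(
    state: Any,
    next_: NextCall,
    *,
    max_attempts: int = 3,
    classifier: Classifier = default_classifier_stand_in,
    backoff: Callable[[int], float] = full_jitter_backoff,
    on_retry: Optional[OnRetry] = None,
) -> Mapping[str, Any]:
    for attempt in range(1, max_attempts + 1):  # the first call counts as attempt 1
        try:
            return await next_(state)
        except Exception as exc:
            if attempt == max_attempts or not classifier(exc, state):
                raise  # budget exhausted, or not transient
            if on_retry is not None:
                await on_retry(exc, attempt)  # fires before the sleep
            await asyncio.sleep(backoff(attempt - 1))
    raise ValueError("max_attempts must be >= 1")


calls = 0


async def flaky(state: Any) -> Mapping[str, Any]:
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient")
    return {"ok": True}


print(asyncio.run(retry_call({}, flaky, classifier=my_classifier, backoff=fixed_backoff(0.0))))
# {'ok': True}, after three calls
```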
A retry's attempt counter propagates as a context variable to every
node event emitted from within the retry, including nodes inside
subgraphs and branches that the retry wraps transitively. So an
observer logging a retried node sees attempt=1, attempt=2, etc. on
the inner events.
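The attempt propagation described above is what Python's contextvars module provides. A value set by an outer layer is visible to everything awaited beneath it without being passed as an argument. asyncio tasks also copy the current context when they are created, so concurrently scheduled branches see the value too. Here is a minimal sketch; current_attempt, emit and retry are hypothetical names, not openarmature's.

```python
import asyncio
import contextvars
from collections.abc import Awaitable, Callable

# Hypothetical stand-in for the runtime's attempt context variable.
current_attempt = contextvars.ContextVar("current_attempt", default=1)
events: list[tuple[str, int]] = []


def emit(node_name: str) -> None:
    # Observer-side read: the attempt is never passed as an argument.
    events.append((node_name, current_attempt.get()))


async def inner_node() -> None:
    emit("inner")


async def wrapped_subgraph() -> None:
    await inner_node()  # nested calls still see the variable
    if current_attempt.get() < 2:
        raise ConnectionError("transient")  # fail the first attempt only


async def retry(fn: Callable[[], Awaitable[None]], max_attempts: int = 3) -> None:
    for attempt in range(1, max_attempts + 1):
        token = current_attempt.set(attempt)
        try:
            await fn()
            return
        except ConnectionError:
            if attempt == max_attempts:
                raise
        finally:
            current_attempt.reset(token)  # restore the outer value


asyncio.run(retry(wrapped_subgraph))
print(events)  # [('inner', 1), ('inner', 2)]
```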
from openarmature.graph import TimingMiddleware, TimingRecord
async def record(rec: TimingRecord) -> None:
    # `metrics` is a placeholder for your own metrics client.
    metrics.histogram("node_duration_ms", rec.duration_ms, tags={
        "node": rec.node_name,
        "outcome": rec.outcome,
    })
builder.add_node(
    "fetch",
    fetch_fn,
    middleware=[TimingMiddleware(node_name="fetch", on_complete=record)],
)
TimingMiddleware records the wrapped chain's duration with a
monotonic clock and delivers a TimingRecord to your async callback.
The record includes node_name, duration_ms, outcome ("success"
or "exception"), and exception_category (the failing exception's
category attribute when present).
Two implementation details worth knowing:
- The callback fires inline before the chain's result returns. Slow callbacks add to the apparent node duration. Keep them fast (queue work, defer I/O).
- The clock is injectable per instance via the clock kwarg. Test fixtures use this to supply a deterministic stub without globally patching time.monotonic (which would also distort asyncio's scheduling).
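This sketch shows one way a timing middleware with an injectable clock might look. It also shows a test driving it with a stub clock instead of patching time.monotonic. Timed and Timing are hypothetical stand-ins for TimingMiddleware and TimingRecord; only the field names come from the description above.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable, Mapping
from dataclasses import dataclass
from typing import Any, Optional

NextCall = Callable[[Mapping[str, Any]], Awaitable[Mapping[str, Any]]]


@dataclass(frozen=True)
class Timing:
    node_name: str
    duration_ms: float
    outcome: str  # "success" or "exception"
    exception_category: Optional[str] = None


class Timed:
    def __init__(
        self,
        node_name: str,
        on_complete: Callable[[Timing], Awaitable[None]],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.node_name = node_name
        self.on_complete = on_complete
        self.clock = clock  # injectable; defaults to the real monotonic clock

    async def __call__(self, state: Mapping[str, Any], next_: NextCall) -> Mapping[str, Any]:
        start = self.clock()
        try:
            partial = await next_(state)
        except Exception as exc:
            await self._report(start, "exception", getattr(exc, "category", None))
            raise
        await self._report(start, "success", None)
        return partial

    async def _report(self, start: float, outcome: str, category: Optional[str]) -> None:
        duration_ms = (self.clock() - start) * 1000.0
        # Awaited inline: a slow callback delays the return to the caller.
        await self.on_complete(Timing(self.node_name, duration_ms, outcome, category))


class StepClock:
    """Deterministic stub: each read advances by 0.25 s."""

    def __init__(self) -> None:
        self.t = 0.0

    def __call__(self) -> float:
        self.t += 0.25
        return self.t


records: list[Timing] = []


async def collect(rec: Timing) -> None:
    records.append(rec)


async def node(state: Mapping[str, Any]) -> Mapping[str, Any]:
    return {"ok": True}


asyncio.run(Timed("fetch", collect, clock=StepClock())({}, node))
print(records[0].duration_ms)  # 250.0
```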
from openarmature.graph import FailureIsolationMiddleware
builder.add_node(
    "extract_segments",
    extract_fn,
    middleware=[
        FailureIsolationMiddleware(
            degraded_update={"segments": []},
            event_name="segment_extraction_degraded",
        ),
    ],
)
FailureIsolationMiddleware catches an exception escaping the wrapped
chain and returns a degraded partial update instead of letting it abort
the invocation. Reach for it when a node is not load-bearing enough to
kill the whole run: a failed enrichment step degrades to an empty list,
the graph continues, and the failure is still visible in your traces.
It is the named, observable form of the "catch and recover" pattern
from Error semantics above.
Configuration:
- degraded_update (required) is the partial update returned on a caught exception. It may be a static mapping, or a callable state -> partial_update when the fallback shape depends on the input state. The callable is resolved once, at catch time.
- event_name (required, no default) is a stable identifier for this catch site. It rides on the emitted event (below) and any downstream logging. There is no default on purpose: a generic name like "failure_isolated" collapses every degraded path into one indistinguishable bucket in a dashboard, so the name is forced at the construction site, where the context to name it well is available.
- predicate is an optional Exception -> bool. When supplied, only exceptions for which it returns true are caught; everything else propagates. The default catches every Exception.
- on_caught is an optional async hook Exception -> None, fired when the middleware catches. Use it to forward the caught exception to caller-specific telemetry beyond the framework event. It fires inline before the degraded update returns, and an exception it raises is isolated (logged, not propagated) so a buggy hook cannot defeat the recovery.
Like RetryMiddleware, it catches Exception only; BaseException
(cancellation, keyboard interrupt) propagates so aborts still work.
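The configuration rules can be sketched as a small middleware. These rules are: a static or callable fallback resolved at catch time, an optional predicate, an isolated on_caught hook, and catching Exception only. Isolate is a hypothetical stand-in, not FailureIsolationMiddleware's source. It leaves out event emission entirely.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable, Mapping
from typing import Any, Optional, Union

log = logging.getLogger(__name__)

State = Mapping[str, Any]
Update = Mapping[str, Any]
NextCall = Callable[[State], Awaitable[Update]]


class Isolate:
    """Hypothetical stand-in for a failure-isolation middleware."""

    def __init__(
        self,
        degraded_update: Union[Update, Callable[[State], Update]],
        event_name: str,
        predicate: Optional[Callable[[Exception], bool]] = None,
        on_caught: Optional[Callable[[Exception], Awaitable[None]]] = None,
    ) -> None:
        self.degraded_update = degraded_update
        self.event_name = event_name  # required: names this catch site
        self.predicate = predicate
        self.on_caught = on_caught

    async def __call__(self, state: State, next_: NextCall) -> Update:
        try:
            return await next_(state)
        except Exception as exc:  # BaseException (cancellation, Ctrl-C) passes through
            if self.predicate is not None and not self.predicate(exc):
                raise
            if self.on_caught is not None:
                try:
                    await self.on_caught(exc)
                except Exception:
                    # Isolate a buggy hook: log it, keep the recovery.
                    log.exception("on_caught hook failed at %s", self.event_name)
            fallback = self.degraded_update
            # A callable fallback is resolved once, here, at catch time.
            return fallback(state) if callable(fallback) else fallback


async def broken(state: State) -> Update:
    raise ValueError("bad payload")


iso = Isolate(
    degraded_update=lambda s: {"segments": [], "source": s["id"]},
    event_name="segment_extraction_degraded",
)
print(asyncio.run(iso({"id": 7}, broken)))  # {'segments': [], 'source': 7}
```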
On a catch, the middleware dispatches a FailureIsolatedEvent onto the
observer stream. It is a distinct event variant, not a node event: it
carries the event_name, the wrapped node's lineage identity, the input
and degraded states, and a CaughtException record. That record holds a
derived category (when the cause has one) and message for simple
consumers, plus a chain of cause links (CauseLink) from the caught
exception down to the originating raise, with graph-engine carrier
wrappers flagged so a consumer can skip them. Observers narrow on it with
isinstance(event, FailureIsolatedEvent). The bundled OTel
and Langfuse observers render it as a marker span / observation so the
catch shows up alongside the node's own span. The default emission path
is the observer stream only, with no logging-library dependency;
on_caught is the escape hatch for anything else.
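A cause chain like the one CaughtException carries can be built from Python's own exception-chaining attributes. Follow __cause__ for raise ... from; otherwise follow __context__, unless the raise used from None. This sketch assumes the links are derived roughly this way. Link is a hypothetical stand-in for CauseLink, and the carrier-wrapper flag is omitted. Taking the first category found along the chain as the derived category is also an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Link:
    """Hypothetical stand-in for a CauseLink-style record."""

    type_name: str
    message: str
    category: Optional[str]


def cause_chain(exc: BaseException) -> list[Link]:
    """Follow Python's exception chaining from the caught exception downward."""
    links: list[Link] = []
    seen: set[int] = set()  # guard against cycles in pathological chains
    current: Optional[BaseException] = exc
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        links.append(Link(type(current).__name__, str(current), getattr(current, "category", None)))
        if current.__cause__ is not None:  # explicit: raise ... from cause
            current = current.__cause__
        elif not current.__suppress_context__:  # implicit: raised while handling
            current = current.__context__
        else:  # raise ... from None
            current = None
    return links


class RateLimited(Exception):
    category = "rate_limit"  # hypothetical category value


try:
    try:
        raise RateLimited("429 from provider")
    except RateLimited as inner:
        raise RuntimeError("node 'summarize' failed") from inner
except RuntimeError as caught:
    chain = cause_chain(caught)

# Assumption: the derived category is the first one found along the chain.
derived = next((link.category for link in chain if link.category), None)
print([(link.type_name, link.category) for link in chain])
# [('RuntimeError', None), ('RateLimited', 'rate_limit')]
print(derived)  # rate_limit
```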
The two compose into the canonical "retry transients, then give up gracefully" pattern. The order is load-bearing: failure isolation is the outer layer, retry is inner.
builder.add_node(
    "summarize",
    summarize_fn,
    middleware=[
        FailureIsolationMiddleware(
            degraded_update={"summary": ""},
            event_name="summary_degraded",
        ),
        RetryMiddleware(RetryConfig(max_attempts=3)),
    ],
)
Retry sits closest to the node, so it sees raw transient failures first and retries them. Only what escapes retry (an exhausted budget, or a non-transient exception retry's classifier declines) reaches the outer failure isolation, which degrades. Reverse the order and the inner isolation would swallow transients before retry ever saw them, defeating the retry entirely.
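The ordering argument can be checked with a toy chain. Everything here is a hypothetical stand-in: compose, retry, isolate and flaky_node. ConnectionError plays the role of a transient failure, and the node fails twice before succeeding.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping, Sequence
from typing import Any

State = Mapping[str, Any]
Update = Mapping[str, Any]
NextCall = Callable[[State], Awaitable[Update]]
Mw = Callable[[State, NextCall], Awaitable[Update]]


def compose(middleware: Sequence[Mw], node: NextCall) -> NextCall:
    call = node
    for mw in reversed(middleware):  # list order is outer-to-inner
        call = (lambda m, inner: lambda s: m(s, inner))(mw, call)
    return call


def retry(max_attempts: int) -> Mw:
    async def mw(state: State, next_: NextCall) -> Update:
        for attempt in range(1, max_attempts + 1):
            try:
                return await next_(state)
            except ConnectionError:  # stand-in for "transient"
                if attempt == max_attempts:
                    raise
        raise AssertionError("unreachable")
    return mw


def isolate(degraded: Update) -> Mw:
    async def mw(state: State, next_: NextCall) -> Update:
        try:
            return await next_(state)
        except Exception:
            return degraded
    return mw


def flaky_node(failures: int) -> NextCall:
    remaining = failures

    async def node(state: State) -> Update:
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise ConnectionError("transient")
        return {"summary": "ok"}
    return node


good = compose([isolate({"summary": ""}), retry(3)], flaky_node(2))
bad = compose([retry(3), isolate({"summary": ""})], flaky_node(2))
print(asyncio.run(good({})))  # {'summary': 'ok'}: retry absorbed both failures
print(asyncio.run(bad({})))  # {'summary': ''}: isolation swallowed the first failure
```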
The fan-out with retry example
applies this composition as instance_middleware in its degrade
mode: each fan-out instance is wrapped isolation-outer / retry-inner,
so an instance whose retries exhaust degrades to a placeholder result
and the batch finishes instead of aborting.
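The effect of per-instance wrapping can be sketched with asyncio.gather. Each instance carries its own isolation-outer / retry-inner wrapper, so one instance that keeps failing yields a placeholder while its siblings succeed. isolated_with_retry, summarize and the placeholder shape are hypothetical; they are not taken from the fan-out example itself.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping
from typing import Any

Update = Mapping[str, Any]


async def isolated_with_retry(
    run_instance: Callable[[], Awaitable[Update]],
    placeholder: Update,
    max_attempts: int = 2,
) -> Update:
    """Isolation outer, retry inner, applied to ONE fan-out instance."""
    try:
        for attempt in range(1, max_attempts + 1):
            try:
                return await run_instance()
            except ConnectionError:  # stand-in for "transient"
                if attempt == max_attempts:
                    raise
        raise AssertionError("unreachable")
    except Exception:
        return placeholder  # degrade this instance only


async def summarize(article: str) -> Update:
    if article == "broken":
        raise ConnectionError("provider keeps failing")
    return {"summary": article.upper()}


async def fan_out(articles: list[str]) -> list[Update]:
    # Each instance gets its own wrapper, so one failure cannot abort the batch.
    return await asyncio.gather(*(
        isolated_with_retry(lambda a=a: summarize(a), {"summary": "<unavailable>"})
        for a in articles
    ))


print(asyncio.run(fan_out(["a", "broken", "c"])))
# [{'summary': 'A'}, {'summary': '<unavailable>'}, {'summary': 'C'}]
```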
- Parallel branches: per-branch middleware and its interaction with parent-graph middleware.
- Fan-out: instance_middleware and how it composes with parent and node-level layers.
- LLMs: how transient classification flows from provider errors into RetryMiddleware's default classifier.
- Observability: observer events emitted around middleware-wrapped nodes carry the retry attempt index.