|
| 1 | +# Middleware |
| 2 | + |
| 3 | +Middleware wraps the dispatch of a single node. The shape is an async |
| 4 | +callable `(state, next) -> partial_update`. Anything you want to happen |
| 5 | +around a node, without changing the node itself, lives here: retries, |
| 6 | +timing, structured logging, request enrichment, error transformation, |
| 7 | +circuit-breaking. |
| 8 | + |
| 9 | +```python |
| 10 | +from collections.abc import Mapping |
| 11 | +from typing import Any |
| 12 | + |
| 13 | +from openarmature.graph import Middleware, NextCall |
| 14 | + |
| 15 | + |
| 16 | +class LogAround: |
| 17 | + async def __call__(self, state: Any, next_: NextCall) -> Mapping[str, Any]: |
| 18 | + print("before") |
| 19 | + partial = await next_(state) |
| 20 | + print("after") |
| 21 | + return partial |
| 22 | + |
| 23 | + |
| 24 | +_: Middleware = LogAround() # structural conformance check |
| 25 | +``` |
| 26 | + |
| 27 | +`next` invokes the next layer of the chain (or the wrapped node, at |
| 28 | +the innermost end) and returns the partial update from that layer. |
| 29 | +Code before `await next(state)` is the pre-node phase (runs on the way |
| 30 | +in); code after is the post-node phase (runs on the way out). |
| 31 | + |
| 32 | +## Four registration sites |
| 33 | + |
| 34 | +You can attach middleware at four places. The same `Middleware` shape |
| 35 | +works in all of them. |
| 36 | + |
| 37 | +**Per-node**, on a single function node: |
| 38 | + |
| 39 | +```python |
| 40 | +builder.add_node("fetch", fetch_fn, middleware=[RetryMiddleware()]) |
| 41 | +``` |
| 42 | + |
| 43 | +**Per-graph**, applied to every node in the graph: |
| 44 | + |
| 45 | +```python |
| 46 | +builder.add_middleware(TimingMiddleware(node_name="...", on_complete=record)) |
| 47 | +``` |
| 48 | + |
| 49 | +**Per-branch**, on a single branch of a parallel-branches node: |
| 50 | + |
| 51 | +```python |
| 52 | +from openarmature.graph import BranchSpec |
| 53 | + |
| 54 | +branches = { |
| 55 | + "sentiment": BranchSpec( |
| 56 | + subgraph=sentiment_subgraph, |
| 57 | + middleware=(RetryMiddleware(),), |
| 58 | + ), |
| 59 | + "topic": BranchSpec(subgraph=topic_subgraph), |
| 60 | +} |
| 61 | +builder.add_parallel_branches_node("classify", branches=branches) |
| 62 | +``` |
| 63 | + |
| 64 | +The branch middleware wraps the whole branch dispatch as one call. A |
| 65 | +retry on a branch retries the entire branch from scratch, not an |
| 66 | +individual node inside it. |
| 67 | + |
| 68 | +**Per-fan-out-instance**, on the instance dispatch inside a fan-out |
| 69 | +node: |
| 70 | + |
| 71 | +```python |
| 72 | +builder.add_fan_out_node( |
| 73 | + "summarize", |
| 74 | + subgraph=summarize_subgraph, |
| 75 | + items_field="articles", |
| 76 | + item_field="article", |
| 77 | + collect_field="article", |
| 78 | + target_field="summaries", |
| 79 | + instance_middleware=[RetryMiddleware()], |
| 80 | +) |
| 81 | +``` |
| 82 | + |
| 83 | +A retry here retries one instance, not the whole fan-out. |
| 84 | + |
| 85 | +## Composition order |
| 86 | + |
| 87 | +When a node has middleware from multiple sites, per-graph composes |
| 88 | +*outside* per-node. The runtime chain at a single function node is: |
| 89 | + |
| 90 | +``` |
| 91 | +[per_graph_outer_to_inner...] → [per_node_outer_to_inner...] → node |
| 92 | +``` |
| 93 | + |
| 94 | +The first middleware in `builder.add_middleware()` calls is the |
| 95 | +outermost layer; the last is closest to the node. Same rule for |
| 96 | +per-node: list order is outer-to-inner. |
| 97 | + |
| 98 | +## The subgraph boundary |
| 99 | + |
| 100 | +Middleware does not cross into a subgraph. The parent's middleware |
| 101 | +wraps the `SubgraphNode` dispatch as a single atomic call, and the |
| 102 | +subgraph's own middleware (configured on the child builder) wraps the |
| 103 | +child's internal nodes independently. |
| 104 | + |
| 105 | +In practical terms: a `RetryMiddleware` on a subgraph-as-node retries |
| 106 | +the whole child graph from its entry. A `RetryMiddleware` inside the |
| 107 | +child retries one of its individual nodes. |
| 108 | + |
| 109 | +## Error semantics |
| 110 | + |
| 111 | +An exception raised by `next(state)` propagates up through `await |
| 112 | +next(state)`. Middleware may: |
| 113 | + |
| 114 | +- **Re-raise**: the simplest case. Don't catch, let it bubble. |
| 115 | +- **Catch and recover**: catch the exception and return a partial |
| 116 | + update of your own. The rest of the chain continues as if the node |
| 117 | + had returned that partial update normally. |
| 118 | +- **Catch and transform**: catch one exception type, raise a different |
| 119 | + one. The new exception propagates up. |
| 120 | +- **Call `next` more than once**: this is what retry middleware does. |
| 121 | + |
| 122 | +A middleware MUST NOT mutate the input `state` object in place. To |
| 123 | +hand a transformed state down the chain, pass a new state instance to |
| 124 | +`next(...)`. |
| 125 | + |
| 126 | +## Built-in: RetryMiddleware |
| 127 | + |
| 128 | +```python |
| 129 | +from openarmature.graph import RetryMiddleware, exponential_jitter_backoff |
| 130 | + |
| 131 | + |
| 132 | +async def on_retry(exc: Exception, attempt: int) -> None: |
| 133 | + log.warning("retrying after %r (attempt %d)", exc, attempt) |
| 134 | + |
| 135 | + |
| 136 | +retry = RetryMiddleware( |
| 137 | + max_attempts=3, |
| 138 | + backoff=exponential_jitter_backoff, |
| 139 | + on_retry=on_retry, |
| 140 | +) |
| 141 | +``` |
| 142 | + |
| 143 | +Four plug points, all optional: |
| 144 | + |
| 145 | +- **`max_attempts`** is the total attempt count including the first |
| 146 | + call. `1` disables retry. Default `3`. |
| 147 | +- **`classifier`** is a predicate `(exception, state) -> bool`. |
| 148 | + The default (`default_classifier`, importable from |
| 149 | + `openarmature.graph`) treats any exception with a `category` |
| 150 | + attribute matching the project's `TRANSIENT_CATEGORIES` set as |
| 151 | + transient. To retry on additional types, write a classifier that |
| 152 | + delegates to `default_classifier` and falls back to your own check. |
| 153 | +- **`backoff`** is a callable `(attempt_index) -> seconds`. The default |
| 154 | + is exponential with jitter (base 1s, cap 30s, full jitter). |
| 155 | + `deterministic_backoff(seconds)` is provided for tests. |
| 156 | +- **`on_retry`** is an optional async callback `(exception, attempt) |
| 157 | + -> None`. Fires before each sleep. Useful for emitting a structured |
| 158 | + "about to retry" event. |
| 159 | + |
| 160 | +A retry's attempt counter propagates as a context variable to every |
| 161 | +node event emitted from within the retry, including nodes inside |
| 162 | +subgraphs and branches that the retry wraps transitively. So an |
| 163 | +observer logging a retried node sees `attempt=1`, `attempt=2`, etc. on |
| 164 | +the inner events. |
| 165 | + |
| 166 | +## Built-in: TimingMiddleware |
| 167 | + |
| 168 | +```python |
| 169 | +from openarmature.graph import TimingMiddleware, TimingRecord |
| 170 | + |
| 171 | + |
| 172 | +async def record(rec: TimingRecord) -> None: |
| 173 | + metrics.histogram("node_duration_ms", rec.duration_ms, tags={ |
| 174 | + "node": rec.node_name, |
| 175 | + "outcome": rec.outcome, |
| 176 | + }) |
| 177 | + |
| 178 | + |
| 179 | +builder.add_node( |
| 180 | + "fetch", |
| 181 | + fetch_fn, |
| 182 | + middleware=[TimingMiddleware(node_name="fetch", on_complete=record)], |
| 183 | +) |
| 184 | +``` |
| 185 | + |
| 186 | +`TimingMiddleware` records the wrapped chain's duration with a |
| 187 | +monotonic clock and delivers a `TimingRecord` to your async callback. |
| 188 | +The record includes `node_name`, `duration_ms`, `outcome` (`"success"` |
| 189 | +or `"exception"`), and `exception_category` (the failing exception's |
| 190 | +`category` attribute when present). |
| 191 | + |
| 192 | +Two implementation details worth knowing: |
| 193 | + |
| 194 | +- The callback fires **inline** before the chain's result returns. |
| 195 | + Slow callbacks add to the apparent node duration. Keep them fast |
| 196 | + (queue work, defer I/O). |
| 197 | +- The clock is injectable per instance via the `clock` kwarg. |
| 198 | + Test fixtures use this to supply a deterministic stub without |
| 199 | + globally patching `time.monotonic` (which would also distort |
| 200 | + asyncio's scheduling). |
| 201 | + |
| 202 | +## Related |
| 203 | + |
| 204 | +- [Parallel branches](parallel-branches.md): per-branch middleware |
| 205 | + and its interaction with parent-graph middleware. |
| 206 | +- [Fan-out](fan-out.md): `instance_middleware` and how it composes |
| 207 | + with parent and node-level layers. |
| 208 | +- [LLMs](llms.md): how transient-classification flows from provider |
| 209 | + errors into `RetryMiddleware`'s default classifier. |
| 210 | +- [Observability](observability.md): observer events emitted around |
| 211 | + middleware-wrapped nodes carry the retry attempt index. |
0 commit comments