This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.64.0). For the full docs site see openarmature.ai. For the canonical spec text see openarmature.org/capabilities. For project-specific conventions for the code you're editing, see the host project's AGENTS.md or CLAUDE.md.
OpenArmature is a workflow framework for LLM pipelines and tool-calling agents: typed state, compile-time topology checks, observability, and crash-safe checkpoints baked into a graph engine. The graph layer has no concept of LLMs or tools; the same primitives drive deterministic ETL pipelines and tool-calling agents alike. Nodes return partial updates; the engine merges into a frozen state snapshot. Behavior is defined by openarmature-spec and verified by conformance fixtures; this package is the reference Python implementation.
What OpenArmature is NOT: not a chat framework (no built-in messages channel), not an LLM SDK (Provider is the abstraction layer; OpenAIProvider is the canonical impl), not a state-management library (state is per-invocation, not application-wide), not an evaluation framework (deferred to openarmature-eval).
Sourced from openarmature-spec v0.64.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's spec.md verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the conformance.toml manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site.
The graph engine defines how a workflow is structured, how state flows between steps, and how execution progresses. It is the substrate for both deterministic LLM pipelines and LLM-driven tool-calling agents.
State. A typed schema describing the data flowing through a graph. State is a product type (a record with named, typed fields). Implementations MUST validate state against the schema at graph boundaries (entry, exit) and SHOULD validate at node boundaries.
Node. A named unit of work. A node receives the current state and returns a partial update — a mapping from field names to new values. Nodes MUST be asynchronous. A node MUST NOT mutate the state object it received; it returns a new partial update which the engine merges. In languages whose typed-state representation is effectively immutable (notably Python with Pydantic) this is directly enforceable; in languages without value-type enforcement (notably TypeScript) implementations SHOULD defend against accidental mutation via freezing or immutable data structures.
Edge. A directed connection between nodes. Edges are one of:
- Static edge — always routes from source node to a fixed destination.
- Conditional edge — a function of current state that returns the destination node name (or the sentinel
END).
Each node has exactly one outgoing edge. Branching is always expressed via a conditional edge, not by declaring multiple static edges from the same source.
END. An engine-provided sentinel value used as a routing target to halt execution. END is a distinct
engine constant, not a reserved node name, so a user node may happen to be named "END" without collision.
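To make the edge rules concrete, here is a minimal sketch of a run loop. Everything in it (run, the dict-based state, the lambda edges) is hypothetical and is not openarmature's API. It assumes each node has exactly one edge function that returns either a node name or the END sentinel object, so a user node named "END" never collides with the sentinel.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical engine loop for illustration only; not openarmature's API.
END = object()  # unique sentinel: it can never equal a node *name* such as "END"

Node = Callable[[dict], Awaitable[dict]]
Edge = Callable[[dict], Any]  # returns the next node name, or END


async def run(nodes: Dict[str, Node], edges: Dict[str, Edge], entry: str, state: dict) -> dict:
    current: Any = entry
    while current is not END:
        update = await nodes[current](state)
        # Last-write-wins merge into a new dict; the previous state is left untouched.
        state = {**state, **update}
        # Exactly one outgoing edge per node: a single routing function.
        current = edges[current](state)
    return state


async def count(state: dict) -> dict:
    return {"n": state["n"] + 1}


async def named_end(state: dict) -> dict:
    return {"visited_user_end": True}


nodes = {"count": count, "END": named_end}  # a user node literally named "END"
edges = {
    # Conditional edge: loop until n reaches 3, then route to the user node "END".
    "count": lambda s: "count" if s["n"] < 3 else "END",
    # Edge that always halts via the sentinel.
    "END": lambda s: END,
}

final = asyncio.run(run(nodes, edges, "count", {"n": 0}))
print(final)  # {'n': 3, 'visited_user_end': True}
```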
Reducer. A function that merges a node's partial update into the prior state for a given field. Each state
field has exactly one reducer. The default reducer is last-write-wins (the new value replaces the old).
Implementations MUST provide at least the following eight canonical reducers: last_write_wins, append
(for list-typed fields), merge (for mapping-typed fields), concat_flatten (for list-typed fields whose
updates are lists of lists — e.g., fan-out target fields collecting list-emitting per-instance values),
merge_all (for mapping-typed fields whose updates are lists of mappings — e.g., fan-out target fields
collecting dict-emitting per-instance values), bounded_append(max_len) (factory; append capped at
max_len entries with front-drop on overflow), dedupe_append(key=None) (factory; append skipping
items whose key already appears in the existing list), and merge_by_key(key) (factory; list-of-records
keyed merge — entries with a key matching an existing entry replace the existing entry in place; entries
with novel keys are appended). Users MAY register custom reducers per field.
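A minimal sketch of how per-field reducers could fold a node's partial update into a new, read-only snapshot. The names apply_update and Reducer are hypothetical. It assumes append's update is itself a list (as the bounded_append description's "concatenate prior + update" implies), and fields with no registered reducer fall back to last-write-wins.

```python
from types import MappingProxyType
from typing import Any, Callable, Mapping

Reducer = Callable[[Any, Any], Any]


def last_write_wins(prior: Any, update: Any) -> Any:
    return update


def append(prior: list, update: list) -> list:
    return [*prior, *update]  # new list; prior is not mutated


def merge(prior: Mapping, update: Mapping) -> dict:
    return {**prior, **update}  # shallow merge: later writes win on key conflict


def apply_update(
    state: Mapping[str, Any],
    update: Mapping[str, Any],
    reducers: Mapping[str, Reducer],
) -> Mapping[str, Any]:
    """Hypothetical engine-side merge: one reducer per field, default last-write-wins."""
    merged = dict(state)
    for field, value in update.items():
        reducer = reducers.get(field, last_write_wins)
        merged[field] = reducer(state[field], value)
    # Read-only view so the new snapshot cannot be mutated in place.
    return MappingProxyType(merged)


state = MappingProxyType({"status": "new", "log": ["start"], "meta": {"a": 1}})
reducers = {"log": append, "meta": merge}
new_state = apply_update(state, {"status": "done", "log": ["step"], "meta": {"b": 2}}, reducers)
print(dict(new_state))  # {'status': 'done', 'log': ['start', 'step'], 'meta': {'a': 1, 'b': 2}}
print(dict(state))      # {'status': 'new', 'log': ['start'], 'meta': {'a': 1}}
```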
concat_flatten semantics. concat_flatten(prior, update) returns the concatenation of prior with the
one-level flattening of update. Both prior and update MUST be lists, and every element of update MUST
itself be a list. Violations raise ReducerError per §4 (the engine MUST surface the offending field, the
reducer name, and a root-cause naming the non-list value). Empty update is a no-op (returns prior
unchanged). Empty sub-lists inside update contribute zero elements (the one-to-many fan-out case where an
instance legitimately produces zero records). Implementations MUST NOT auto-detect whether update is a list
of lists vs. a flat list — concat_flatten is strictly the two-level reducer; callers with mixed-shape
requirements MUST register a custom reducer rather than rely on shape-dependent behavior.
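A standalone sketch of concat_flatten under these rules. ReducerError here is a hypothetical stand-in that carries the field, reducer name, and root cause the spec requires the engine to surface. The real exception type and its constructor may differ.

```python
class ReducerError(Exception):
    """Hypothetical stand-in for the spec's ReducerError."""

    def __init__(self, field: str, reducer: str, cause: str):
        super().__init__(f"{reducer} failed on field {field!r}: {cause}")
        self.field, self.reducer, self.cause = field, reducer, cause


def concat_flatten(prior: list, update: list, field: str = "<field>") -> list:
    if not isinstance(prior, list) or not isinstance(update, list):
        raise ReducerError(field, "concat_flatten", "prior and update must both be lists")
    if not update:
        return prior  # empty update: no-op, prior returned unchanged
    result = list(prior)
    for i, sub in enumerate(update):
        # Strictly two-level: no shape auto-detection, every element must be a list.
        if not isinstance(sub, list):
            raise ReducerError(field, "concat_flatten", f"update[{i}] is {type(sub).__name__}, not a list")
        result.extend(sub)  # an empty sub-list contributes zero elements
    return result


print(concat_flatten([1], [[2, 3], [], [4]]))  # [1, 2, 3, 4]
try:
    concat_flatten([1], [2, 3], field="records")  # a flat list is rejected
except ReducerError as err:
    rejected = err
    print(rejected)  # concat_flatten failed on field 'records': update[0] is int, not a list
```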
merge_all semantics. merge_all(prior, update) folds the sequence of mappings in update into prior,
applying the same shallow merge semantics as merge (later writes win on key conflict; non-conflicting keys
from prior are preserved). For update = [d_1, d_2, ..., d_n], the result is equivalent to applying merge
n times sequentially: merge(merge(...merge(merge(prior, d_1), d_2)...), d_n), so within update
last-write-wins applies across all n dicts (e.g., if d_2 and d_n both set key k, d_n's value wins).
prior MUST be a mapping, update MUST be a list, and every element of update MUST itself be a mapping.
Violations raise ReducerError per §4. Empty update is a no-op (returns prior unchanged). Empty mappings
inside update contribute zero keys. Implementations MUST NOT auto-detect whether update is a list of
mappings vs. a single mapping — merge_all is strictly the list-of-mappings reducer; callers needing both
behaviors on the same field MUST register a custom reducer rather than rely on shape-dependent behavior.
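A matching sketch of merge_all, again with a hypothetical ReducerError stand-in. Note that a bare mapping passed as update is rejected rather than auto-detected.

```python
from collections.abc import Mapping


class ReducerError(Exception):
    """Hypothetical stand-in for the spec's ReducerError."""


def merge_all(prior: Mapping, update: list) -> Mapping:
    if not isinstance(prior, Mapping) or not isinstance(update, list):
        raise ReducerError("merge_all: prior must be a mapping and update a list")
    if not update:
        return prior  # empty update: no-op
    result = dict(prior)
    for i, d in enumerate(update):
        if not isinstance(d, Mapping):
            raise ReducerError(f"merge_all: update[{i}] is not a mapping")
        # Same shallow merge as `merge`, applied left to right: later dicts win.
        result.update(d)
    return result


print(merge_all({"a": 0, "keep": 1}, [{"a": 1}, {}, {"a": 2, "b": 3}]))  # {'a': 2, 'keep': 1, 'b': 3}
try:
    merge_all({}, {"a": 1})  # a single mapping is not wrapped automatically
except ReducerError as err:
    print(err)
```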
bounded_append(max_len) semantics. A factory returning a reducer that extends a list with the update's
items and truncates from the front (oldest entries dropped first) if the post-merge length exceeds max_len.
max_len MUST be a positive integer (≥ 1); a factory call with max_len ≤ 0 raises
reducer_configuration_invalid at field registration time. Behavior: concatenate prior + update, then if
the concatenated list's length exceeds max_len, drop entries from the front until the length equals
max_len. The bound applies to the post-merge length, not to the update's individual size — an update
larger than max_len keeps only the last max_len items of the update and the prior list is fully evicted. Both prior and update MUST be lists;
violations raise ReducerError per §4. Empty update is a no-op (returns prior unchanged) — the bound
applies to merge-time transformations, not as a prior-validation pass; prior is returned as-is even if
it somehow already exceeds max_len (matching the established concat_flatten / merge_all empty-update
pattern). Truncation MUST be from the front (oldest-first eviction) for cross-impl consistency; callers
who need back-drop (newest-first eviction) can register a custom reducer. bounded_append is for cases
where silent drop of evicted data is acceptable
(recent-events buffers, debug log windows, sliding metric caches); for cases where dropped data must be
summarized or transformed first (the canonical chat-history-with-LLM-summarization shape), use unbounded
append plus a separate compaction node or middleware — reducers are pure synchronous functions per the
contract above and cannot perform the IO that real compaction requires.
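A sketch of the bounded_append factory under these semantics. The exception classes are hypothetical stand-ins for the reducer_configuration_invalid category and ReducerError. Slicing the concatenated list to its last max_len entries is exactly front-drop.

```python
from typing import Callable


class ReducerConfigurationInvalid(ValueError):
    """Hypothetical stand-in for the reducer_configuration_invalid category."""


class ReducerError(Exception):
    """Hypothetical stand-in for the spec's ReducerError."""


def bounded_append(max_len: int) -> Callable[[list, list], list]:
    # Validated when the factory is called (field registration), before any node runs.
    # bool is excluded explicitly because it subclasses int in Python.
    if isinstance(max_len, bool) or not isinstance(max_len, int) or max_len < 1:
        raise ReducerConfigurationInvalid(f"max_len must be a positive integer, got {max_len!r}")

    def reducer(prior: list, update: list) -> list:
        if not isinstance(prior, list) or not isinstance(update, list):
            raise ReducerError("bounded_append: prior and update must be lists")
        if not update:
            return prior  # no-op, even if prior already exceeds max_len
        # Front-drop: keep only the newest max_len entries of prior + update.
        return (prior + update)[-max_len:]

    return reducer


recent = bounded_append(3)
print(recent([1, 2], [3, 4]))     # [2, 3, 4]
print(recent([1], [5, 6, 7, 8]))  # [6, 7, 8]  (prior fully evicted)
try:
    bounded_append(0)
except ReducerConfigurationInvalid as err:
    print(err)  # max_len must be a positive integer, got 0
```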
dedupe_append(key=None) semantics. A factory returning a reducer that extends a list with items from
the update that are not already present (by key) in the existing list. The key parameter is an optional
callable mapping an item to its dedup key; if omitted, the item itself is used as the key (requires hashable
items). Behavior: initialize a seen-keys set from prior (preserving prior unchanged in the result),
iterate update in order, and for each item compute its key — if the key is NOT yet in seen-keys, append
the item to the result and record its key; otherwise skip. Existing items appear before update items;
within each, original order is maintained. Duplicates within the update itself are filtered alongside
matches against prior — first occurrence wins (preserves left-to-right precedence consistent with
append). The computed key (the item itself when no key callable is supplied, or the value returned by
the callable) MUST be hashable; a non-hashable key raises ReducerError per §4 at merge time. A key
callable that raises on any item propagates as ReducerError. The reducer does NOT mutate existing items
(no in-place dedup of prior); only the update is filtered.
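A sketch of dedupe_append that follows the seen-keys algorithm above step by step. The ReducerError class is a hypothetical stand-in.

```python
from collections.abc import Callable, Hashable
from typing import Any, Optional


class ReducerError(Exception):
    """Hypothetical stand-in for the spec's ReducerError."""


def dedupe_append(key: Optional[Callable[[Any], Hashable]] = None) -> Callable[[list, list], list]:
    def compute_key(item: Any) -> Hashable:
        try:
            k = item if key is None else key(item)
            hash(k)  # the computed key must be hashable
        except Exception as exc:  # a raising key callable also becomes ReducerError
            raise ReducerError(f"dedupe_append: cannot key {item!r}") from exc
        return k

    def reducer(prior: list, update: list) -> list:
        seen = {compute_key(item) for item in prior}
        result = list(prior)  # prior kept as-is: no in-place dedup of existing items
        for item in update:
            k = compute_key(item)
            if k not in seen:  # first occurrence wins, also within the update itself
                seen.add(k)
                result.append(item)
        return result

    return reducer


by_id = dedupe_append(key=lambda e: e["id"])
print(by_id([{"id": 1, "v": "a"}], [{"id": 1, "v": "b"}, {"id": 2, "v": "c"}, {"id": 2, "v": "d"}]))
# [{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'c'}]
print(dedupe_append()([1, 2], [2, 3, 3, 4]))  # [1, 2, 3, 4]
try:
    dedupe_append()([], [[1]])  # a list is not hashable
except ReducerError as err:
    print(err)
```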
merge_by_key(key) semantics. A factory returning a reducer for list-of-records fields. Items in the
update with a key matching an existing item REPLACE the existing item in place; items with novel keys are
appended at the end of the list in the order they appear in the update. The key parameter is a required
callable mapping an item to its merge key — the spec does NOT default this; keyed merge without a key
function is meaningless and a factory call with key=None raises reducer_configuration_invalid at field
registration time. Behavior: build a key_to_idx index from prior (when prior contains duplicate keys,
the index MUST hold the LAST index for each duplicate key — implementations whose native dict construction
uses first-wins semantics MUST iterate explicitly to enforce last-wins); for each item in update, if its
key is in the index, replace the prior entry at that index with the update item; otherwise append the
update item to the result and register its key. Existing entry order MUST be preserved (replacements are
in-place); novel entries are appended in update order. Duplicate keys within the update collapse to
last-occurrence-wins (consistent with how dict updates work for repeated keys). Earlier duplicates in
prior are preserved in place — the reducer does NOT in-place dedupe existing entries (parallel to
dedupe_append's "no in-place dedup of existing" rule). The value returned by the key callable MUST
be hashable (required by the index-build step); a non-hashable return value raises ReducerError per §4
at merge time. The key callable raising on any item propagates as ReducerError. Empty update is a
no-op. merge_by_key is NOT a substitute for merge — merge
operates on dict-typed fields with shallow key-value semantics; merge_by_key operates on list-of-records
fields with item-key semantics. The qualifier _by_key distinguishes the two shapes.
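A sketch of merge_by_key following the key_to_idx procedure above. The exception classes are hypothetical stand-ins. The usage shows in-place replacement, last-occurrence-wins inside the update, and an earlier duplicate in prior left untouched.

```python
from collections.abc import Callable, Hashable
from typing import Any, Dict, Optional


class ReducerConfigurationInvalid(ValueError):
    """Hypothetical stand-in for the reducer_configuration_invalid category."""


class ReducerError(Exception):
    """Hypothetical stand-in for the spec's ReducerError."""


def merge_by_key(key: Optional[Callable[[Any], Hashable]]) -> Callable[[list, list], list]:
    if key is None:
        # No default: keyed merge without a key function is meaningless.
        raise ReducerConfigurationInvalid("merge_by_key requires a key callable")

    def compute_key(item: Any) -> Hashable:
        try:
            k = key(item)
            hash(k)  # the index-build step needs hashable keys
        except Exception as exc:
            raise ReducerError(f"merge_by_key: cannot key {item!r}") from exc
        return k

    def reducer(prior: list, update: list) -> list:
        if not update:
            return prior  # empty update: no-op
        result = list(prior)
        key_to_idx: Dict[Hashable, int] = {}
        for idx, item in enumerate(prior):
            key_to_idx[compute_key(item)] = idx  # explicit loop: the LAST index wins
        for item in update:
            k = compute_key(item)
            if k in key_to_idx:
                result[key_to_idx[k]] = item  # replace in place; order preserved
            else:
                key_to_idx[k] = len(result)  # register so later duplicates replace this entry
                result.append(item)
        return result

    return reducer


upsert = merge_by_key(lambda r: r["id"])
prior = [{"id": "a", "v": 1}, {"id": "b", "v": 1}, {"id": "a", "v": 2}]
merged = upsert(prior, [{"id": "a", "v": 9}, {"id": "c", "v": 1}, {"id": "c", "v": 5}])
print(merged)
# [{'id': 'a', 'v': 1}, {'id': 'b', 'v': 1}, {'id': 'a', 'v': 9}, {'id': 'c', 'v': 5}]
```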
Subgraph. A compiled graph used as a node inside another graph. A subgraph executes against its own state schema and produces a partial update that is merged into the parent's state. The merge uses the same reducer rules as ordinary nodes — parent reducers, applied to parent fields.
By default, no projection in occurs: the subgraph runs from the initial state defined by its own schema's field defaults, independent of the parent's current state.
Projection out defaults to field-name matching: when the subgraph completes, the values of any subgraph fields whose names match parent fields are merged into those parent fields via the parent's reducers. Subgraph fields with no matching parent field are discarded.
Explicit input/output mapping. A subgraph-as-node MAY declare an inputs mapping, an outputs mapping,
or both:
- inputs: a mapping from subgraph field name → parent field name. For each entry, the parent field's current value is copied to the subgraph's corresponding field at entry. Subgraph fields not named in inputs receive their schema-declared default; they are NOT filled by field-name matching as a fallback.
- outputs: a mapping from parent field name → subgraph field name. For each entry, the subgraph's final value for the named subgraph field is merged into the corresponding parent field via the parent's reducer for that field. Subgraph fields not named in outputs are discarded; they do NOT fall through to field-name matching.
The two directions are independent: a subgraph-as-node MAY declare inputs only, outputs only, both, or
neither.
- When inputs is absent, the default above applies: no projection in. The subgraph runs from its own schema defaults.
- When inputs is present, named parent fields are copied to their mapped subgraph fields at entry; all other subgraph fields receive their schema-declared defaults.
- When outputs is absent, the default above applies: subgraph fields whose names match parent fields are merged back via the parent's reducers; non-matching subgraph fields are discarded.
- When outputs is present, it replaces field-name matching for projection-out: only the parent/subgraph field pairs named in outputs are merged, via the parent's reducer for the named parent field. All other subgraph fields are discarded.
This asymmetry — inputs additive, outputs replacement — is intentional. It reflects the asymmetry in
the defaults themselves: projection-in is off by default (so inputs turns it on for listed fields), while
projection-out is on by default via field-name matching (so outputs replaces it to avoid ambiguous mixed
rules).
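The projection rules can be sketched with plain dicts. project_in and project_out are hypothetical helpers, not the engine's code. They assume a parent field with no registered reducer uses last-write-wins, and they skip the compile-time mapping_references_undeclared_field check.

```python
from typing import Any, Callable, Mapping, Optional

Reducer = Callable[[Any, Any], Any]


def project_in(parent: Mapping[str, Any], sub_defaults: Mapping[str, Any],
               inputs: Optional[Mapping[str, str]]) -> dict:
    """inputs maps subgraph field -> parent field; unlisted fields keep their defaults."""
    state = dict(sub_defaults)  # no inputs: start purely from schema defaults
    for sub_field, parent_field in (inputs or {}).items():
        state[sub_field] = parent[parent_field]
    return state


def project_out(parent: Mapping[str, Any], sub_final: Mapping[str, Any],
                reducers: Mapping[str, Reducer], outputs: Optional[Mapping[str, str]]) -> dict:
    """outputs maps parent field -> subgraph field; when absent, match by field name."""
    if outputs is None:
        pairs = {name: name for name in sub_final if name in parent}
    else:
        pairs = dict(outputs)  # replaces name matching entirely
    merged = dict(parent)
    for parent_field, sub_field in pairs.items():
        reducer = reducers.get(parent_field, lambda prior, new: new)
        merged[parent_field] = reducer(parent[parent_field], sub_final[sub_field])
    return merged


parent = {"query": "q", "notes": ["p"], "answer": ""}
sub_defaults = {"question": "", "notes": [], "answer": ""}
print(project_in(parent, sub_defaults, {"question": "query"}))
# {'question': 'q', 'notes': [], 'answer': ''}  (notes is NOT name-matched)

sub_final = {"question": "q", "notes": ["s"], "answer": "42"}
reducers = {"notes": lambda prior, new: prior + new}
print(project_out(parent, sub_final, reducers, None))
# {'query': 'q', 'notes': ['p', 's'], 'answer': '42'}
print(project_out(parent, sub_final, reducers, {"answer": "answer"}))
# {'query': 'q', 'notes': ['p'], 'answer': '42'}
```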
Compilation MUST fail with category mapping_references_undeclared_field if an inputs mapping names a
parent field that is not declared in the parent's state schema, or a subgraph field that is not declared in
the subgraph's state schema. The same rule applies symmetrically to outputs. Implementations SHOULD
validate at compile time that the types of mapped parent/subgraph field pairs are compatible (per the
language's type system's notion of compatibility); this is SHOULD rather than MUST because type-system
expressiveness varies across languages.
Compiled graph. The result of compiling a graph definition. A compiled graph is immutable and executable. The entry node MUST be declared explicitly by the graph author — there is no implicit "first node added" default. Compilation MUST fail with a diagnostic error if the graph has: no declared entry node, unreachable nodes, dangling edges (references to nonexistent nodes), a node with more than one outgoing edge, or a field with more than one declared reducer.
When reporting a compile-time error, implementations MUST expose one of the following canonical category identifiers (as an error class, error code, or tagged discriminant, per the language's idiom):
- no_declared_entry: no entry node was declared.
- unreachable_node: a declared node has no path from the entry.
- dangling_edge: an edge references a node name that is not declared.
- multiple_outgoing_edges: a node has more than one outgoing edge.
- conflicting_reducers: a state field has more than one declared reducer.
- mapping_references_undeclared_field: a subgraph-as-node inputs or outputs mapping names a field not declared in the relevant state schema.
- reducer_configuration_invalid: a reducer factory was supplied invalid construction parameters (e.g., bounded_append(max_len=0), merge_by_key(key=None)). Raised at field registration / graph compilation time, before any node body runs. Distinct from conflicting_reducers, which concerns the reducer-declaration shape across multiple reducers on the same field; reducer_configuration_invalid concerns the parameters supplied to a single reducer factory.
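A rough sketch of how four of these topology checks could be computed. check_topology is hypothetical. It assumes conditional edges declare their possible destinations up front so reachability can be checked statically, and it does not cover the reducer- or mapping-related categories.

```python
from collections import deque
from typing import List, Optional, Set, Tuple

END = object()  # routing sentinel; never a node name


def check_topology(
    nodes: Set[str],
    entry: Optional[str],
    edges: List[Tuple[str, Set[object]]],
) -> List[str]:
    """Return canonical category identifiers for the problems found (empty = OK)."""
    errors: List[str] = []
    if entry is None:
        errors.append("no_declared_entry")
    sources = [src for src, _ in edges]
    if len(sources) != len(set(sources)):
        errors.append("multiple_outgoing_edges")

    def declared(name: object) -> bool:
        return name is END or name in nodes

    if any(not declared(src) or not all(declared(t) for t in targets) for src, targets in edges):
        errors.append("dangling_edge")
    if entry in nodes:
        adjacency: dict = {}
        for src, targets in edges:
            adjacency.setdefault(src, set()).update(t for t in targets if t is not END)
        # Breadth-first search from the entry node.
        seen, queue = {entry}, deque([entry])
        while queue:
            for nxt in adjacency.get(queue.popleft(), set()):
                if nxt in nodes and nxt not in seen:
                    seen.add(nxt)
                    queue.append(nxt)
        if seen != nodes:
            errors.append("unreachable_node")
    return errors


print(check_topology({"a", "b"}, "a", [("a", {"b"}), ("b", {END})]))  # []
print(check_topology({"a", "b", "c"}, "a", [("a", {"b", END}), ("b", {END})]))  # ['unreachable_node']
print(check_topology({"a"}, None, [("a", {"ghost"}), ("a", {END})]))
# ['no_declared_entry', 'multiple_outgoing_edges', 'dangling_edge']
```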
The pipeline-utilities capability defines a layer of cross-cutting concerns that compose with the graph-engine without modifying the engine. This first version specifies middleware — wrappers around node execution — and two canonical middleware as concrete instances: retry and timing. Both are mandated as part of the pipeline-utilities surface (§6) because their shape is non-obvious enough to warrant a normative contract; other middleware-shaped concerns (logging, resource lifecycle, circuit breakers) are implementable as middleware but are not spec-mandated.
Middleware solves the problem of code that should run around many node invocations without being duplicated in each node's body. Retry, timing, logging, instrumentation, and resource lifecycle are all middleware-shaped. Observer hooks (graph-engine §6) cover read-only observation of what happened; middleware covers control over what happens.
The pipeline-utilities capability composes on top of graph-engine. It does NOT modify graph-engine behavior — middleware sits between the engine's "node dispatch" step and the user's node function, and is invisible to nodes that don't opt into middleware.
Middleware. An async callable with the shape:
async def middleware(state, next) -> partial_update
where:
- state is the input state the wrapped node would have received (the engine's pre-merge state at the time of node dispatch).
- next is an async callable taking a single argument (the state to pass to the next layer or the original node) and returning the partial update from that layer.
- The middleware MUST return a partial update: a mapping of field names to new values, the same shape a node returns.
A middleware MAY:
- Call next(state) to invoke the wrapped chain, optionally inspecting or transforming the input state first (the transformed state is passed to next, NOT to the engine's merge step).
- Inspect, augment, or replace the returned partial update before returning it.
- Short-circuit by NOT calling next and returning its own partial update. The rest of the chain (subsequent middleware and the wrapped node) does not execute, and this middleware's own post-phase (code following await next(...)) is skipped. See "Pre-node and post-node phases" below for the dual-phase model that makes this possible.
- Catch exceptions raised by next(state) and either re-raise, transform, or recover (returning a partial update instead of raising).
- Call next more than once (e.g., retry middleware). The state passed to subsequent calls MAY be the original or a transformed version; the middleware decides.
A middleware MUST NOT:
- Mutate the input state object. The same immutability contract that applies to nodes (graph-engine §2 Node) applies to middleware. Pass a new state to next if a transformation is needed.
- Side-effect on engine internals (the reducer registry, edge map, etc.). Middleware operates only through the state and next it receives and the partial update it returns.
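As an illustration of calling next more than once and catching its exceptions, here is a hypothetical retry middleware. It is not the spec-mandated retry middleware (whose contract lives in the pipeline-utilities spec); the backoff policy and parameter names are assumptions.

```python
import asyncio
from collections.abc import Awaitable, Callable, Mapping
from typing import Any

NextFn = Callable[[Any], Awaitable[Mapping[str, Any]]]
Middleware = Callable[[Any, NextFn], Awaitable[Mapping[str, Any]]]


def retry(max_attempts: int = 3, base_delay: float = 0.0) -> Middleware:
    """Hypothetical retry middleware factory; not the package's built-in retry."""

    async def middleware(state: Any, next: NextFn) -> Mapping[str, Any]:
        attempt = 1
        while True:
            try:
                # Re-invoke the rest of the chain with the same, unmutated state.
                return await next(state)
            except Exception:
                if attempt >= max_attempts:
                    raise  # give up: propagate the last error unchanged
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
                attempt += 1

    return middleware


calls = 0


async def flaky_node(state: Any) -> Mapping[str, Any]:
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return {"ok": True}


result = asyncio.run(retry(max_attempts=3)({}, flaky_node))
print(result, calls)  # {'ok': True} 3
```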
Middleware chain. An ordered sequence of middleware applied to a single node. The chain composes
outer-to-inner: the first middleware in the chain runs first, calls next(state) to invoke the
second, and so on, with the original node at the inner end.
For a chain [m1, m2, m3] wrapping node n, execution proceeds:
m1 sees state, calls next(s) ────► m2 sees state, calls next(s) ────► m3 sees state, calls next(s)
                                                                                  │
                                                                                  ▼
                                                                      n(state) → partial_update
                                                                                  │
m1 returns partial_update ◄─────── m2 returns partial_update ◄─────── m3 returns partial_update
Each middleware's return value flows back to the previous layer as the return value of that layer's next call.
Pre-node and post-node phases. A middleware function has two phases separated by
await next(...). Code before await next is the pre-node phase, running on the way into
the chain (left-to-right in the diagram); code after await next returns is the post-node
phase, running on the way out (right-to-left). The wrapped node always runs at the innermost
point — it is never reached partway through the chain.
The two phases are tied to a single position in the chain: if m1 is outermost, m1's pre-phase
runs first AND m1's post-phase runs last. Pre-order and post-order are not configured
independently. Concretely, a middleware function carries both phases:
import logging
import time
logger = logging.getLogger(__name__)
async def my_middleware(state, next):
    # ── pre-node phase: runs on the way IN ──
    started_at = time.perf_counter()
    partial_update = await next(state)  # the rest of the chain (and eventually the node) runs here
    # ── post-node phase: runs on the way OUT ──
    logger.info("node took %.3fs", time.perf_counter() - started_at)
    return partial_update
This is the standard middleware shape used by Express, Koa, ASGI, Tower, Django middleware, and similar frameworks.
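To see the outer-to-inner ordering end to end, here is a self-contained sketch of chain composition. compose and tracer are hypothetical helpers, not the package's dispatch code; the recorded events follow the dual-phase model described above.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence
from typing import Any, Dict, List

Update = Dict[str, Any]
Handler = Callable[[Any], Awaitable[Update]]
Middleware = Callable[[Any, Handler], Awaitable[Update]]


def compose(chain: Sequence[Middleware], node: Handler) -> Handler:
    """Wrap node so that chain[0] is outermost and node is innermost."""
    handler = node
    for mw in reversed(chain):
        # Default arguments bind this layer's middleware and inner handler now,
        # avoiding Python's late-binding closure pitfall inside the loop.
        async def layer(state: Any, mw: Middleware = mw, inner: Handler = handler) -> Update:
            return await mw(state, inner)
        handler = layer
    return handler


events: List[str] = []


def tracer(name: str) -> Middleware:
    async def mw(state: Any, next: Handler) -> Update:
        events.append(f"{name} pre")   # pre-node phase
        update = await next(state)
        events.append(f"{name} post")  # post-node phase
        return update
    return mw


async def node(state: Any) -> Update:
    events.append("node")
    return {"x": state["x"] + 1}


wrapped = compose([tracer("m1"), tracer("m2"), tracer("m3")], node)
out = asyncio.run(wrapped({"x": 1}))
print(out)     # {'x': 2}
print(events)  # ['m1 pre', 'm2 pre', 'm3 pre', 'node', 'm3 post', 'm2 post', 'm1 post']
```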
The LLM provider capability defines a uniform request/response surface for sending messages to a Large Language Model and receiving its response. It is the substrate every higher-level LLM capability composes against — tool systems, prompt management, evaluation harnesses, agent loops.
The substrate is intentionally narrow:
- A provider is stateless. It does not maintain conversation history; the caller passes the full message list on every call.
- A provider does not loop on tool calls. If the assistant returns tool calls, the caller is responsible for executing the tools and making a follow-on complete() with the results.
- A provider does not handle retry, rate limiting, fallback, or routing. Those are pipeline-utilities concerns and compose above the provider via middleware.
- A provider is bound to a single model identifier. Switching models means constructing a new provider, not passing a different argument per call. (Implementations MAY offer convenience factories that produce per-model providers from shared credentials; that is a constructor concern, not a behavioral one.)
Every constraint above is a deliberate scope cut. The narrower the provider surface, the easier it is to swap implementations, mock for tests, and stack pipeline utilities on top.
Transparency. Per charter §3.1 principle 8 ("Transparency over abstraction"), the provider
abstraction surfaces a normalized shape — Message, Tool, Response — without hiding what the
underlying provider returned. The Response record carries the parsed provider response verbatim
alongside the normalized fields (§6 raw), and the §7 error categories preserve the underlying
provider exception as cause. Users who need provider-specific fields (logprobs, content-filter
details, vendor-specific extensions) reach through the abstraction directly; structure is added,
never removed.
Message. A typed entry in a conversation. The four message kinds are system, user,
assistant, and tool. Each kind carries kind-specific content as defined in §3.
Tool. A function the model may request the user execute. A tool definition is a record of name,
description, and parameters (a JSON Schema describing the argument shape).
Tool call. A request from an assistant message to invoke a named tool with structured arguments.
The user is responsible for executing the tool and returning the result via a tool message bearing
the corresponding tool_call_id.
Provider. An object that, given a sequence of messages and an optional set of tools, returns a
single assistant message wrapped in a Response. A provider is bound to a specific model identifier.
Response. The result of a provider call: the assistant message, a finish reason, and usage information.
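A sketch of the caller-driven tool loop these definitions imply. The record shapes (Message, ToolCall, Response) and FakeProvider are hypothetical stand-ins for the llm-provider §3 and §6 types. The point is that the provider only returns an assistant message; the caller executes the tool and replies with a tool message bearing the matching tool_call_id.

```python
import asyncio
import json
from dataclasses import dataclass, field
from typing import Optional

# Hypothetical record shapes; the real llm-provider types may differ.


@dataclass(frozen=True)
class ToolCall:
    id: str
    name: str
    arguments: dict


@dataclass(frozen=True)
class Message:
    kind: str  # "system" | "user" | "assistant" | "tool"
    content: str = ""
    tool_calls: tuple = ()
    tool_call_id: Optional[str] = None


@dataclass(frozen=True)
class Response:
    message: Message
    finish_reason: str
    usage: dict = field(default_factory=dict)


class FakeProvider:
    """Stateless: receives the full message list on every call and never runs tools."""

    async def complete(self, messages: list, tools: Optional[list] = None) -> Response:
        last = messages[-1]
        if last.kind == "tool":
            return Response(Message("assistant", f"The sum is {last.content}."), "stop")
        call = ToolCall(id="call_1", name="add", arguments={"a": 2, "b": 3})
        return Response(Message("assistant", tool_calls=(call,)), "tool_calls")


# Tool definition: name, description, and a JSON Schema for the arguments.
TOOL_DEFS = [{
    "name": "add",
    "description": "Add two integers",
    "parameters": {"type": "object",
                   "properties": {"a": {"type": "integer"}, "b": {"type": "integer"}},
                   "required": ["a", "b"]},
}]
TOOLS = {"add": lambda a, b: a + b}


async def run_agent(provider: FakeProvider, question: str) -> str:
    messages = [Message("user", question)]
    while True:
        response = await provider.complete(messages, tools=TOOL_DEFS)
        messages.append(response.message)
        if not response.message.tool_calls:
            return response.message.content
        # The caller, not the provider, executes each tool and sends the result back.
        for call in response.message.tool_calls:
            result = TOOLS[call.name](**call.arguments)
            messages.append(Message("tool", json.dumps(result), tool_call_id=call.id))


answer = asyncio.run(run_agent(FakeProvider(), "What is 2 + 3?"))
print(answer)  # The sum is 5.
```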
The observability capability defines normative mappings from OpenArmature's runtime event surface (graph-engine §6 observer events, specifically the v0.6.0 started/completed event pairs) into well-known external observability backends. The substrate is provider-neutral; the capability is where each concrete backend's translation lives.
This spec defines two concrete backend mappings: the OpenTelemetry mapping in §3–§7 and the Langfuse mapping in §8. Future proposals add additional backends as further sibling sections of this same spec; the OTel mapping serves as the reference shape for cross-backend equivalence.
The capability does NOT introduce new graph-engine primitives. It consumes the existing observer
event stream — started events open spans, completed events close them. An implementation that
emits OTel spans (or Langfuse observations, per §8) is built on top of §6, not into the engine.
Span. A unit of work in OTel: a logically distinct interval with a name, start/end timestamps, status, attributes, and parent-child relationships. The mapping translates each user-meaningful unit of work in a graph invocation (the invocation itself, each subgraph, each node execution, each fan-out instance) into a span.
Span attributes. Key/value pairs attached to a span. OTel attribute values are restricted to
scalar types (string, int, float, bool) and arrays thereof. The mapping uses dotted-key namespaces
under the openarmature. prefix.
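A sketch of flattening user metadata into attributes that respect OTel's scalar-or-array restriction. to_span_attributes is hypothetical. The openarmature.user. prefix is taken from the metadata recipe later in this guide, and stringifying non-scalar values is an assumption of this sketch rather than spec behavior.

```python
from collections.abc import Mapping, Sequence
from typing import Any, Dict

SCALARS = (str, bool, int, float)


def to_span_attributes(metadata: Mapping[str, Any], prefix: str = "openarmature.user.") -> Dict[str, Any]:
    """Hypothetical helper: map metadata entries to dotted-key span attributes."""
    attrs: Dict[str, Any] = {}
    for key, value in metadata.items():
        name = f"{prefix}{key}"
        if isinstance(value, SCALARS):
            attrs[name] = value
        elif (isinstance(value, Sequence) and not isinstance(value, (str, bytes))
              and all(isinstance(v, SCALARS) for v in value)):
            attrs[name] = list(value)  # arrays of scalars are allowed
        else:
            attrs[name] = str(value)  # assumption: coerce anything else to a string
    return attrs


attrs = to_span_attributes({"tenant": "acme", "cohort": ["beta"], "retries": 2, "cfg": {"a": 1}})
print(attrs)
```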
Span status. OTel spans carry a status of OK, ERROR, or UNSET. The mapping translates
graph-engine §4 error categories into status ERROR with a category-bearing description.
Trace. OTel's term for a complete tree of spans rooted at a single trace ID. By default, one
outermost graph invocation produces one trace; subgraphs (whether composed via
add_subgraph_node or instantiated by a fan-out per pipeline-utilities §9) participate in the
parent invocation's trace as nested spans. Implementations MUST also support an opt-in
detached mode for specific subgraphs or fan-outs (§4.4), where the subgraph or fan-out gets
its own trace and the parent's dispatch span carries an OTel Link to that new trace.
Correlation ID. A per-invocation identifier that flows across observability backends.
Distinct from invocation_id — the invocation_id (caller-supplied or framework-generated, per
§5.1) correlates spans within a single backend, while correlation_id is application-supplied
(or auto-generated when absent)
and is intended to be visible in every backend the implementation emits to. A user running an
LLM workflow with both an OTel backend (system traces, logs) and a Langfuse backend
(LLM-specific traces) uses the correlation_id as a join key between them: find a slow request
in Langfuse, search for its correlation_id in OTel logs, and see the surrounding
infrastructure activity. See §3 (architectural contract), §5.6 (OTel attribute realization),
and §8.5 (Langfuse attribute realization).
The prompt-management capability defines the contract by which named, versioned templates are fetched from one or more backends, rendered with caller-supplied variables, and turned into LLM-ready message sequences. The spec establishes the contracts; implementations and sibling-package backends ship the concrete forms.
The capability composes with the llm-provider capability (a PromptResult carries
Message records per llm-provider §3) and with the observability capability (rendered
prompts carry stable identity that observer events MAY surface).
This capability does NOT define:
- The templating language or syntax (Jinja2 in Python, handlebars / template literals in TypeScript — per implementation).
- Specific backend implementations beyond a minimum local-filesystem reference.
- Prompt versioning workflows (the spec defines a version field on Prompt; how versions are assigned, incremented, or pinned is per-project discipline).
- Cache invalidation policies (the spec defines hashes that user code MAY use as cache keys; the cache itself is out of scope).
Prompt. An unrendered template plus its identity metadata. A prompt is what a backend returns from a fetch; it carries enough information to be rendered, traced, and content-addressed without a backend round-trip.
PromptResult. The rendered output of applying variables to a prompt. Carries the
rendered Message sequence (per llm-provider §3) plus the prompt's identity metadata
(propagated from the source Prompt) plus a rendered_hash that captures the rendered
content.
PromptManager. The user-facing API. Composes one or more PromptBackends and exposes
fetch + render operations. Users interact with the manager; backends are an
implementation detail of the manager's construction.
PromptBackend. The protocol implementations and sibling packages plug into. Defines a single operation: fetch a prompt by name and label. Backends do not render; rendering is the manager's concern.
PromptGroup. A composition pattern for tracing related prompts together: an ordered
sequence of PromptResult instances that should appear under one logical grouping in
observability. The canonical N=2 case is "classifier + follow-up"; longer chains
(multi-stage classification, RAG with reranking, self-correction loops, map-reduce over
chunks) work under the same primitive. The group is a thin wrapper over its members and
a span-grouping convention; it is not a fetch or render primitive and performs no
orchestration.
Fetch vs. render distinction. Fetching retrieves the template; rendering applies variables. Splitting the two operations lets users:
- Inspect a template without binding variables (useful for tooling, schema validation, prompt-version diffs).
- Cache templates separately from rendered output (template fetch is the I/O-bound step; rendering is local).
- Render the same template with different variables in tight loops without re-fetching.
A convenience operation that combines fetch + render is permitted (see §6) but the spec treats fetch and render as separable.
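A sketch of the fetch/render split. Prompt, PromptResult, DictBackend, and render are hypothetical shapes. string.Template stands in for the implementation's templating language, and hashing the JSON-serialized messages with SHA-256 is one assumed way to compute rendered_hash.

```python
import hashlib
import json
from dataclasses import dataclass
from string import Template


@dataclass(frozen=True)
class Prompt:
    name: str
    label: str
    version: str
    template: str


@dataclass(frozen=True)
class PromptResult:
    messages: tuple
    name: str
    version: str
    rendered_hash: str


class DictBackend:
    """Fetch-only backend: returns unrendered templates by (name, label)."""

    def __init__(self, prompts: dict):
        self._prompts = prompts
        self.fetches = 0

    def fetch(self, name: str, label: str) -> Prompt:
        self.fetches += 1  # stands in for the I/O-bound step
        return self._prompts[(name, label)]


def render(prompt: Prompt, **variables: str) -> PromptResult:
    """Local, repeatable step: apply variables and hash the rendered content."""
    text = Template(prompt.template).substitute(variables)
    messages = ({"kind": "user", "content": text},)
    digest = hashlib.sha256(json.dumps(messages, sort_keys=True).encode()).hexdigest()
    return PromptResult(messages, prompt.name, prompt.version, digest)


backend = DictBackend({("summarize", "production"): Prompt("summarize", "production", "3", "Summarize $topic in one line.")})
prompt = backend.fetch("summarize", "production")          # fetched once
results = [render(prompt, topic=t) for t in ("tides", "eclipses")]  # rendered many times
print(results[0].messages[0]["content"])  # Summarize tides in one line.
```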
Recipes that compose the primitives. Not framework contracts — these are how to do common things idiomatically.
Problem. How do I skip a node whose external output already exists?
A small custom middleware wraps the
node. Before calling next_(state), the middleware checks "does
my output already exist?" (a filesystem file, a database row, a
content-addressable store entry). If yes, it returns the cached
output as the partial update directly. If no, it calls next_
and returns the result.
The node sees its normal (state) → partial_update contract.
The middleware is the only thing that knows about idempotency;
all callers of the node compose with it cleanly.
import os
from collections.abc import Mapping
from typing import Any
from openarmature.graph import GraphBuilder, NextCall, State
class BypassIfRendered:
    """Skip the node if its rendered output already exists on disk."""
    def __init__(self, output_field: str, key_field: str, root: str):
        self.output_field = output_field
        self.key_field = key_field
        self.root = root
    async def __call__(
        self, state: Any, next_: NextCall
    ) -> Mapping[str, Any]:
        key = getattr(state, self.key_field)
        path = os.path.join(self.root, f"{key}.bin")
        if os.path.exists(path):
            # Cache hit: return the stored output as the partial update; the node never runs.
            with open(path, "rb") as f:
                return {self.output_field: f.read()}
        partial = await next_(state)
        # ... persist partial[self.output_field] to path here, or
        # have the node itself write the file ...
        return partial
class RenderState(State):
    scene_id: str
    rendered_frame: bytes = b""
async def render_frame_fn(state: RenderState) -> Mapping[str, Any]:
    # Hypothetical placeholder: the real, expensive render step goes here.
    raise NotImplementedError
builder = (
    GraphBuilder(RenderState)
    .add_node(
        "render",
        render_frame_fn,
        middleware=[
            BypassIfRendered(
                output_field="rendered_frame",
                key_field="scene_id",
                root="./renders",
            )
        ],
    )
    # ... rest of graph ...
)
The middleware composes with the framework's four registration sites: attach it per-node (as above), per-graph, per-branch, or per-fan-out-instance, depending on the scope of the bypass.
Use this pattern when:
- The node's work is expensive and idempotent given the same key (rendering a frame, calling an external API with content-addressable output, downloading a file).
- The "does it exist" check is cheap (a filesystem stat, a Redis EXISTS, a database key lookup).
- You're OK with the node being skipped silently; the partial update returned by the middleware is indistinguishable from a successful node run.
Avoid it when:
- The check itself is expensive enough that you'd rather just run the node. The cost model inverts; the pattern is wrong.
- You need to force re-execution on demand (cache invalidation). You can add a force_rerun: bool field on state that the middleware consults, but if you're doing that often, the bypass logic belongs in the node itself, gated on a state field, not in middleware.
- The cached output's freshness depends on inputs the middleware can't see (downstream state, time-of-day, etc.). Use a dedicated caching layer instead of reimplementing cache invalidation in the middleware.
See also:
- Middleware: middleware shape, the four registration sites, composition.
- Spec: pipeline-utilities
This pattern is explicitly called out in proposal 0008's Alternatives considered section as a userland recipe rather than spec'd behavior; this page is its canonical home.
Problem. A service runs the same graph for many tenants / requests / feature flag cohorts. How do you tag every span and trace so downstream observability (Honeycomb, Datadog, Langfuse, HyperDX, Grafana Tempo) can filter by tenant or join across services without each node having to thread the identifiers through manually?
Pass a metadata dict to invoke(). The framework propagates each
entry to every observability backend at once: the OTel observer
emits each entry as an openarmature.user.<key> cross-cutting span
attribute on every span (invocation, node, subgraph wrapper,
fan-out instance, LLM provider), and the Langfuse observer merges
each entry as a top-level key into trace.metadata AND every
observation's metadata. Backends that consume OTel attributes pick
the entries up for free; backends with typed metadata fields get
them via per-backend propagation.
For metadata that's only known mid-flight (an ID resolved by an
LLM-classification node, a derived feature flag), use
set_invocation_metadata from inside a node. The augmentation
respects fan-out / parallel-branches per-instance scoping per
proposal 0045, so each instance's update lives in its own
async-context copy and doesn't leak to siblings.
import asyncio

from openarmature.graph import END, GraphBuilder, State
from openarmature.observability import set_invocation_metadata


class RequestState(State):
    query: str = ""
    answer: str = ""


async def answer(s: RequestState) -> dict:
    # An entry resolved mid-invocation propagates to subsequent spans
    # in the same async-context: this node's `completed`, the LLM
    # provider span if any, and onwards. Sibling fan-out instances
    # and parallel-branches branches see their own copies.
    set_invocation_metadata(modelTier="standard")
    return {"answer": "Apollo 13 aborted due to an O2 tank failure."}


graph = (
    GraphBuilder(RequestState)
    .add_node("answer", answer)
    .add_edge("answer", END)
    .set_entry("answer")
    .compile()
)


async def main() -> None:
    final = await graph.invoke(
        RequestState(query="why did Apollo 13 abort?"),
        metadata={
            "tenantId": "acme-corp",
            "requestId": "req-12345",
            "featureFlag": "v2-canary",
        },
    )
    print(final.answer)


asyncio.run(main())

Every span emitted during this invoke() carries
openarmature.user.tenantId="acme-corp",
openarmature.user.requestId="req-12345", and
openarmature.user.featureFlag="v2-canary". Spans inside the
answer node (and any downstream nodes if the graph had more)
additionally carry openarmature.user.modelTier="standard" from
the set_invocation_metadata call.
Validation runs synchronously, before any node body fires. Both
invoke(metadata=...) and set_invocation_metadata(...) enforce
the same rules:
- Keys MUST NOT start with openarmature. or gen_ai. (reserved namespaces per the spec).
- Keys MUST NOT collide with the spec's reserved per-trace metadata keys (correlation_id, entry_node, spec_version, etc.). The set is enforced at the invoke() and set_invocation_metadata boundaries via the validator in openarmature.observability.metadata; it grows per spec proposals 0041 / 0042, with the canonical list in the spec's observability §3.4.
- Values MUST be OTel-attribute-compatible scalars (str / int / float / bool) or homogeneous arrays of those.
Violations raise ValueError at the boundary. Failing loudly there
is better than letting a bare key silently clobber a spec-reserved
key in Langfuse's flat trace.metadata.
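The three rules translate into a short validator. The sketch below assumes a partial reserved-key set containing only the three keys named above. The real validator lives in openarmature.observability.metadata and may differ in detail, and `validate_metadata` and `is_valid` are hypothetical names:

```python
RESERVED_PREFIXES = ("openarmature.", "gen_ai.")
# Partial list for illustration only; the canonical set is in the spec's
# observability §3.4 and grows with new proposals.
RESERVED_KEYS = frozenset({"correlation_id", "entry_node", "spec_version"})
SCALAR_TYPES = (str, int, float, bool)


def validate_metadata(metadata: dict[str, object]) -> None:
    """Raise ValueError if any entry breaks the three boundary rules."""
    for key, value in metadata.items():
        if key.startswith(RESERVED_PREFIXES):
            raise ValueError(f"metadata key {key!r} uses a reserved namespace")
        if key in RESERVED_KEYS:
            raise ValueError(f"metadata key {key!r} collides with a reserved per-trace key")
        if isinstance(value, SCALAR_TYPES):
            continue
        if isinstance(value, (list, tuple)):
            # Homogeneous: every element is a scalar of one exact type
            # (type() rather than isinstance, so [True, 1] counts as mixed).
            element_types = {type(item) for item in value}
            if len(element_types) <= 1 and all(isinstance(item, SCALAR_TYPES) for item in value):
                continue
        raise ValueError(f"metadata value for {key!r} must be a scalar or a homogeneous array")


def is_valid(metadata: dict[str, object]) -> bool:
    try:
        validate_metadata(metadata)
    except ValueError:
        return False
    return True
```

Comparing element types with `type()` is a deliberate choice in this sketch: `bool` is a subclass of `int` in Python, so `isinstance` alone would accept `[True, 1]` as homogeneous.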
When to use:
- One service runs the same graph for many distinct callers (multi-tenant SaaS, per-customer feature flags, A/B test cohorts).
- Downstream observability needs to filter or join on caller-side identifiers (tenant ID for billing dashboards, request ID for cross-service trace stitching, feature flag for experiment analysis).
- You don't want each node to know about tenancy. The metadata flows through the framework, not the node bodies.
When not to use:
- The identifier is a per-node decision, not a per-invocation one. If different nodes in the same invocation produce different values, that's typed state, not invocation metadata. Put it on the State schema with a clear reducer.
- The value isn't a scalar or homogeneous array. The boundary validation rejects complex shapes; if you need to attach a nested object, serialize it to a JSON string before passing it.
- The value contains PII you don't want in every span. Metadata is unconditionally emitted everywhere the observers run; filter at the caller or skip the propagation for those keys.
See also:
- Observability concept page: how OTel attributes and Langfuse metadata propagate.
- examples/langfuse-observability: runnable example exercising the metadata propagation path.
- Spec: observability, the propagation contract for caller-supplied metadata.
Problem. A custom observer needs to thread per-call state
between a node's started and completed events: measure
duration, capture request/response payloads, attach a custom ID
that downstream uses. The engine doesn't carry a correlation field
across the pair (it doesn't need one for its own logic, since
events arrive serially per spec §6). How does the observer
reconcile which completed matches which started?
The pair identity is the tuple
(namespace, branch_name, attempt_index, fan_out_index). That
tuple is unique within an invocation: the namespace separates
subgraph wrappers from their parents, branch_name distinguishes
parallel-branches branches, attempt_index distinguishes retried
attempts of the same node, and fan_out_index distinguishes
per-instance fan-out copies. Carry per-invocation state in
dict[invocation_id, dict[tuple, value]], look up on completed,
and sweep the outer entry when the per-invocation sub-dict
empties.
Both branch_name and fan_out_index matter even for nodes that
look "the same" by name: a node score inside parallel-branches
branch=fast vs branch=slow produces two distinct pair
identities, and a per-instance fan-out copy at fan_out_index=3 is
not the same as fan_out_index=4.
import time
from typing import NamedTuple

from openarmature.graph import NodeEvent
from openarmature.observability.correlation import current_invocation_id

PairKey = tuple[tuple[str, ...], str | None, int, int | None]


class StepTiming(NamedTuple):
    node_name: str
    namespace: tuple[str, ...]
    branch_name: str | None
    attempt_index: int
    fan_out_index: int | None
    duration_s: float


class StepTimingObserver:
    """Custom observer that records wall-clock duration per node
    attempt. Stitches started -> completed via the per-invocation
    pair-identity dict.
    """

    def __init__(self) -> None:
        # invocation_id -> {pair_key: start_monotonic}
        self._pending: dict[str, dict[PairKey, float]] = {}
        # Final per-call timings, surfaced to whatever consumes them
        # (metrics exporter, log line, in-test assertion).
        self.timings: list[StepTiming] = []

    async def __call__(self, event: NodeEvent) -> None:
        invocation_id = current_invocation_id()
        if invocation_id is None:
            return
        key: PairKey = (
            event.namespace,
            event.branch_name,
            event.attempt_index,
            event.fan_out_index,
        )
        if event.phase == "started":
            self._pending.setdefault(invocation_id, {})[key] = time.monotonic()
            return
        if event.phase == "completed":
            start = self._pending.get(invocation_id, {}).pop(key, None)
            if start is not None:
                self.timings.append(
                    StepTiming(
                        node_name=event.node_name,
                        namespace=event.namespace,
                        branch_name=event.branch_name,
                        attempt_index=event.attempt_index,
                        fan_out_index=event.fan_out_index,
                        duration_s=time.monotonic() - start,
                    )
                )
            # Sweep when the dict empties for this invocation.
            if not self._pending.get(invocation_id):
                self._pending.pop(invocation_id, None)

Attach with graph.attach_observer(StepTimingObserver()). Run
the invocation; the observer's timings list carries one entry
per node attempt with its duration and identifying tuple.
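The pairing logic can be exercised without a running graph. The sketch below feeds hand-built events through the same keying and sweep rules. `FakeEvent` and `PairTracker` are hypothetical test doubles, and unlike the observer above, the invocation id rides on the event rather than coming from current_invocation_id():

```python
from __future__ import annotations

import time
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeEvent:
    # Hypothetical stand-in for NodeEvent. The invocation id travels on the
    # event here instead of coming from current_invocation_id().
    invocation_id: str
    phase: str
    node_name: str
    namespace: tuple[str, ...]
    branch_name: str | None
    attempt_index: int
    fan_out_index: int | None


class PairTracker:
    def __init__(self) -> None:
        self._pending: dict[str, dict[tuple, float]] = {}
        self.completed: list[tuple[str, int | None]] = []

    def handle(self, event: FakeEvent) -> None:
        key = (event.namespace, event.branch_name, event.attempt_index, event.fan_out_index)
        if event.phase == "started":
            self._pending.setdefault(event.invocation_id, {})[key] = time.monotonic()
        elif event.phase == "completed":
            start = self._pending.get(event.invocation_id, {}).pop(key, None)
            if start is not None:
                self.completed.append((event.node_name, event.fan_out_index))
            if not self._pending.get(event.invocation_id):
                self._pending.pop(event.invocation_id, None)  # sweep the drained invocation


def score_event(phase: str, fan_out_index: int) -> FakeEvent:
    return FakeEvent("inv-1", phase, "score", ("root",), None, 0, fan_out_index)


# Two fan-out copies of the same node, finishing in the opposite order they started.
tracker = PairTracker()
for event in (
    score_event("started", 3),
    score_event("started", 4),
    score_event("completed", 4),
    score_event("completed", 3),
):
    tracker.handle(event)
```

If the events were keyed by node name alone, both score events would share one slot: the second started would overwrite the first, and only one duration would be recorded. The full tuple keeps them apart, and the outer dict is empty once both complete.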
When to use:
- A custom observer needs paired-event state that the spec doesn't carry across the pair.
- The pair identity needs to be unique across fan-out instances or parallel-branches branches; a key shape that omits branch_name or fan_out_index would collide.
- Long-running services need the dict to drain naturally as invocations complete. The "sweep when sub-dict empties" pattern prevents the outer dict from growing per-invocation forever.
When not to use:
- You only need a final-summary signal at invocation completion. Subscribe to the invocation completed event and read the final state directly; no per-call reconciliation is needed.
- The OTelObserver or LangfuseObserver already provides what you want. Both stitch started / completed internally to open and close spans; you don't need a custom observer to track timings if a span already carries the duration.
- The metric is cross-invocation. A pair-identity dict scoped to a single invocation_id won't aggregate; use a global counter or push to an external metrics backend instead.
See also:
- Observability concept page: the NodeEvent shape, started / completed lifecycle.
- Caller-supplied trace identifiers: adjacent pattern for tagging the events your observer sees.
- Spec: graph-engine, observer events and the uniqueness invariants for (namespace, branch_name, attempt_index, fan_out_index).
Problem. How do I start the graph at an arbitrary node?
You don't. Make the "entry point" a state-level parameter instead. A pass-through router node runs first, and a conditional edge out of it routes to wherever execution should begin. The graph stays a single graph; what differs across runs is which branch the conditional edge takes.
Combine with checkpointing if you want resume-style behavior: skip nodes whose work is already captured in state.
from openarmature.graph import END, EndSentinel, GraphBuilder, State


class MissionState(State):
    starting_stage: str = "plan"  # "plan" | "execute" | "report"
    plan: str = ""
    execution_log: str = ""
    report: str = ""


def route_from_starting_stage(s: MissionState) -> str | EndSentinel:
    return s.starting_stage


async def router(s: MissionState) -> dict:
    return {}  # no state change; conditional edge below routes


async def plan(s: MissionState) -> dict:
    return {
        "plan": "Apollo-style free-return trajectory.",
        "starting_stage": "execute",
    }


async def execute(s: MissionState) -> dict:
    return {"execution_log": "Burn complete. Trajectory nominal."}


async def report(s: MissionState) -> dict:
    return {"report": "Mission objectives met."}


builder = (
    GraphBuilder(MissionState)
    .add_node("router", router)
    .add_node("plan", plan)
    .add_node("execute", execute)
    .add_node("report", report)
    .add_conditional_edge("router", route_from_starting_stage)
    .add_edge("plan", "execute")
    .add_edge("execute", "report")
    .add_edge("report", END)
    .set_entry("router")
)
graph = builder.compile()

# Start at the beginning (from inside an async function):
await graph.invoke(MissionState())
# Or skip straight to execute, with the plan already in state:
await graph.invoke(MissionState(starting_stage="execute", plan="..."))

The caller pre-populates starting_stage (and any prerequisite
fields the chosen branch needs) and the graph routes accordingly.
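To see which nodes execute for each choice of starting_stage, the topology above can be traced in plain Python. This is a hypothetical model, not how the engine runs. `NEXT_NODE` and `nodes_run` are illustrative names, and the unknown-stage check is an assumption about what you might want to guard against before invoking:

```python
# Hypothetical plain-Python model of the graph above: the static edges as a
# dict, with "END" standing in for the END sentinel.
NEXT_NODE = {"plan": "execute", "execute": "report", "report": "END"}


def nodes_run(starting_stage: str) -> list[str]:
    """Return the nodes that execute after the router hands off to starting_stage."""
    if starting_stage not in NEXT_NODE:
        # Fail fast instead of routing to a node name the graph doesn't have.
        raise ValueError(f"unknown starting_stage: {starting_stage!r}")
    visited: list[str] = []
    node = starting_stage  # the router's conditional edge jumps here
    while node != "END":
        visited.append(node)
        node = NEXT_NODE[node]
    return visited


full_run = nodes_run("plan")
resumed_run = nodes_run("execute")
```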
When to use:
- You have a few canonical entry points and the choice between them is data, not control flow.
- You want to skip work already done in a prior run; combine with checkpointing to pick up where you left off.
- Your "different entry points" share state structure and most of the downstream graph.
When not to use:
- "Start at node X" really means "run a different pipeline." Then it's a different compiled graph. Don't bend one graph into two; two graphs are easier to test and reason about.
- The number of entry points grows unboundedly. Then you're reimplementing routing; consider a higher-level dispatch layer that picks which graph to invoke.
Problem. How do I keep multi-turn agent state across turns?
The framework's checkpointing
provides single-invocation crash resume out of the box. Multi-turn
state is the same primitive used differently: the application
keeps a stable session_id → invocation_id mapping, and each
turn calls invoke(resume_invocation=<prior_invocation_id>) to
pick up where the previous turn left off.
The checkpointer returns the prior state. The new turn proceeds
from there. Session-context fields that accumulate across turns
(message history, retrieved facts, running totals) use a merge
or append reducer so each turn's contribution adds to what's
already there rather than replacing it.
Each resume mints a new invocation_id; the session_id is the
join key the application maintains, typically as the
correlation_id on invoke() (which is preserved unchanged
across resume).
from typing import Annotated

from pydantic import Field

from openarmature.checkpoint import SQLiteCheckpointer
from openarmature.graph import END, GraphBuilder, State, append, merge
from openarmature.llm import Message


class SessionState(State):
    messages: Annotated[list[Message], append] = Field(default_factory=list)
    facts: Annotated[dict[str, str], merge] = Field(default_factory=dict)
    last_user_input: str = ""


# ... define nodes that read s.messages, append to s.messages,
# and merge into s.facts ...

checkpointer = SQLiteCheckpointer(path="./sessions.db")
graph = (
    GraphBuilder(SessionState)
    .add_node("plan", plan)
    .add_node("respond", respond)
    .add_edge("plan", "respond")
    .add_edge("respond", END)
    .set_entry("plan")
    .with_checkpointer(checkpointer)
    .compile()
)


# The application maintains its own session table mapping
# session_id -> latest invocation_id. OA's checkpointer doesn't
# know about sessions; the join is the application's
# responsibility. The session_id doubles as correlation_id so
# observability traces share the cross-turn join key.
async def handle_turn(session_id: str, user_input: str) -> str:
    initial = SessionState(last_user_input=user_input)
    prior_invocation_id = sessions_db.get_invocation_id(session_id)
    if prior_invocation_id is None:
        final = await graph.invoke(initial, correlation_id=session_id)
    else:
        final = await graph.invoke(
            initial, resume_invocation=prior_invocation_id
        )
    # Record the new invocation_id for next turn's resume.
    # Read it from the checkpointer's latest record for this
    # correlation_id; exact lookup is application-side bookkeeping
    # (latest_for is a placeholder for that lookup).
    sessions_db.set_invocation_id(session_id, latest_for(session_id))
    return final.messages[-1].content

sessions_db is your application's session-state store (Postgres,
Redis, a flat file, whatever); the checkpointer holds the OA-side
state and the session table holds the join keys.
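For the application side of the join, a minimal session table only needs the two calls handle_turn makes. The sketch below is backed by the stdlib sqlite3 module. The `SessionTable` class and its table and column names are hypothetical, and any store with an atomic overwrite would do:

```python
from __future__ import annotations

import sqlite3


class SessionTable:
    """Hypothetical application-side store: session_id -> latest invocation_id."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS sessions ("
            "session_id TEXT PRIMARY KEY, invocation_id TEXT NOT NULL)"
        )

    def get_invocation_id(self, session_id: str) -> str | None:
        row = self._conn.execute(
            "SELECT invocation_id FROM sessions WHERE session_id = ?", (session_id,)
        ).fetchone()
        return row[0] if row else None

    def set_invocation_id(self, session_id: str, invocation_id: str) -> None:
        # Each turn overwrites the pointer: only the newest invocation matters for resume.
        with self._conn:  # commits on success
            self._conn.execute(
                "INSERT OR REPLACE INTO sessions (session_id, invocation_id) VALUES (?, ?)",
                (session_id, invocation_id),
            )


sessions_db = SessionTable()
first_turn_lookup = sessions_db.get_invocation_id("session-abc")  # None: no prior turn
sessions_db.set_invocation_id("session-abc", "inv-1")
sessions_db.set_invocation_id("session-abc", "inv-2")  # the next turn's new invocation
latest = sessions_db.get_invocation_id("session-abc")
```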
When to use:
- Your application has long-lived sessions with multiple LLM turns and you want the prior state to be the starting point of the next turn.
- You're already running a checkpointer for crash resume; this pattern is "use it more."
- Cross-turn state has clean reducer semantics: merge for accumulating dicts, append for growing lists.
When not to use:
- A session's "state" is bigger than fits comfortably in a single graph state shape. Split into multiple graphs and share an external store keyed by session.
- Turns are completely independent; there's no value in carrying state across them. Then just run each turn as a fresh invoke.
- The application already has its own state-management layer that conflicts with OA's frozen-state model. Use OA per-turn without cross-turn resume.
See also:
- Checkpointing: backend wiring, resume_invocation, schema migration.
- State and reducers: merge and append reducer strategies.
- examples/checkpointing-and-migration: single-resume baseline.
- Spec: pipeline-utilities
Problem. A long-running pipeline has saved checkpoints mid-flight. You add a field to the state schema and rename another. How do older checkpoints resume against the new schema without each node body having to handle both shapes?
Tag the state class with a schema_version and register migration
callables at compile time via GraphBuilder.with_state_migration.
On resume, the engine inspects the loaded record's schema_version,
walks the registered chain (v1 → v2 → v3 → …), and hands node
bodies a fully-migrated state object. Node code stays single-shape;
all version-aware logic lives in the migration functions.
The migration callable's typed signature is Callable[[Any], Any].
For JSON-backed checkpointers (the only kind that supports
migration; see Checkpointing),
that resolves to (state_dict: dict) -> dict: the callable
receives the deserialized record and returns the new shape. The
from_version and to_version are registered alongside the
callable on with_state_migration; the callable itself stays
signature-light because migrations MUST be pure (no implicit
version-dispatch logic inside the function body). The engine
dispatches a checkpoint_migrated observer event after each
migration step so OTel / Langfuse spans can correlate the migration
with the resume.
from typing import ClassVar

from openarmature.checkpoint import SQLiteCheckpointer
from openarmature.graph import END, GraphBuilder, State


# v2 schema: renamed `step_count` -> `steps_completed` and added
# `last_node`. Old v1 checkpoints carry `step_count` and lack
# `last_node` entirely.
class PipelineState(State):
    schema_version: ClassVar[str] = "2"
    query: str = ""
    steps_completed: int = 0
    last_node: str | None = None


def _migrate_v1_to_v2(state_dict: dict) -> dict:
    # Rename: step_count -> steps_completed. Default missing
    # last_node to None (the v2 schema allows it).
    state_dict["steps_completed"] = state_dict.pop("step_count", 0)
    state_dict.setdefault("last_node", None)
    return state_dict


async def _step(s: PipelineState) -> dict:
    return {"steps_completed": s.steps_completed + 1, "last_node": "step"}


# serialization="json" is required for migration to operate on a
# dict; the default "pickle" mode round-trips through class
# identity and can't migrate across schemas.
compiled = (
    GraphBuilder(PipelineState)
    .add_node("step", _step)
    .add_edge("step", END)
    .set_entry("step")
    .with_checkpointer(SQLiteCheckpointer("ck.db", serialization="json"))
    .with_state_migration("1", "2", _migrate_v1_to_v2)
    .compile()
)

# Later, on resume:
# final = await compiled.invoke(
#     PipelineState(),  # overwritten by the loaded checkpoint
#     resume_invocation=prior_invocation_id,
# )

When the chain spans multiple versions (v1 → v2 → v3), register
each step separately with repeated with_state_migration calls;
the engine walks them in version order. Gaps fail loudly: if v1→v2
and v3→v4 are registered but a record loads at v2 needing v3, the
engine raises CheckpointStateMigrationMissing at resume time
rather than silently using a partial schema.
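The hop-by-hop walk and the gap check can be sketched without the framework. This is a hypothetical model of the behavior described above, not openarmature's resolver. `migrate`, `MigrationMissing`, and the `steps` registry shape are illustrative, and the loop assumes registered steps only move forward (a cycle would never terminate):

```python
from collections.abc import Callable
from typing import Any

Migration = Callable[[dict[str, Any]], dict[str, Any]]


class MigrationMissing(Exception):
    """Stand-in for CheckpointStateMigrationMissing in this sketch."""


def migrate(
    record: dict[str, Any],
    from_version: str,
    target_version: str,
    steps: dict[str, tuple[str, Migration]],
) -> dict[str, Any]:
    """Apply registered steps one hop at a time until target_version is reached.

    `steps` maps a source version to (destination version, callable).
    """
    version = from_version
    while version != target_version:
        if version not in steps:
            raise MigrationMissing(f"no migration registered from schema_version {version!r}")
        next_version, step = steps[version]
        record = step(dict(record))  # hand each step a shallow copy
        version = next_version
    return record


def v1_to_v2(state: dict[str, Any]) -> dict[str, Any]:
    state["steps_completed"] = state.pop("step_count", 0)
    state.setdefault("last_node", None)
    return state


def identity(state: dict[str, Any]) -> dict[str, Any]:
    return state  # additive change: nothing to rewrite, but the bump is explicit


chain = {"1": ("2", v1_to_v2), "2": ("3", identity)}
migrated = migrate({"query": "q", "step_count": 4}, "1", "3", chain)

# A gap: v1->v2 and v3->v4 exist, but a v2 record needs v2->v3.
try:
    migrate({"steps_completed": 1}, "2", "4", {"1": ("2", v1_to_v2), "3": ("4", identity)})
    gap_detected = False
except MigrationMissing:
    gap_detected = True
```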
When to use:
- A schema change lands while in-flight checkpoints exist. Without migrations, those resume attempts would fail validation at the state-merge boundary.
- The change is shape-altering (rename, type change, field add/remove) rather than purely additive with a safe default. A bare field add with a Pydantic default doesn't need migration; Pydantic fills it in on load.
- You want resume to be transparent to node bodies. Migrations let each node body assume the current schema unconditionally.
When not to use:
- Adding a field with a safe default and NOT bumping schema_version. Pydantic's default handling resolves the missing field at load. Bumping schema_version without a corresponding migration is fail-loud: the engine raises CheckpointStateMigrationMissing at resume rather than silently skipping. If you bump the version, register an identity migration (a callable that returns the dict unchanged) to make the additive intent explicit.
- Migrations need to call the LLM or do other slow / fallible work. The migration runs synchronously during resume; long-running work belongs in a dedicated recompute node guarded by bypass-if-output-exists, not in a migration callable.
- Schema changes are happening on every release. Migration callables accumulate fast; if the cadence is high enough that v1→v2→v3→…→v9 starts to feel unwieldy, consider whether the schema would benefit from being more open at the seams (e.g. a metadata: dict[str, Any] field for evolving auxiliary data instead of dedicated columns).
See also:
- Checkpointing concept page: checkpointer backends and the resume contract.
- session-as-checkpoint-resume: multi-turn agent state via the same checkpointer machinery.
- Spec: pipeline-utilities, the state-migration contract and the checkpoint_migrated event.
Problem. How do I run an agent tool-call loop?
A node reads the assistant's last tool_calls from the running
message list, dispatches each to a local Python function, and
appends ToolMessage records back to the message list via an
append reducer. A conditional edge after the LLM node routes to
that dispatch node while the model keeps requesting tools, and a
plain edge from the dispatch node loops back to the LLM node. The
exit is the same conditional edge routing to a present node (or
END) when the assistant returns no tool_calls.
No "agent framework" abstraction; the loop is just a graph cycle
on top of Tool, ToolCall, ToolMessage.
import json
from typing import Annotated

from pydantic import Field

from openarmature.graph import END, EndSentinel, GraphBuilder, State, append
from openarmature.llm import AssistantMessage, Message, Tool, ToolMessage

# `provider` (an LLM Provider) and `dispatch_one` (your tool router)
# are assumed to be defined elsewhere.


class AgentState(State):
    messages: Annotated[list[Message], append] = Field(default_factory=list)
    turn: int = 0


TOOLS = [
    Tool(
        name="lookup_mission",
        description="Look up Apollo or Artemis mission facts.",
        parameters={
            "type": "object",
            "properties": {"name": {"type": "string"}},
            "required": ["name"],
        },
    ),
]
MAX_TURNS = 5


async def call_llm(s: AgentState) -> dict:
    response = await provider.complete(s.messages, tools=TOOLS)
    # complete() returns a Response; the assistant turn is response.message.
    return {"messages": [response.message], "turn": s.turn + 1}


async def dispatch_tools(s: AgentState) -> dict:
    assistant = s.messages[-1]
    assert isinstance(assistant, AssistantMessage)
    results: list[Message] = []
    for tc in assistant.tool_calls or ():
        output = await dispatch_one(tc.name, tc.arguments)  # str or JSON-serializable
        content = output if isinstance(output, str) else json.dumps(output)
        results.append(ToolMessage(content=content, tool_call_id=tc.id))
    return {"messages": results}


def route_after_llm(s: AgentState) -> str | EndSentinel:
    if s.turn >= MAX_TURNS:
        return "present"
    last = s.messages[-1]
    if isinstance(last, AssistantMessage) and last.tool_calls:
        return "dispatch_tools"
    return "present"


async def present(s: AgentState) -> dict:
    return {}  # final formatting / output


builder = (
    GraphBuilder(AgentState)
    .add_node("call_llm", call_llm)
    .add_node("dispatch_tools", dispatch_tools)
    .add_node("present", present)
    .add_conditional_edge("call_llm", route_after_llm)
    .add_edge("dispatch_tools", "call_llm")
    .add_edge("present", END)
    .set_entry("call_llm")
)
graph = builder.compile()

The MAX_TURNS cap prevents runaway loops; the conditional edge
short-circuits to present when the cap is hit or when the model
returns no tool_calls.
See examples/tool-use for a
runnable version with full tool definitions, defensive handling
for malformed ToolCall.arguments, and trace output.
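The control flow of the graph above can also be traced without a provider. This framework-free simulation swaps in a scripted fake model. `Call`, `Msg`, `scripted_model`, and `run_agent` are hypothetical stand-ins for ToolCall, the Message types, provider.complete, and the compiled graph, chosen only to make the loop and its exits easy to follow:

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field

MAX_TURNS = 5


@dataclass
class Call:  # stand-in for ToolCall
    id: str
    name: str
    arguments: dict


@dataclass
class Msg:  # stand-in for the Message types (user / assistant / tool)
    role: str
    content: str = ""
    tool_calls: list[Call] = field(default_factory=list)
    tool_call_id: str | None = None


def scripted_model(messages: list[Msg]) -> Msg:
    """Fake LLM: requests one lookup, then answers once a tool result is present."""
    if any(m.role == "tool" for m in messages):
        return Msg(role="assistant", content="Apollo 11 landed on the Moon in 1969.")
    return Msg(role="assistant", tool_calls=[Call("call-1", "lookup_mission", {"name": "Apollo 11"})])


def lookup_mission(name: str) -> dict:
    return {"name": name, "landing_year": 1969}


TOOL_FUNCS = {"lookup_mission": lookup_mission}


def run_agent(messages: list[Msg]) -> tuple[list[Msg], int]:
    turn = 0
    while True:
        reply = scripted_model(messages)  # call_llm
        messages = [*messages, reply]  # append reducer: extend, never replace
        turn += 1
        if turn >= MAX_TURNS or not reply.tool_calls:  # route_after_llm
            return messages, turn  # -> present
        results = []
        for tc in reply.tool_calls:  # dispatch_tools
            output = TOOL_FUNCS[tc.name](**tc.arguments)
            content = output if isinstance(output, str) else json.dumps(output)
            results.append(Msg(role="tool", content=content, tool_call_id=tc.id))
        messages = [*messages, *results]  # then the edge back to call_llm


final_messages, turns = run_agent([Msg(role="user", content="When did Apollo 11 land?")])
```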
When to use:
- The model needs to call local Python functions and react to their results.
- The loop is bounded, either by MAX_TURNS, by the model signaling it's done, or by both.
- Tool results are textual or JSON-serializable and fit cleanly into ToolMessage.content.
When not to use:
- Tools have side effects you can't replay safely on resume. Wrap each side-effecting tool with the bypass-if-output-exists pattern so a crashed run resumes without repeating the side effects.
- The "tools" are long-running async pipelines, not function calls. Model them as subgraphs and let the LLM node route via conditional edge to the right subgraph; the loop shape is the same, but each "tool" is a full pipeline.
- You need to stream tool results back to the model mid-call. The current Tool / ToolMessage shape is request/response; streaming is out of scope for this pattern.
See also:
- LLMs concept page: Tool, ToolCall, ToolMessage types and the complete(messages, tools=...) contract.
- State and reducers: append reducer semantics.
- examples/tool-use: runnable reference implementation.
- Spec: llm-provider
Recipes that aren't deducible from the API surface alone. The primitives docs tell you what's possible; this section tells you what's smart.
State fields default to last_write_wins: each node's write replaces the prior value for that field. For scalar fields (status: str, count: int) that's usually what you want. For list fields that accumulate contributions across multiple nodes (messages: list[Message], events: list[Event], results: list[Result]), it's the wrong default; every node's contribution silently clobbers everything before it.
Declare append (or another non-clobbering reducer) at the state class:
from typing import Annotated

from pydantic import Field

from openarmature.graph import State, append
from openarmature.llm import Message

# Event stands in for your own event model, defined elsewhere.


class WorkflowState(State):
    messages: Annotated[list[Message], append] = Field(default_factory=list)
    events: Annotated[list[Event], append] = Field(default_factory=list)
    final_status: str = "pending"  # last_write_wins is fine here

The failure mode without append is silent and easy to misdiagnose: the final state shows only the last node's contribution to the list, with no error. It's a common source of "why is my accumulator empty?" questions. merge is the equivalent for dict[str, V] fields that accumulate keys across nodes.
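The difference is easy to see by folding a few partial updates by hand. This is an illustrative model only: `last_write_wins`, `append`, `merge`, and `apply_update` are local stand-ins that mimic the semantics described above, not openarmature's reducer implementations:

```python
from collections.abc import Callable
from typing import Any

Reducer = Callable[[Any, Any], Any]


def last_write_wins(old: Any, new: Any) -> Any:
    return new


def append(old: list, new: list) -> list:
    return [*old, *new]


def merge(old: dict, new: dict) -> dict:
    return {**old, **new}


def apply_update(
    state: dict[str, Any], update: dict[str, Any], reducers: dict[str, Reducer]
) -> dict[str, Any]:
    """Fold one node's partial update into a new state dict; `state` is left untouched."""
    merged = dict(state)
    for key, value in update.items():
        merged[key] = reducers.get(key, last_write_wins)(state[key], value)
    return merged


initial = {"events": [], "facts": {}, "final_status": "pending"}
node_updates = [
    {"events": ["fetched"], "facts": {"source": "api"}},
    {"events": ["parsed"], "facts": {"format": "json"}},
    {"events": ["stored"], "final_status": "done"},
]

clobbered = accumulated = initial
for update in node_updates:
    clobbered = apply_update(clobbered, update, reducers={})
    accumulated = apply_update(accumulated, update, reducers={"events": append, "facts": merge})
```

With no reducers declared, `clobbered["events"]` ends as `["stored"]`: the silent failure described above. With `append` and `merge`, every node's contribution survives.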
After await provider.complete(messages, tools=[...]) returns, the shape of Response varies by finish_reason:
- finish_reason == "stop": the assistant produced a content response. message.content carries the text; message.tool_calls is empty.
- finish_reason == "tool_calls": the assistant emitted tool calls. message.tool_calls carries the list; message.content is typically empty (the model didn't say anything beyond the tool calls).
- finish_reason == "length" / "content_filter" / "error": the completion was cut off or refused; message.content may be partial or empty.
Post-LLM logic that reads message.content without checking finish_reason misses the entire tool-calling path:
response = await provider.complete(messages, tools=tools)
if response.finish_reason == "tool_calls":
    # Keep the assistant turn that requested the tools, then answer each
    # call with a ToolMessage and re-call complete().
    messages.append(response.message)
    for tc in response.message.tool_calls:
        result = dispatch_tool(tc.name, tc.arguments)  # your dispatcher; returns a str
        messages.append(ToolMessage(content=result, tool_call_id=tc.id))
    response = await provider.complete(messages, tools=tools)
elif response.finish_reason == "stop":
    handle_text(response.message.content)
else:
    handle_error_or_partial(response)

Checking finish_reason costs one branch; skipping it gives you empty data on tool-call responses and silently wrong behavior on truncations.
The OTelObserver (and any spec-conformant observer reading LLM events) defaults to disable_provider_payload=True, per spec §5.5's "default-off by privacy" framing. Unless you flip the flag, LLM spans carry GenAI semconv attributes (token counts, model name, finish reason) but NOT the message payload (input messages, response content, request extras).
That's the right default for general OpenArmature use: payloads may contain PII the user hasn't audited, and storage cost grows with prompt size. But it's the WRONG default if you're wiring up an LLM-aware observability backend (Langfuse, Phoenix, Honeycomb's LLM lens) that renders the message stream as part of its generation view. Backends will show "empty" generations and you'll wonder why.
Flip the flag once at observer construction:
from openarmature.observability import OTelObserver

observer = OTelObserver(
    span_processor=your_exporter,
    disable_provider_payload=False,  # opt in to message-payload attributes
)
graph.attach_observer(observer)

The companion disable_genai_semconv flag defaults to False: GenAI semconv attributes emit by default since they're how LLM-aware backends render anything at all. Don't flip that one unless you're routing GenAI emission through a different layer.
The temptation when persisting graph state is to json.dumps(state.model_dump()) and write to a file. Don't. The shipped Checkpointer backends handle every contract openarmature.checkpoint.Checkpointer defines: round-trip integrity, parent_states for inner-save resume, fan-out progress tracking, schema-version migration, listing by correlation_id, CheckpointRecordInvalid on shape drift. A hand-rolled serializer that "works" on the happy path silently fails the moment a fan-out crash leaves an in-flight save record, and you'll be debugging it for hours before realizing the bundled backend exists.
If your storage requirement isn't local disk (FilesystemCheckpointer) or local SQLite (SQLiteCheckpointer, which also supports :memory: and arbitrary file paths), implement the Checkpointer Protocol against your backend rather than wrapping state serialization yourself. Custom backends inherit the spec's correctness contract for free.
A common shape is "after this LLM call, route to either a JSON-extraction node or a tool-dispatch node depending on finish_reason." The naive solution is two conditional edges from the LLM node, one to each downstream. That works for two branches; it scales poorly past three.
When the branches operate on different sub-shapes of state (e.g., one path is "extract JSON, then validate" while another is "dispatch tools, loop until done, then summarize"), encapsulate each as a SubgraphNode and route from the LLM node to the right subgraph. Each subgraph has its own state schema (projected from the parent), its own entry node, and its own internal topology. The parent graph becomes a switchboard with a few edges; the complexity lives one layer down where it composes cleanly.
OpenAIProvider.ready() exercises chat/completions by default; opt back into the catalog-only probe for cost-sensitive callers
OpenAIProvider(..., readiness_probe=...) accepts "chat_completions" (default), "models", or "both". The default issues POST /v1/chat/completions with a max_tokens=1 body so a green ready() actually proves the inference wire path works, not just that the catalog endpoint answers. The motivating failure class: OpenAI-compatible proxies (Bifrost is the field-reported case) that return 200 on GET /v1/models while 405'ing the completions endpoint, so the previous catalog-only default reported ready and every real call broke. The "models" opt-in is the old behavior, useful for cost-sensitive cloud callers where every ready() would otherwise bill one prompt's worth of tokens. "both" runs catalog then chat, giving the strongest signal at double the cost. Non-200 responses on either probe route through classify_http_error, so the canonical error categories (ProviderAuthentication, ProviderUnavailable, ProviderInvalidModel, etc.) surface consistently regardless of which probe ran.
Provider.complete(messages, tools, tool_choice=...) accepts "auto", "required", "none", or a ForceTool(name=...) record. When you omit tool_choice, the OpenAI provider's own default applies (usually "auto" when tools is non-empty, but documented per-provider). A pipeline that wants deterministic tool-calling (a routing node that MUST produce a tool call, a guarded LLM call that MUST NOT call tools) should pin tool_choice explicitly rather than relying on the provider default.
Pre-send validation catches the three §5 failure modes (required with empty tools, ForceTool with empty tools, ForceTool.name not in tools) and raises ProviderInvalidRequest before the HTTP call. Not all providers honor tool_choice (confirm with your provider's docs), but the OpenAI-compatible mapping is in OpenAIProvider.
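The three pre-send checks are simple enough to sketch. In this hypothetical version, local stand-ins `ForceTool` and `InvalidRequest` take the place of the real ForceTool record and ProviderInvalidRequest, and tools are reduced to a list of names. The final branch, which rejects unrecognized values, is an extra guard of this sketch rather than one of the three listed modes:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ForceTool:
    """Local stand-in for the ForceTool(name=...) record."""

    name: str


class InvalidRequest(ValueError):
    """Local stand-in for ProviderInvalidRequest."""


def validate_tool_choice(tool_choice: object, tool_names: list[str]) -> None:
    """Reject tool_choice / tools combinations before any HTTP call is made."""
    if tool_choice is None or tool_choice in ("auto", "none"):
        return  # omitted, or a mode that works with any tools list
    if tool_choice == "required":
        if not tool_names:
            raise InvalidRequest('tool_choice="required" with an empty tools list')
        return
    if isinstance(tool_choice, ForceTool):
        if not tool_names:
            raise InvalidRequest("ForceTool with an empty tools list")
        if tool_choice.name not in tool_names:
            raise InvalidRequest(f"ForceTool names {tool_choice.name!r}, which is not in tools")
        return
    raise InvalidRequest(f"unsupported tool_choice: {tool_choice!r}")  # extra guard


def rejects(tool_choice: object, tool_names: list[str]) -> bool:
    try:
        validate_tool_choice(tool_choice, tool_names)
    except InvalidRequest:
        return True
    return False
```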
CompiledGraph.invoke() returns when the graph reaches END or raises; observer events are dispatched onto a per-invocation queue and delivered by a background worker. The graph's execution loop never awaits observer processing. In a long-running service this is invisible: the worker drains naturally. In a CLI, script, or serverless function, the process exits before the worker finishes, and any late observer events (typically the last node's completed event plus any checkpoint_saved events) get dropped.
Always call await graph.drain() before the short-lived process exits. If your observer set includes anything that might hang (a metrics observer with a flaky network endpoint, an OTel exporter behind a slow OTLP collector), supply a timeout:
summary = await graph.drain(timeout=5.0)
if summary.timeout_reached:
    log.warning("drain truncated: %d events undelivered", summary.undelivered_count)

The compiled graph stays usable for subsequent invocations after a timed-out drain: workers are cancelled cleanly, no partial state leaks.
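Late events get dropped because of the delivery model: the run loop enqueues each event and moves on, and only a background worker awaits observers. The sketch below models that with asyncio.Queue. `EventDispatcher` is hypothetical, not openarmature's dispatcher, and its drain() returns a bare undelivered count rather than the summary object used above:

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable


class EventDispatcher:
    def __init__(self, observer: Callable[[str], Awaitable[None]]) -> None:
        self._observer = observer
        self._queue: asyncio.Queue[str] = asyncio.Queue()
        self._pending = 0  # events emitted but not yet processed
        self._worker: asyncio.Task | None = None

    def emit(self, event: str) -> None:
        # Enqueue and return at once: the caller never awaits the observer.
        if self._worker is None:
            self._worker = asyncio.create_task(self._deliver())
        self._pending += 1
        self._queue.put_nowait(event)

    async def _deliver(self) -> None:
        while True:
            event = await self._queue.get()
            try:
                await self._observer(event)
            except Exception:
                pass  # a failing observer must not stall delivery of later events
            finally:
                self._pending -= 1
                self._queue.task_done()

    async def drain(self, timeout: float | None = None) -> int:
        """Wait for queued events; return how many were left undelivered."""
        try:
            await asyncio.wait_for(self._queue.join(), timeout)
        except asyncio.TimeoutError:
            pass
        undelivered = self._pending
        if self._worker is not None:
            self._worker.cancel()  # stop the worker; a later emit() starts a fresh one
            try:
                await self._worker
            except asyncio.CancelledError:
                pass
        # Reset so the dispatcher is reusable with no leftover events.
        self._worker, self._queue, self._pending = None, asyncio.Queue(), 0
        return undelivered


async def demo() -> tuple[list[str], int, int]:
    delivered: list[str] = []

    async def record(event: str) -> None:
        delivered.append(event)

    async def hang(event: str) -> None:
        await asyncio.sleep(3600)  # an observer stuck on a slow endpoint

    fast = EventDispatcher(record)
    for event in ("node_started", "node_completed", "checkpoint_saved"):
        fast.emit(event)
    fast_left = await fast.drain(timeout=1.0)

    slow = EventDispatcher(hang)
    slow.emit("node_started")
    slow.emit("node_completed")
    slow_left = await slow.drain(timeout=0.05)
    return delivered, fast_left, slow_left


delivered, fast_left, slow_left = asyncio.run(demo())
```

Without the drain() call, the process would exit while events were still sitting in the queue. The timed-out drain reports both stuck events (one in flight, one queued) instead of hanging.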
install_log_bridge skips its own handler when the application already attached one to the same LoggerProvider
Two distinct classes both named LoggingHandler exist in the OTel Python ecosystem and both bridge stdlib log records to the OTel Logs SDK:
- opentelemetry.sdk._logs.LoggingHandler (the SDK class). Typically attached by an application's own logging setup, e.g., a FastAPI setup_logging(...) step that wires up an OTLP-backed LoggerProvider for log export.
- opentelemetry.instrumentation.logging.handler.LoggingHandler (the instrumentation class). What openarmature.observability.otel.install_log_bridge attaches when it runs.
Different classes, same OTel-Logs export path. If both are attached against the same LoggerProvider, every stdlib log record fires through both handlers, both call provider.get_logger(...).emit(...), and BatchLogRecordProcessor ships the record TWICE to the OTLP endpoint. The duplication is OTLP-only; a console handler attached separately is unaffected, which makes "OTLP rows are doubled, console isn't" a head-scratcher to diagnose.
install_log_bridge detects either handler class against the same provider and skips its own addHandler accordingly; the openarmature.correlation_id LogRecord factory still installs. The check is provider-scoped, so an application that intentionally attaches a handler against a DIFFERENT LoggerProvider (a separate logs pipeline) still gets the OA bridge against the OA provider; the helper only dedups when the SAME provider would receive duplicate emissions.
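The provider-scoped check can be sketched with stdlib logging. The two handler classes below are hypothetical stand-ins for the SDK and instrumentation LoggingHandler classes, and the provider attribute on them is an assumption made for the sketch (the real classes may store their provider differently). install_bridge is not install_log_bridge's actual code, and it omits the correlation_id record-factory step.

```python
import logging


class _StandInBridge(logging.Handler):
    def __init__(self, provider: object) -> None:
        super().__init__()
        self.provider = provider  # assumed attribute name, for illustration only

    def emit(self, record: logging.LogRecord) -> None:
        pass  # a real bridge would call provider.get_logger(...).emit(...)


class SdkLoggingHandler(_StandInBridge):
    """Stand-in for opentelemetry.sdk._logs.LoggingHandler."""


class InstrumentationLoggingHandler(_StandInBridge):
    """Stand-in for the instrumentation-package LoggingHandler."""


BRIDGE_CLASSES = (SdkLoggingHandler, InstrumentationLoggingHandler)


def install_bridge(logger: logging.Logger, provider: object) -> bool:
    """Attach a bridge handler unless one already targets this same provider."""
    already_bridged = any(
        isinstance(h, BRIDGE_CLASSES) and h.provider is provider
        for h in logger.handlers
    )
    if already_bridged:
        return False  # a second handler would double-emit every record
    logger.addHandler(InstrumentationLoggingHandler(provider))
    return True


app_provider, other_provider = object(), object()
demo = logging.getLogger("bridge-demo")
demo.addHandler(SdkLoggingHandler(app_provider))  # the application's own setup
print(install_bridge(demo, app_provider))    # False: same provider, skip
print(install_bridge(demo, other_provider))  # True: separate pipeline
```

Comparing providers by identity, rather than only checking for any bridge handler, is what lets a deliberately separate logs pipeline keep its own handler.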
openarmature exceptions split across three sibling hierarchies:
- RuntimeGraphError (in openarmature.graph): node execution failures: NodeException, RoutingError, EdgeException, ReducerError, StateValidationError. Each has a category string matching the spec's canonical error categories.
- CheckpointError (in openarmature.checkpoint): persistence failures: CheckpointNotFound, CheckpointSaveFailed, CheckpointRecordInvalid, CheckpointStateMigrationMissing, CheckpointStateMigrationFailed, CheckpointStateMigrationChainAmbiguous.
- LlmProviderError (in openarmature.llm): provider call failures: ProviderAuthentication, ProviderInvalidRequest, ProviderInvalidResponse, ProviderInvalidModel, ProviderModelNotLoaded, ProviderRateLimit, ProviderUnavailable, ProviderUnsupportedContentBlock, StructuredOutputInvalid.
Catching Exception works but is too broad; catching one hierarchy misses the other two. If you want to branch on category strings (e.g., for retry logic), catch the relevant base: RuntimeGraphError covers all five spec runtime categories, LlmProviderError covers all nine provider categories, CheckpointError covers all six checkpoint categories. The TRANSIENT_CATEGORIES frozenset in openarmature.llm enumerates which provider categories are retriable.
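For category-based retry, a small generic helper keeps the policy in one place. This is a sketch, not an openarmature API: retry_transient is a hypothetical name, and it assumes the caught exception exposes its category string as a category attribute, as the hierarchies above describe.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_transient(
    call: Callable[[], Awaitable[T]],
    *,
    error_type: type[Exception],
    transient: frozenset[str],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Retry `call` when it raises `error_type` whose `.category` is transient."""
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except error_type as exc:
            category = getattr(exc, "category", None)
            if category not in transient or attempt == attempts:
                raise  # non-retriable category, or out of attempts
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable: the loop always returns or raises")
```

With openarmature, you would pass error_type=LlmProviderError and transient=TRANSIENT_CATEGORIES (both from openarmature.llm) and wrap the provider call in a zero-argument coroutine function. Keeping error_type a parameter means the same helper can target RuntimeGraphError or CheckpointError without swallowing the other hierarchies.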
OA emits observer events under sentinel node names for some internal dispatch: openarmature.checkpoint.migrate for state-migration runs (proposal 0014) and openarmature.checkpoint.save for checkpoint saves (proposal 0010) ride on NodeEvent with a sentinel namespace. (LLM provider calls used to follow the same pattern but moved to the typed LlmCompletionEvent / LlmFailedEvent variants in v0.13.0 per proposals 0049 and 0058; filter those by isinstance instead.) The sentinel-namespace events let the OTel and Langfuse observers emit checkpoint-migrate spans and similar, but a custom observer that only cares about user-defined node activity sees them as noise:
```python
class UserNodeObserver:  # hypothetical class name; NodeEvent is openarmature's node event type
    async def __call__(self, event: NodeEvent) -> None:
        # Skip OA-internal events; only react to user node activity.
        if event.namespace and event.namespace[0].startswith("openarmature."):
            return
        # … user-node handling
```

event.namespace[0] is the safest discriminator. Don't try to filter on current_invocation_id() is None: OA-internal events are dispatched within the same invocation context as user-node events, so invocation_id is set for both; the namespace-prefix check is the stable contract.
When a fan-out's per-instance state collects a list[X] as its collect_field (e.g., each instance produces 0..N records), the engine's contribution step is [s[cfg.collect_field] for s in successes]; every instance's value becomes one element of the outer list. With list[X] per-instance, the parent receives list[list[X]], and the default append reducer on the parent's Annotated[list[X], append] field preserves the nesting verbatim. Pydantic then fails to validate each list[X] element against X:
```text
attributed_candidates.0 Input should be a valid dictionary or
instance of ClaimCandidate [input_value=[ClaimCandidate(...)],
input_type=list]
```
The fix is the concat_flatten built-in reducer (proposal 0036), the list-of-lists analog of append. Declare it on the parent's collection field:
```python
from typing import Annotated
from pydantic import BaseModel, Field
from openarmature.graph import State, concat_flatten

class ClaimCandidate(BaseModel):  # illustrative per-record model; substitute your own
    text: str

class PipelineState(State):
    attributed_candidates: Annotated[list[ClaimCandidate], concat_flatten] = Field(default_factory=list)
```

concat_flatten folds the per-instance lists into one flat list ([*prior, *(item for sublist in update for item in sublist)]) and is strict like append: it raises ReducerError if any element of the update isn't itself a list.
The dict-shaped analog is merge_all (also proposal 0036): when each fan-out instance contributes a dict[str, X], the parent's target_field receives list[dict], which plain merge can't consume. merge_all folds the sequence of mappings into the prior with shallow last-write-wins per key:
```python
from typing import Annotated
from pydantic import BaseModel, Field
from openarmature.graph import State, merge_all

class Result(BaseModel):  # illustrative per-key value model; substitute your own
    score: float

class PipelineState(State):
    keyed_results: Annotated[dict[str, Result], merge_all] = Field(default_factory=dict)
```

Single-record-per-instance fan-outs (collect_field: str, parent field Annotated[list[X], append]) don't hit this: the engine still wraps each instance's value as one element of the update, but since each element is already an X, append produces a flat list[X]. The two non-flat shapes emerge only when the per-instance value is itself a container: a list[X] per instance lands as list[list[X]] (use concat_flatten), and a dict[str, X] per instance lands as list[dict] (use merge_all).
If a parent field is populated by BOTH direct node writes AND fan-out collection, that's an architectural ambiguity worth fixing upstream: split into two fields, or pick one path.
Runnable example programs shipped in the source tree at examples/. The full code is not bundled here (each example is 300+ lines); read the file at the listed path to see the canonical shape for that use case.
- examples/chat-with-multimodal/main.py - openarmature demo: multi-turn chat with conversation memory and a multimodal turn, using ChatPrompt + PlaceholderSegment.
- examples/checkpointing-and-migration/main.py - openarmature demo: a lunar-mission planning pipeline that survives a mid-pipeline crash and later resumes under an upgraded state schema.
- examples/explicit-subgraph-mapping/main.py - openarmature demo: same compiled subgraph reused at two sites in one parent graph, each site with its own ExplicitMapping.
- examples/fan-out-with-retry/main.py - openarmature demo: summarize a batch of lunar-mission headlines in parallel, with per-headline retries and timing.
- examples/hello-world/main.py - Hello-world demo: a 3-node graph where each node makes an LLM call with structured output. Classify a query, then either plan research or write a one-sentence summary.
- examples/langfuse-observability/main.py - openarmature demo: Langfuse observer + prompt linkage on a lunar mission Q&A pipeline.
- examples/multimodal-prompt/main.py - openarmature demo: two independent analyses of a lunar-mission photograph using versioned prompt templates, a fallback prompt backend, and a multimodal user message.
- examples/nested-subgraphs/main.py - openarmature demo: question answering against a tiny document corpus, with two levels of subgraph nesting.
- examples/observer-hooks/main.py - openarmature demo: observer hooks for structured logging, per-call metrics, and OTel spans.
- examples/parallel-branches/main.py - openarmature demo: enrich a lunar-mission news article with three independent analyses running concurrently.
- examples/production-observability/main.py - openarmature demo: production observability with dual OTel + Langfuse observers, caller hooks for trace.input/output, and the canonical TimingMiddleware.
- examples/routing-and-subgraphs/main.py - openarmature demo: conditional routing + subgraph with a custom projection.
- examples/tool-use/main.py - openarmature demo: a lunar-mission assistant that calls local Python functions as tools to answer fact and physics questions about Apollo / Artemis missions.
If your question isn't covered above, look here:
- Full docs site: openarmature.ai
- Spec text: openarmature.org/capabilities
- API reference: openarmature.ai/reference
- Host project conventions: the project's own AGENTS.md / CLAUDE.md