Skip to content

Commit ebbafd8

Browse files
Add three patterns from graduated agent shapes (#111)
* Add three patterns from graduated agent shapes Graduate three substantive entries from docs/agent/non-obvious- shapes.md into full patterns under docs/patterns/, each with a runnable snippet and "when this is right / when it isn't" guidance: - state-migration-on-resume: schema_version conventions, the Callable[[Any], Any] migrate signature, JSON-backed checkpointer requirement, chained migrations and the CheckpointStateMigrationMissing failure mode. - caller-supplied-trace-identifiers: invoke(metadata=...) and set_invocation_metadata propagation through OTel attributes and Langfuse trace.metadata, with the boundary validation rules. - observer-state-reconciliation: per-invocation dict keyed on (namespace, branch_name, attempt_index, fan_out_index) for custom observers that need to thread state between paired events. Update the patterns index + mkdocs nav, remove the three graduated entries from non-obvious-shapes (the patterns catalog handles discoverability), regenerate the bundled AGENTS.md and the programmatic patterns API (openarmature.patterns now lists 7 entries instead of 4), and bump the test_patterns_api expectation. * Correct two pattern-doc inaccuracies per PR review Two CoPilot findings on PR #111: 1. caller-supplied-trace-identifiers referenced a non-existent public constant ``RESERVED_LANGFUSE_METADATA_KEYS``. The actual symbol is ``_RESERVED_KEY_NAMES`` (module-private). Replace the false-symbol claim with a description of where validation runs (the boundary in ``observability.metadata``) and point at the spec's observability §3.4 for the canonical reserved-key list. 2. state-migration-on-resume's resume snippet used ``starting_state=None``, but ``CompiledGraph.invoke()`` takes positional ``initial_state: StateT`` (required, no such kwarg). Update the comment to pass ``PipelineState()`` positionally; the engine overwrites it from the loaded checkpoint record. Regenerate AGENTS.md and ``_patterns/`` so the two regenerated mirrors carry the same fixes.
1 parent d504350 commit ebbafd8

12 files changed

Lines changed: 1229 additions & 183 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
88

99
### Added
1010

11+
- **Three new patterns docs.** `docs/patterns/state-migration-on-resume.md`, `docs/patterns/caller-supplied-trace-identifiers.md`, and `docs/patterns/observer-state-reconciliation.md` graduate the corresponding entries from `docs/agent/non-obvious-shapes.md` into full pattern recipes with code snippets and "when this is right / when it isn't" guidance. The programmatic patterns API (`openarmature.patterns.list()` / `get(name)`) grows from 4 to 7 entries.
1112
- **HyperDX OTel integration test path and "Production swap" docs in example 03.** `examples/03-observer-hooks/main.py`'s module docstring grows a "Production swap" section showing how to substitute the demo's `SimpleSpanProcessor` + `ConsoleSpanExporter` for `BatchSpanProcessor` + `OTLPSpanExporter` pointed at HyperDX (or any other OTLP-HTTP collector). A new opt-in integration test (`tests/integration/test_otel_hyperdx_export.py`, gated by `HYPERDX_API_KEY` + `HYPERDX_OTLP_ENDPOINT` env vars and `@pytest.mark.integration`) drives the same production export path end-to-end against a live endpoint. `opentelemetry-exporter-otlp-proto-http` lands as a dev-only dep; not promoted to a public extras group yet.
1213

1314
### Changed (breaking)

docs/agent/non-obvious-shapes.md

Lines changed: 0 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -125,37 +125,6 @@ Different classes, same OTel-Logs export path. If both are attached against the
125125

126126
Catching `Exception` works but is too broad; catching one hierarchy misses the other two. If you want to branch on category strings (e.g., for retry logic), catch the relevant base — `RuntimeGraphError` covers all five spec runtime categories, `LlmProviderError` covers all nine provider categories, `CheckpointError` covers all six checkpoint categories. The `TRANSIENT_CATEGORIES` frozenset in `openarmature.llm` enumerates which provider categories are retriable.
127127

128-
### Reconcile `started``completed` pairs via a per-invocation dict keyed on `(namespace, branch_name, attempt_index, fan_out_index)`
129-
130-
Observers receive `started` and `completed` events as a pair per node attempt, but the engine doesn't carry a `step_id`-like correlation field across the pair (it doesn't need one for its own logic — the events arrive serially per spec §6). Observer code that needs to thread per-call state — start timestamps, request payloads, custom IDs — between the two events has to reconcile manually.
131-
132-
The pair identity is `(namespace, branch_name, attempt_index, fan_out_index)`: that tuple is unique within an invocation (per graph-engine §6 uniqueness invariants — `branch_name` and `fan_out_index` are independent slots, so a node inside a parallel-branches branch needs `branch_name` in the key to avoid colliding with the same-named node in a sibling branch). Carry per-invocation state in a `dict[invocation_id, dict[tuple, value]]` and look up on `completed`:
133-
134-
```python
135-
class StepTimingObserver:
136-
def __init__(self) -> None:
137-
# invocation_id -> {(namespace, branch_name, attempt_index, fan_out_index): start_ts}
138-
self._pending: dict[str, dict[tuple[Any, ...], float]] = {}
139-
140-
async def __call__(self, event: NodeEvent) -> None:
141-
invocation_id = current_invocation_id()
142-
if invocation_id is None:
143-
return
144-
key = (event.namespace, event.branch_name, event.attempt_index, event.fan_out_index)
145-
if event.phase == "started":
146-
self._pending.setdefault(invocation_id, {})[key] = time.monotonic()
147-
elif event.phase == "completed":
148-
start = self._pending.get(invocation_id, {}).pop(key, None)
149-
if start is not None:
150-
duration = time.monotonic() - start
151-
# … emit timing
152-
# Sweep when the dict empties (last completed for this invocation).
153-
if not self._pending.get(invocation_id):
154-
self._pending.pop(invocation_id, None)
155-
```
156-
157-
The `_pending[invocation_id]` sub-dict naturally tracks in-flight pairs and drains as completions arrive. Sweep the outer entry when the sub-dict empties so long-running services don't accumulate per-invocation entries. If you also subscribe to drain events, that's another sweep opportunity. The same pattern works for any per-call state the observer needs to thread across the pair.
158-
159128
### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes
160129

161130
OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc. — but a custom observer that only cares about user-defined node activity sees them as noise:
@@ -170,37 +139,6 @@ async def __call__(self, event: NodeEvent) -> None:
170139

171140
`event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None` — OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract.
172141

173-
### A `with_state_migration` recipe — register migrations alongside the state class, run on resume
174-
175-
`GraphBuilder.with_state_migration(s)` registers callables that transform an old-schema state record into the current schema. The engine calls them automatically on `invoke(resume_invocation=...)` when the loaded record's `schema_version` doesn't match `state_cls.schema_version`. The migration callable's signature is `(state_dict: dict, from_version: str, to_version: str) -> dict`; it receives the raw deserialized record and returns the new shape.
176-
177-
Wire it up at compile time:
178-
179-
```python
180-
class PipelineState(State):
181-
schema_version: ClassVar[str] = "2"
182-
# … v2 fields
183-
184-
def _migrate_v1_to_v2(state_dict: dict, from_version: str, to_version: str) -> dict:
185-
# Old field "step_count" renamed to "steps_completed" in v2.
186-
state_dict["steps_completed"] = state_dict.pop("step_count", 0)
187-
return state_dict
188-
189-
compiled = (
190-
GraphBuilder(PipelineState)
191-
.add_node("step", _step_body)
192-
.add_edge("step", END)
193-
.set_entry("step")
194-
.with_state_migration(from_version="1", to_version="2", migrate=_migrate_v1_to_v2)
195-
.compile()
196-
)
197-
compiled.attach_checkpointer(checkpointer)
198-
```
199-
200-
Important detail: the migration runs once on resume, before any node body fires; the engine dispatches a synthetic `checkpoint_migrated` observer event (per spec §6 cross-ref) so observers can emit a migration span. The migrated state is what `_step_body` sees on resume — you do NOT need to handle both v1 and v2 shapes in node bodies.
201-
202-
When chaining multiple migrations (v1 → v2 → v3), register each step separately via repeated `with_state_migration` calls; the engine walks the chain in version order. If the chain has gaps (registered v1→v2 and v3→v4 but a record is at v2 with `to_version="4"`), the engine raises `CheckpointStateMigrationMissing` at resume time — fail-loud rather than silently skipping.
203-
204142
### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field`
205143

206144
When a fan-out's per-instance state collects a `list[X]` as its `collect_field` (e.g., each instance produces 0..N records), the engine's contribution step is `[s[cfg.collect_field] for s in successes]` — every instance's value becomes one element of the outer list. With `list[X]` per-instance, the parent receives `list[list[X]]`, and the default `append` reducer on the parent's `Annotated[list[X], append]` field preserves the nesting verbatim. Pydantic then fails to validate each `list[X]` element against `X`:
@@ -242,32 +180,3 @@ class PipelineState(State):
242180
Single-record-per-instance fan-outs (`collect_field: str`, parent field `Annotated[list[X], append]`) don't hit this — the engine still wraps each instance's value as one element, but `append` flattens it correctly since each element is already an `X`. The two non-flat shapes emerge only when the per-instance value is itself a container: a `list[X]` per instance lands `list[list[X]]` (use `concat_flatten`), and a `dict[str, X]` per instance lands `list[dict]` (use `merge_all`).
243181

244182
If a parent field is populated by BOTH direct node writes AND fan-out collection, that's an architectural ambiguity worth fixing upstream — split into two fields, or pick one path.
245-
246-
### `invoke(metadata=...)` for caller-supplied trace identifiers (tenant IDs, request IDs, feature flags)
247-
248-
Per spec observability §3.4 / proposal 0034, callers attach arbitrary key/value entries at `invoke()` time and the framework propagates them to every observability backend:
249-
250-
```python
251-
await compiled.invoke(
252-
initial_state,
253-
metadata={"tenantId": "acme-corp", "requestId": "req-12345", "featureFlag": "v2-canary"},
254-
)
255-
```
256-
257-
The OTel observer emits each entry as an `openarmature.user.<key>` cross-cutting span attribute on every span (invocation, node, subgraph wrapper, fan-out instance, LLM provider). The Langfuse observer merges each entry as a top-level key into `trace.metadata` AND every observation's metadata. Backends that consume OTel attributes (Honeycomb, Datadog APM, HyperDX, Grafana Tempo) pick the entries up for free; backends with typed metadata fields (Langfuse) get them via the per-backend propagation rule.
258-
259-
Boundary validation runs synchronously: keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved namespaces); values MUST be OTel-attribute-compatible scalars (`str` / `int` / `float` / `bool`) or homogeneous arrays of those. Violations raise `ValueError` before any work begins.
260-
261-
Mid-invocation augmentation via the public helper:
262-
263-
```python
264-
from openarmature.observability import set_invocation_metadata
265-
266-
async def my_node(state: MyState) -> dict:
267-
set_invocation_metadata(productId=state.product_id)
268-
# subsequent spans (this node's completed, next node's started,
269-
# any LLM call inside, etc.) carry productId
270-
return {"score": await compute_score(state)}
271-
```
272-
273-
The augmentation respects fan-out / parallel-branches per-instance scoping — each instance's augmentation lives in its own Context copy and doesn't leak to siblings. Sequential nodes in the same engine task see prior nodes' augmentations forward. The helper validates the same rules as the `invoke()` boundary.
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# Caller-supplied trace identifiers
2+
3+
**Problem.** A service runs the same graph for many tenants /
4+
requests / feature flag cohorts. How do you tag every span and
5+
trace so downstream observability (Honeycomb, Datadog, Langfuse,
6+
HyperDX, Grafana Tempo) can filter by tenant or join across
7+
services without each node having to thread the identifiers
8+
through manually?
9+
10+
## Approach
11+
12+
Pass a `metadata` dict to `invoke()`. The framework propagates each
13+
entry to every observability backend at once: the OTel observer
14+
emits each entry as an `openarmature.user.<key>` cross-cutting span
15+
attribute on every span (invocation, node, subgraph wrapper,
16+
fan-out instance, LLM provider), and the Langfuse observer merges
17+
each entry as a top-level key into `trace.metadata` AND every
18+
observation's metadata. Backends that consume OTel attributes pick
19+
the entries up for free; backends with typed metadata fields get
20+
them via per-backend propagation.
21+
22+
For metadata that's only known mid-flight (an ID resolved by an
23+
LLM-classification node, a derived feature flag), use
24+
`set_invocation_metadata` from inside a node. The augmentation
25+
respects fan-out / parallel-branches per-instance scoping per
26+
proposal 0045, so each instance's update lives in its own
27+
async-context copy and doesn't leak to siblings.
28+
29+
## Snippet
30+
31+
```python
32+
import asyncio
33+
34+
from openarmature.graph import END, GraphBuilder, State
35+
from openarmature.observability import set_invocation_metadata
36+
37+
38+
class RequestState(State):
39+
query: str = ""
40+
answer: str = ""
41+
42+
43+
async def answer(s: RequestState) -> dict:
44+
# An entry resolved mid-invocation propagates to subsequent spans
45+
# in the same async-context: this node's `completed`, the LLM
46+
# provider span if any, and onwards. Sibling fan-out instances
47+
# and parallel-branches branches see their own copies.
48+
set_invocation_metadata(modelTier="standard")
49+
return {"answer": "Apollo 13 aborted due to an O2 tank failure."}
50+
51+
52+
graph = (
53+
GraphBuilder(RequestState)
54+
.add_node("answer", answer)
55+
.add_edge("answer", END)
56+
.set_entry("answer")
57+
.compile()
58+
)
59+
60+
61+
async def main() -> None:
62+
final = await graph.invoke(
63+
RequestState(query="why did Apollo 13 abort?"),
64+
metadata={
65+
"tenantId": "acme-corp",
66+
"requestId": "req-12345",
67+
"featureFlag": "v2-canary",
68+
},
69+
)
70+
print(final.answer)
71+
72+
73+
asyncio.run(main())
74+
```
75+
76+
Every span emitted during this `invoke()` carries
77+
`openarmature.user.tenantId="acme-corp"`,
78+
`openarmature.user.requestId="req-12345"`, and
79+
`openarmature.user.featureFlag="v2-canary"`. Spans inside the
80+
`answer` node (and any downstream nodes if the graph had more)
81+
additionally carry `openarmature.user.modelTier="standard"` from
82+
the `set_invocation_metadata` call.
83+
84+
## Boundary validation
85+
86+
Validation runs synchronously, before any node body fires. Both
87+
`invoke(metadata=...)` and `set_invocation_metadata(...)` enforce
88+
the same rules:
89+
90+
- Keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved
91+
namespaces per the spec).
92+
- Keys MUST NOT collide with the spec's reserved per-trace metadata
93+
keys (`correlation_id`, `entry_node`, `spec_version`, etc.). The
94+
set is enforced at the `invoke()` and `set_invocation_metadata`
95+
boundaries via the validator in
96+
`openarmature.observability.metadata`; it grows per spec proposals
97+
0041 / 0042, with the canonical list in the spec's observability
98+
§3.4.
99+
- Values MUST be OTel-attribute-compatible scalars (`str` / `int` /
100+
`float` / `bool`) or homogeneous arrays of those.
101+
102+
Violations raise `ValueError` at the boundary. Failing loud at
103+
construction is better than the bare-key silently clobbering a
104+
spec-reserved key in flat Langfuse `trace.metadata`.
105+
106+
## When this is the right pattern
107+
108+
- One service runs the same graph for many distinct callers
109+
(multi-tenant SaaS, per-customer feature flags, A/B test
110+
cohorts).
111+
- Downstream observability needs to filter or join on caller-side
112+
identifiers (tenant ID for billing dashboards, request ID for
113+
cross-service trace stitching, feature flag for experiment
114+
analysis).
115+
- You don't want each node to know about tenancy. The metadata
116+
flows through the framework, not the node bodies.
117+
118+
## When it isn't
119+
120+
- The identifier is a per-node decision, not a per-invocation one.
121+
If different nodes in the same invocation produce different
122+
values, that's typed state, not invocation metadata. Put it on
123+
the `State` schema with a clear reducer.
124+
- The value isn't a scalar or homogeneous array. The boundary
125+
validation rejects complex shapes; if you need to attach a nested
126+
object, serialize it to a JSON string before passing.
127+
- The value contains PII you don't want in every span. Metadata is
128+
unconditionally emitted everywhere the observers run; filter at
129+
the caller or skip the propagation for those keys.
130+
131+
## Cross-references
132+
133+
- [Observability concept page](../concepts/observability.md): how
134+
OTel attributes and Langfuse metadata propagate.
135+
- [`examples/10-langfuse-observability`](../examples/10-langfuse-observability.md):
136+
runnable example exercising the metadata propagation path.
137+
- Spec: [observability](https://openarmature.org/capabilities/observability/),
138+
the propagation contract for caller-supplied metadata.

docs/patterns/index.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,12 @@ docs composing existing primitives.
3333
- [Bypass if output exists](bypass-if-output-exists.md)
3434
short-circuit a node whose external output already exists, via
3535
middleware.
36+
- [State migration on resume](state-migration-on-resume.md) — let
37+
older in-flight checkpoints resume against an evolved state
38+
schema without each node body having to handle multiple shapes.
39+
- [Caller-supplied trace identifiers](caller-supplied-trace-identifiers.md)
40+
— propagate tenant ID / request ID / feature flags into every
41+
observability span via `invoke(metadata=...)`.
42+
- [Custom observer: reconciling started → completed pairs](observer-state-reconciliation.md)
43+
— thread per-call state between paired events using a per-
44+
invocation dict keyed on the spec's uniqueness tuple.

0 commit comments

Comments
 (0)