Skip to content

Commit 4ca3d32

Browse files
Docs: vLLM cookbook, non-obvious shapes, caller-metadata concept (#87)
* Docs: vLLM cookbook, non-obvious shapes, caller-metadata concept Adds five new "non-obvious shape" entries to docs/agent/non-obvious-shapes.md (regenerated into AGENTS.md): - Reconcile started → completed pairs via a per-invocation dict keyed on (namespace, attempt_index, fan_out_index). Closes downstream feedback item 4 (StepMetadataObserver reconciliation pattern). - Filter openarmature.*-namespaced events when the observer only cares about user nodes. Closes downstream feedback item 5 (sentinel-event noise in custom observers). - A with_state_migration recipe — register migrations alongside the state class, run on resume. Closes downstream feedback item 6. - Fan-out subgraphs that emit list[X] per instance produce list[list[X]] at target_field. Documents the gotcha plus a custom-reducer workaround; forward-references the spec v0.27.0 built-ins (concat_flatten, merge_all) not yet absorbed into the python impl. - invoke(metadata=...) for caller-supplied trace identifiers. Pairs with proposal 0034's new public surface. Adds a self-hosted vLLM cookbook page at docs/model-providers/vllm.md covering base_url shape (rejected /v1 suffix), authentication, genai_system="vllm" override, force_prompt_augmentation_fallback for older vLLM, readiness-probe limitations, and tool-calling. Linked from model-providers/index.md and mkdocs.yml nav. Adds a caller-supplied invocation metadata section to docs/concepts/observability.md (right after the correlation_id section), covering boundary validation, mid-invocation augmentation via set_invocation_metadata, and per-async-context COW isolation for fan-out instances. Fixes a stale comment in examples/00-hello-world/main.py that enumerated only three of RuntimeConfig's seven post-proposal-0032 declared fields. All 12 python snippets in docs/model-providers/vllm.md execute under the doc-examples test (916 passed, 129 skipped). * Address PR 87 review: shape-key fix, accurate ready() docs - Reconciliation shape entry: add branch_name to the event-pair key. Per the NodeEvent docstring the unique tuple is (namespace, branch_name, fan_out_index, attempt_index, phase); omitting branch_name let two parallel-branches running the same subgraph collide. Fixed the heading, the prose, and the code example. - Filter-events shape entry: drop the incorrect claim that OA-internal events lack an invocation_id. invocation_id is set at invoke() entry before the deliver_loop task is created, so the observer reads it as non-None for both user-node and OA-internal events. The namespace-prefix check is the only correct filter. - vLLM 30-second example: construct the messages + config locals so the imports are referenced naturally, and run asyncio.run( main()) so the doc-examples test exercises provider construction + aclose() (network calls stay commented). - vLLM readiness section: document that ready() also consults the optional per-entry status field and raises ProviderModelNotLoaded on loading/not_loaded, then frame the warm-up workaround as the vLLM-specific fallback (vLLM's /v1/models omits status). AGENTS.md regenerated.
1 parent 2aa6efc commit 4ca3d32

7 files changed

Lines changed: 601 additions & 1 deletion

File tree

docs/agent/non-obvious-shapes.md

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,139 @@ The compiled graph stays usable for subsequent invocations after a timed-out dra
109109
- `LlmProviderError` (in `openarmature.llm`) — provider call failures: `ProviderAuthentication`, `ProviderInvalidRequest`, `ProviderInvalidResponse`, `ProviderInvalidModel`, `ProviderModelNotLoaded`, `ProviderRateLimit`, `ProviderUnavailable`, `ProviderUnsupportedContentBlock`, `StructuredOutputInvalid`.
110110

111111
Catching `Exception` works but is too broad; catching one hierarchy misses the other two. If you want to branch on category strings (e.g., for retry logic), catch the relevant base — `RuntimeGraphError` covers all five spec runtime categories, `LlmProviderError` covers all nine provider categories, `CheckpointError` covers all six checkpoint categories. The `TRANSIENT_CATEGORIES` frozenset in `openarmature.llm` enumerates which provider categories are retriable.
112+
113+
### Reconcile `started``completed` pairs via a per-invocation dict keyed on `(namespace, branch_name, attempt_index, fan_out_index)`
114+
115+
Observers receive `started` and `completed` events as a pair per node attempt, but the engine doesn't carry a `step_id`-like correlation field across the pair (it doesn't need one for its own logic — the events arrive serially per spec §6). Observer code that needs to thread per-call state — start timestamps, request payloads, custom IDs — between the two events has to reconcile manually.
116+
117+
The pair identity is `(namespace, branch_name, attempt_index, fan_out_index)`: that tuple is unique within an invocation (per graph-engine §6 uniqueness invariants — `branch_name` and `fan_out_index` are independent slots, so a node inside a parallel-branches branch needs `branch_name` in the key to avoid colliding with the same-named node in a sibling branch). Carry per-invocation state in a `dict[invocation_id, dict[tuple, value]]` and look up on `completed`:
118+
119+
```python
120+
class StepTimingObserver:
121+
def __init__(self) -> None:
122+
# invocation_id -> {(namespace, branch_name, attempt_index, fan_out_index): start_ts}
123+
self._pending: dict[str, dict[tuple[Any, ...], float]] = {}
124+
125+
async def __call__(self, event: NodeEvent) -> None:
126+
invocation_id = current_invocation_id()
127+
if invocation_id is None:
128+
return
129+
key = (event.namespace, event.branch_name, event.attempt_index, event.fan_out_index)
130+
if event.phase == "started":
131+
self._pending.setdefault(invocation_id, {})[key] = time.monotonic()
132+
elif event.phase == "completed":
133+
start = self._pending.get(invocation_id, {}).pop(key, None)
134+
if start is not None:
135+
duration = time.monotonic() - start
136+
# … emit timing
137+
# Sweep when the dict empties (last completed for this invocation).
138+
if not self._pending.get(invocation_id):
139+
self._pending.pop(invocation_id, None)
140+
```
141+
142+
The `_pending[invocation_id]` sub-dict naturally tracks in-flight pairs and drains as completions arrive. Sweep the outer entry when the sub-dict empties so long-running services don't accumulate per-invocation entries. If you also subscribe to drain events, that's another sweep opportunity. The same pattern works for any per-call state the observer needs to thread across the pair.
143+
144+
### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes
145+
146+
OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc. — but a custom observer that only cares about user-defined node activity sees them as noise:
147+
148+
```python
149+
async def __call__(self, event: NodeEvent) -> None:
150+
# Skip OA-internal events; only react to user node activity.
151+
if event.namespace and event.namespace[0].startswith("openarmature."):
152+
return
153+
# … user-node handling
154+
```
155+
156+
`event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None` — OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract.
157+
158+
### A `with_state_migration` recipe — register migrations alongside the state class, run on resume
159+
160+
`GraphBuilder.with_state_migration(s)` registers callables that transform an old-schema state record into the current schema. The engine calls them automatically on `invoke(resume_invocation=...)` when the loaded record's `schema_version` doesn't match `state_cls.schema_version`. The migration callable's signature is `(state_dict: dict, from_version: str, to_version: str) -> dict`; it receives the raw deserialized record and returns the new shape.
161+
162+
Wire it up at compile time:
163+
164+
```python
165+
class PipelineState(State):
166+
schema_version: ClassVar[str] = "2"
167+
# … v2 fields
168+
169+
def _migrate_v1_to_v2(state_dict: dict, from_version: str, to_version: str) -> dict:
170+
# Old field "step_count" renamed to "steps_completed" in v2.
171+
state_dict["steps_completed"] = state_dict.pop("step_count", 0)
172+
return state_dict
173+
174+
compiled = (
175+
GraphBuilder(PipelineState)
176+
.add_node("step", _step_body)
177+
.add_edge("step", END)
178+
.set_entry("step")
179+
.with_state_migration(from_version="1", to_version="2", migrate=_migrate_v1_to_v2)
180+
.compile()
181+
)
182+
compiled.attach_checkpointer(checkpointer)
183+
```
184+
185+
Important detail: the migration runs once on resume, before any node body fires; the engine dispatches a synthetic `checkpoint_migrated` observer event (per spec §6 cross-ref) so observers can emit a migration span. The migrated state is what `_step_body` sees on resume — you do NOT need to handle both v1 and v2 shapes in node bodies.
186+
187+
When chaining multiple migrations (v1 → v2 → v3), register each step separately via repeated `with_state_migration` calls; the engine walks the chain in version order. If the chain has gaps (registered v1→v2 and v3→v4 but a record is at v2 with `to_version="4"`), the engine raises `CheckpointStateMigrationMissing` at resume time — fail-loud rather than silently skipping.
188+
189+
### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field`
190+
191+
When a fan-out's per-instance state collects a `list[X]` as its `collect_field` (e.g., each instance produces 0..N records), the engine's contribution step is `[s[cfg.collect_field] for s in successes]` — every instance's value becomes one element of the outer list. With `list[X]` per-instance, the parent receives `list[list[X]]`, and the default `append` reducer on the parent's `Annotated[list[X], append]` field preserves the nesting verbatim. Pydantic then fails to validate each `list[X]` element against `X`:
192+
193+
```
194+
attributed_candidates.0 Input should be a valid dictionary or
195+
instance of ClaimCandidate [input_value=[ClaimCandidate(...)],
196+
input_type=list]
197+
```
198+
199+
The right fix is a flattening reducer. Until OA ships the spec-blessed built-ins (proposal 0036 — `concat_flatten` for the list-of-lists case, `merge_all` for the dict-of-mappings case — accepted in spec v0.27.0 but not yet absorbed into the python impl), use a small custom reducer:
200+
201+
```python
202+
from openarmature.graph import Reducer
203+
204+
class _ConcatFlatten(Reducer):
205+
name = "concat_flatten"
206+
207+
def __call__(self, prior: list[Any], update: list[list[Any]]) -> list[Any]:
208+
return [*prior, *(item for sublist in update for item in sublist)]
209+
210+
concat_flatten = _ConcatFlatten()
211+
212+
class PipelineState(State):
213+
attributed_candidates: Annotated[list[ClaimCandidate], concat_flatten] = ...
214+
```
215+
216+
Single-record-per-instance fan-outs (`collect_field: str`, parent field `Annotated[list[X], append]`) don't hit this — the engine still wraps each instance's value as one element, but `append` flattens it correctly since each element is already an `X`. The list-of-lists shape only emerges when the per-instance value is itself a list.
217+
218+
If a parent field is populated by BOTH direct node writes AND fan-out collection, that's an architectural ambiguity worth fixing upstream — split into two fields, or pick one path.
219+
220+
### `invoke(metadata=...)` for caller-supplied trace identifiers (tenant IDs, request IDs, feature flags)
221+
222+
Per spec observability §3.4 / proposal 0034, callers attach arbitrary key/value entries at `invoke()` time and the framework propagates them to every observability backend:
223+
224+
```python
225+
await compiled.invoke(
226+
initial_state,
227+
metadata={"tenantId": "acme-corp", "requestId": "req-12345", "featureFlag": "v2-canary"},
228+
)
229+
```
230+
231+
The OTel observer emits each entry as an `openarmature.user.<key>` cross-cutting span attribute on every span (invocation, node, subgraph wrapper, fan-out instance, LLM provider). The Langfuse observer merges each entry as a top-level key into `trace.metadata` AND every observation's metadata. Backends that consume OTel attributes (Honeycomb, Datadog APM, HyperDX, Grafana Tempo) pick the entries up for free; backends with typed metadata fields (Langfuse) get them via the per-backend propagation rule.
232+
233+
Boundary validation runs synchronously: keys MUST NOT start with `openarmature.` or `gen_ai.` (reserved namespaces); values MUST be OTel-attribute-compatible scalars (`str` / `int` / `float` / `bool`) or homogeneous arrays of those. Violations raise `ValueError` before any work begins.
234+
235+
Mid-invocation augmentation via the public helper:
236+
237+
```python
238+
from openarmature.observability import set_invocation_metadata
239+
240+
async def my_node(state: MyState) -> dict:
241+
set_invocation_metadata(productId=state.product_id)
242+
# subsequent spans (this node's completed, next node's started,
243+
# any LLM call inside, etc.) carry productId
244+
return {"score": await compute_score(state)}
245+
```
246+
247+
The augmentation respects fan-out / parallel-branches per-instance scoping — each instance's augmentation lives in its own Context copy and doesn't leak to siblings. Sequential nodes in the same engine task see prior nodes' augmentations forward. The helper validates the same rules as the `invoke()` boundary.

docs/concepts/observability.md

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,98 @@ though their `invocation_id`s differ. It's exported from the
302302
`current_invocation_id` (and friends) for code that needs to thread
303303
the IDs explicitly.
304304

305+
## Caller-supplied invocation metadata
306+
307+
`correlation_id` is one string; if you also need to attach
308+
business-domain identifiers — tenant IDs, request IDs, feature
309+
flags, A/B cohort labels — pass them as a structured mapping at
310+
`invoke()` time:
311+
312+
```python
313+
await compiled.invoke(
314+
initial_state,
315+
metadata={
316+
"tenantId": "acme-corp",
317+
"requestId": "req-12345",
318+
"featureFlag": "v2-canary",
319+
"seatCount": 42,
320+
},
321+
)
322+
```
323+
324+
Every observability backend picks the entries up:
325+
326+
- **OTel** emits each entry as an `openarmature.user.<key>`
327+
cross-cutting span attribute on every span — invocation, node,
328+
subgraph wrapper, fan-out instance, LLM provider, retry attempt.
329+
Backends that consume OTel attributes (Phoenix / Arize, Honeycomb,
330+
Datadog APM, HyperDX, Grafana Tempo, custom collectors) see them
331+
uniformly without per-backend wiring.
332+
- **Langfuse** merges each entry as a top-level key into
333+
`trace.metadata` AND into every `observation.metadata`. The
334+
Langfuse UI filters on `metadata.<key>` directly, so dashboard
335+
queries like "show me all traces for `tenantId == acme-corp`"
336+
work without any custom dashboard config.
337+
338+
Validation runs at the `invoke()` boundary before any work begins.
339+
Two rules:
340+
341+
- **Keys** MUST NOT start with `openarmature.` or `gen_ai.`
342+
(reserved for spec-normative attribute namespaces; collisions
343+
would silently overwrite OA-emitted state).
344+
- **Values** MUST be OTel-attribute-compatible scalars (`str`,
345+
`int`, `float`, `bool`) or homogeneous arrays of those types.
346+
`None`, nested objects, and mixed-type arrays are rejected.
347+
348+
Violations raise `ValueError` synchronously — no spans emitted, no
349+
work runs.
350+
351+
### Adding entries mid-invocation
352+
353+
From inside a node body, middleware, or observer, augment the
354+
in-scope metadata via the public helper:
355+
356+
```python
357+
from openarmature.observability import set_invocation_metadata
358+
359+
async def evaluate_product(state: PipelineState) -> dict[str, Any]:
360+
set_invocation_metadata(productId=state.product_id, productCategory=state.category)
361+
# Spans emitted AFTER this call carry productId + productCategory
362+
# in addition to whatever the original invoke() metadata supplied.
363+
response = await provider.complete(messages)
364+
return {"score": parse_score(response.message.content)}
365+
```
366+
367+
Spans already closed are NOT retroactively updated. Spans emitted
368+
after the call (the current node's `completed` event, the next
369+
node's `started`, any LLM call inside) pick up the new entries.
370+
371+
**Per-async-context scoping.** The metadata mapping lives in a
372+
`ContextVar`, which Python copies on async-task creation. Fan-out
373+
instances and parallel-branches each receive their own copy at
374+
dispatch time — an instance that calls `set_invocation_metadata`
375+
does NOT leak its augmentation to sibling instances. This is the
376+
canonical pattern for per-instance identifiers:
377+
378+
```python
379+
# Each fan-out instance adds its own productId; siblings stay clean
380+
async def evaluate_product(state: ProductState) -> dict[str, Any]:
381+
set_invocation_metadata(productId=state.product_id)
382+
return await score_product(state)
383+
```
384+
385+
Augmentation within the parent context (before fan-out dispatch, or
386+
in code that runs serially) flows forward to subsequent spans in
387+
that context, per normal `ContextVar` semantics.
388+
389+
### Reading the in-scope metadata
390+
391+
`openarmature.observability.current_invocation_metadata()` returns
392+
the live mapping (or an empty `MappingProxyType` outside an
393+
invocation). Observers and capability code read this to surface
394+
the entries on backend-specific records; user code typically uses
395+
`set_invocation_metadata` to write and lets the framework propagate.
396+
305397
## OpenTelemetry mapping (opt-in)
306398

307399
Install with the `[otel]` extra:

docs/model-providers/index.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ nodes call it inside their bodies.
205205

206206
## Where to next
207207

208+
- **[Self-hosted vLLM](vllm.md)**: configure `OpenAIProvider` to
209+
talk to a self-hosted vLLM server. Covers the base-URL contract,
210+
the legacy-server fallback flag, the `gen_ai.system` override,
211+
and readiness-probe limitations.
208212
- **[Authoring a Provider](authoring.md)**: how to implement the
209213
Protocol for a non-default wire format. Includes a ~60-line
210214
skeleton + contract checklist.

0 commit comments

Comments
 (0)