From 5262c37d55c8014f8cb2de8463d469257af2f1c4 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 9 Jun 2026 17:48:52 -0700 Subject: [PATCH 1/7] Correct v0.13.0 release narrative per spec review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three blocking + three should-fix items spec flagged on the pre-tag review. All narrative; no code behavior change. - 0047 CHANGELOG entry mis-attributed pieces 1+2 (Response.usage cache fields + OTel cache attributes) to v0.12.0. Verified via git: those landed in PRs #136 + #140 post-v0.12.0-tag, so all three pieces of 0047 ship in v0.13.0. Reframed. - conformance.toml [proposals."0047"] leading-comment block had the same v0.12.0 mis-attribution. Same correction; added PR references for traceability. - Unreleased section had two ### Added headings with the 0057 entry orphaned below ### Changed. Consolidated. - Spec pin advance text undercounted the cycle journey (said v0.51.0 → v0.53.0; actual is v0.46.0 → v0.53.0 across three hops). Reframed and listed absorbed proposals inline. - tool_call.arguments JSON encoding now uses sort_keys=True (functionally equivalent but byte-different for downstream snapshot consumers). Surfaced as its own ### Changed entry instead of buried in the 0047 ### Added. - conformance.toml [proposals."0049"] leading-comment block grew the fixture-deferral surface (057-068 + 069-073 parser- deferred pending harness directive schema catch-up; behavior pinned by unit tests) per spec OQ2. --- CHANGELOG.md | 10 +++---- conformance.toml | 71 ++++++++++++++++++++++++++++-------------------- 2 files changed, 46 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 613754c..e8c1d89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,22 +8,20 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ### Added -- **Implicit prefix-cache wire-byte stability** (proposal 0047, spec v0.39.0). The OpenAI Chat Completions wire body is now byte-stable across equivalent OA inputs — equivalent calls produce byte-identical request bodies regardless of dict insertion order at every user-supplied-dict boundary (tool definitions including the top-level `function` record + the `parameters` JSON Schema, `response_format.json_schema.schema`, `RuntimeConfig` extras, `tool_call.arguments` JSON encoding). A new `_canonicalize_dict_keys` helper recursively sorts dict keys at every nesting level while preserving caller-supplied array ordering (the spec's split between "object keys MUST be sorted" and "array order MUST be preserved per caller-supplied order"). A top-level belt-and-suspenders canonicalization pass over the assembled body catches anything the per-field passes miss. Combined with the existing `Response.usage.cached_tokens` / `cache_creation_tokens` fields sourced from `prompt_tokens_details` (v0.12.0) and the OTel observer's `openarmature.llm.cache_read.input_tokens` + `openarmature.llm.cache_creation.input_tokens` attributes (also v0.12.0), this closes proposal 0047 end-to-end. Prompt-management §13 *Cross-variable substring stability* is satisfied by the existing Jinja2 `StrictUndefined` render path; pinned by a new test. Scope is the Chat Completions endpoint only — the OpenAI Responses API endpoint and the Anthropic / Gemini wire-format mappings are deferred (the providers aren't implemented in python today). +- **Implicit prefix-cache wire-byte stability** (proposal 0047, spec v0.39.0). Closes proposal 0047 end-to-end across three pieces all landing in v0.13.0: (1) `Response.usage.cached_tokens` / `cache_creation_tokens` fields sourced from the OpenAI `prompt_tokens_details` payload (PR #136); (2) the OTel observer emits `openarmature.llm.cache_read.input_tokens` and optional `openarmature.llm.cache_creation.input_tokens` when the corresponding usage field is populated (PR #140); (3) the OpenAI Chat Completions wire body is now byte-stable across equivalent OA inputs — equivalent calls produce byte-identical request bodies regardless of dict insertion order at every user-supplied-dict boundary (tool definitions including the top-level `function` record + the `parameters` JSON Schema, `response_format.json_schema.schema`, `RuntimeConfig` extras, `tool_call.arguments` JSON encoding) via a new `_canonicalize_dict_keys` helper that recursively sorts dict keys at every nesting level while preserving caller-supplied array ordering, plus a top-level belt-and-suspenders canonicalization pass over the assembled body (PR #145). Prompt-management §13 *Cross-variable substring stability* is satisfied by the existing Jinja2 `StrictUndefined` render path; pinned by a new test. Scope is the Chat Completions endpoint only — the OpenAI Responses API endpoint and the Anthropic / Gemini wire-format mappings are deferred (the providers aren't implemented in python today). - **`LlmFailedEvent` typed event variant** (proposal 0058, spec v0.53.0). Carves LLM provider failures into a spec-normatively-typed event variant alongside `LlmCompletionEvent`. 17 mirrored identity / scoping / request-side fields + 3 failure-specific fields (`error_category` always-present from the llm-provider §7 normative category enumeration; optional `error_type` for vendor-specific detail or upstream exception class name; always-present `error_message`). `OpenAIProvider.complete()` emits the typed event alongside the §7 exception on both raise paths — adapter-caught provider exceptions AND pre-send validation raises. Caller-side exception flow unchanged; the exception still raises out of `complete()`. Mutually exclusive with `LlmCompletionEvent` on the same call. Both bundled observers (OTel + Langfuse) consume `LlmFailedEvent` directly: same `openarmature.llm.complete` span / Generation shape as the success path with ERROR status / level + `openarmature.error.category` attribute (OTel) / `error_category` as statusMessage (Langfuse), `start_time` back-dated by `latency_ms` so the failure duration reflects the time-to-raise. +- **`LlmCompletionEvent` extended with proposal 0057 request-side fields** (spec v0.51.0). The typed event now carries `input_messages`, `output_content`, `request_params`, `request_extras`, `active_prompt`, `active_prompt_group`, `call_id`, and `response_model` alongside the existing v0.49.0 fields. `request_id` renamed to `response_id` per the proposal's response-side naming. Inline image bytes in `input_messages` stay redacted per observability §5.5.5 — the OpenAI provider reuses the existing message-serialization helper for the projection. Observer-side privacy gates (OTel `disable_llm_payload`, Langfuse equivalents) apply at rendering, symmetric with the §5.5.1 span attribute path. ### Changed - **Sentinel-namespace `NodeEvent` emission for LLM events retired entirely from `OpenAIProvider`** (proposal 0058 cleanup). The provider no longer dispatches the `("openarmature.llm.complete",)`-namespaced `NodeEvent`s on either outcome path; both success and failure flow through their respective typed variants exclusively. The `_make_llm_event` helper is removed. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` for success and `isinstance(event, LlmFailedEvent)` for failure to keep receiving LLM-call notifications. `LlmEventPayload` and `LLM_NAMESPACE` remain in `openarmature.observability.llm_event` as a documented compatibility surface for custom providers that haven't migrated; neither is referenced by the bundled provider or observers anymore. -- **Pinned spec advances from v0.51.0 to v0.53.0** (absorbs proposals 0023 + 0058). Proposal 0023 (canonical state reducers) ships in spec v0.52.0 but is not implemented this cycle — `conformance.toml` marks 0023 as `not-yet`; fixtures 034–038 stay parser-deferred. +- **Pinned spec advances from v0.46.0 to v0.53.0** across the v0.13.0 cycle. Absorbs four implemented proposals (0047 — implicit prefix-cache wire-byte stability; 0049 — typed `LlmCompletionEvent`; 0057 — `LlmCompletionEvent` request-side field-set extension; 0058 — typed `LlmFailedEvent`) plus 0023 (canonical state reducers, v0.52.0) carried as `not-yet` in the manifest. Pin journey: v0.46.0 → v0.51.0 (PR #141 absorbs 0057) → v0.53.0 (PR #144 absorbs 0058; spec v0.52.0's 0023 entry rides along as `not-yet`). Fixtures 034–038 (0023) stay parser-deferred. +- **`tool_call.arguments` JSON encoding now uses `sort_keys=True`** (proposal 0047 §8 byte-stability requirement for caller-supplied dicts JSON-encoded into a string field). Functionally equivalent — the encoded string parses to the same dict — but byte-different from the previous insertion-order encoding. Downstream consumers that snapshot wire bodies (golden-file tests, audit logging, recorded fixtures) will see byte-different `tool_calls[].function.arguments` strings across this upgrade for any call whose argument dict was emitted in non-sorted insertion order before. - **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. The §5.5 attribute set and §8.4 Generation metadata are unchanged. (Failure paths land on `LlmFailedEvent` later in the same cycle — see the proposal 0058 entry above.) - **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. (The failure-path sentinel emission is retired entirely later in the same cycle — see the proposal 0058 entry above.) - **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter handles v4 Langfuse SDK quirks transparently: `Langfuse.start_observation()` does NOT accept `start_time`, so back-dated generations are routed through the private `_otel_tracer.start_span(name=..., start_time=int_ns)` API (mirroring the SDK's own `create_event` precedent) and the resulting OTel span is wrapped in `LangfuseGeneration` directly; the non-back-dated path still uses `start_observation`. `LangfuseSpan.end()` is typed `Optional[int]` (nanoseconds), so the adapter converts the Protocol's `datetime` surface to int nanoseconds before forwarding. The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions. - **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips. -### Added - -- **`LlmCompletionEvent` extended with proposal 0057 request-side fields** (spec v0.51.0). The typed event now carries `input_messages`, `output_content`, `request_params`, `request_extras`, `active_prompt`, `active_prompt_group`, `call_id`, and `response_model` alongside the existing v0.49.0 fields. `request_id` renamed to `response_id` per the proposal's response-side naming. Inline image bytes in `input_messages` stay redacted per observability §5.5.5 — the OpenAI provider reuses the existing message-serialization helper for the projection. Observer-side privacy gates (OTel `disable_llm_payload`, Langfuse equivalents) apply at rendering, symmetric with the §5.5.1 span attribute path. - ## [0.12.0] — 2026-06-05 Observability release. The pinned spec advances from v0.38.0 to v0.46.0, absorbing eight accepted proposals (0047-0054). Three ship as fully implemented this cycle: proposal 0048 grows a read-symmetric `get_invocation_metadata()` API + a §9 *Queryable observer pattern* concept doc section; proposal 0052 puts `openarmature.implementation.name` + `.version` attribution attributes on every OTel invocation span + every Langfuse Trace; proposal 0054 ships `CompiledGraph.drain_events_for(invocation_id, *, timeout)` as the architectural pair to 0048's §9.4 accumulator lifecycle. Two ship as textual-only acks (0051 Langfuse trace I/O caveat; 0053 §3.4 shared-parent boundary clarification). One Fixed: the retry middleware now resets the invocation-metadata ContextVar between attempts per §3.4. The production-observability example grows the queryable accumulator + drain_events_for pattern end-to-end so the new APIs have a runnable demo. diff --git a/conformance.toml b/conformance.toml index 4d31243..587505c 100644 --- a/conformance.toml +++ b/conformance.toml @@ -266,33 +266,38 @@ status = "implemented" since = "0.11.0" # Spec v0.39.0 (proposal 0047). Implicit prefix-cache wire-byte -# stability. Cross-capability proposal landed in v0.13.0 across -# three pieces: (1) ``Response.usage`` cache-stat fields -# (``cached_tokens`` / ``cache_creation_tokens``) sourced from the -# OpenAI ``prompt_tokens_details`` payload, with conditional emission +# stability. Cross-capability proposal landed end-to-end in the +# v0.13.0 cycle across three pieces, all post-v0.12.0: +# (1) ``Response.usage`` cache-stat fields (``cached_tokens`` / +# ``cache_creation_tokens``) sourced from the OpenAI +# ``prompt_tokens_details`` payload, with conditional emission # preserved (absent-vs-zero distinction stays observable) — landed -# in the v0.12.0 cycle as the proposal's payload-side prerequisite; +# in PR #136 as the proposal's payload-side prerequisite; # (2) OTel observer emits ``openarmature.llm.cache_read.input_tokens`` # (and optional ``openarmature.llm.cache_creation.input_tokens``) -# when the corresponding usage field is populated — also v0.12.0; -# (3) §8.1 intra-impl wire-byte canonicalization in the OpenAI -# adapter — landed here. The canonicalizer recursively sorts dict -# keys at every nesting level while preserving caller-supplied -# array order, applied at the four user-input boundaries +# when the corresponding usage field is populated — landed in +# PR #140; (3) §8.1 intra-impl wire-byte canonicalization in the +# OpenAI adapter — landed in PR #145. The canonicalizer recursively +# sorts dict keys at every nesting level while preserving caller- +# supplied array order, applied at the four user-input boundaries # (``tool.parameters`` / ``tool.function`` record top-level per # spec Q5, ``response_format.json_schema.schema``, ``RuntimeConfig`` # extras, ``tool_call.arguments`` JSON encoding) plus a top-level -# belt-and-suspenders pass over the assembled request body. Scope -# is the Chat Completions endpoint only; the OpenAI Responses API -# endpoint is deferred to a future cycle (no python consumer -# today). Prompt-management §13 cross-variable substring stability -# is satisfied by the existing Jinja2 ``StrictUndefined`` render -# path; pinned by ``tests/unit/test_prompts.py:: +# belt-and-suspenders pass over the assembled request body. +# Downstream-observable wire-byte shift on +# ``tool_call.arguments``: the encoded string now uses +# ``sort_keys=True`` (functionally equivalent — parses to the same +# dict — but byte-different for golden-file / audit-snapshot +# consumers). Scope is the Chat Completions endpoint only; the +# OpenAI Responses API endpoint is deferred to a future cycle (no +# python consumer today). Prompt-management §13 cross-variable +# substring stability is satisfied by the existing Jinja2 +# ``StrictUndefined`` render path; pinned by +# ``tests/unit/test_prompts.py:: # test_cross_variable_substring_stability_text_prompt`` and # ``test_cross_variable_substring_stability_chat_prompt``. -# Anthropic / Gemini -# wire-byte conformance fixtures stay deferred — neither provider -# is implemented in python today. +# Anthropic / Gemini wire-byte conformance fixtures stay deferred +# — neither provider is implemented in python today. [proposals."0047"] status = "implemented" since = "0.13.0" @@ -344,16 +349,24 @@ status = "implemented" since = "0.12.0" # Spec v0.41.0 (proposal 0049). Typed LLM Completion Event — first -# typed event variant on the observer event union. Shipped in -# v0.13.0: provider dual-emits the typed event alongside the sentinel -# NodeEvent pair (success-only per spec scope); LlmCompletionEvent -# carries identity/scoping/outcome fields per the spec field table. -# Conformance fixtures 050-056 activated by the typed_event_collector -# harness directive. The OTel + Langfuse observers continue to drive -# their §5.5 / §8.4.4 surface off the sentinel NodeEvent pair during -# the dual-emit transition window; type-discrimination migration -# lands once the follow-on request-side-fields extension (proposal -# 0057) ships. +# typed event variant on the observer event union. Shipped fully in +# v0.13.0 across PRs #141 (typed-event definition + provider +# emission + 0057 field-set extension), #142 (OTel observer migration +# to type discrimination), #143 (Langfuse observer migration + +# success-side sentinel emission dropped), and #144 (0058 typed +# LlmFailedEvent + sentinel-namespace NodeEvent emission for LLM +# events retired entirely from the bundled OpenAIProvider). +# LlmCompletionEvent carries identity/scoping/outcome fields per +# the spec field table. Both bundled observers (OTel + Langfuse) +# consume the typed events via isinstance discrimination on both +# outcome paths. Conformance fixtures 050-056 activated by the +# typed_event_collector harness directive. Fixtures 057-068 +# (proposal 0057 request-side fields) and 069-073 (proposal 0058 +# typed failure event) stay parser-deferred pending the harness's +# typed_event_collector directive schema catch-up + the event_counts +# list directive introduced by fixture 071; behavior pinned by +# unit tests in tests/unit/test_llm_provider.py + +# test_observability_otel.py + test_observability_langfuse.py. [proposals."0049"] status = "implemented" since = "0.13.0" From 4faa508c0a62cd661487bd4651f56e2211abb4ef Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 9 Jun 2026 17:59:47 -0700 Subject: [PATCH 2/7] Migrate LLM-event docs to typed-event-first MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three docs still pushed the legacy sentinel-namespace pattern as the primary path for custom observers consuming LLM events and custom providers emitting them. After v0.13.0 the bundled provider emits typed LlmCompletionEvent / LlmFailedEvent variants directly; the bundled OTel + Langfuse observers consume via isinstance discrimination. Rewrites: - docs/concepts/observability.md: "Publishing LLM events for custom observers" → "Consuming LLM events in custom observers". Typed-event consumption shown as primary (isinstance branch on LlmCompletionEvent + LlmFailedEvent with the mutual-exclusion + field-set notes). Sentinel pattern demoted to a "Legacy sentinel-namespace pattern (compatibility surface)" subsection for downstream code interoperating with custom providers that haven't migrated. - docs/model-providers/authoring.md: custom-provider emission sketch rewritten — dispatch LlmCompletionEvent on success, LlmFailedEvent alongside the §7 exception on failure. Shows the current-attempt-index / current-fan-out-index / etc. scoping fields the typed events carry. Calls out the mutual-exclusion + exception-flow-preservation contracts. Legacy sentinel pattern retained as a compatibility-surface callout for older providers. - docs/agent/non-obvious-shapes.md: "filter openarmature.*- namespaced events" tip drops the openarmature.llm.complete example (v0.13.0 retired the sentinel pattern for LLM events); checkpoint sentinels stay since the tip is still applicable for those. Adjusted the follow-on paragraph mentioning LLM events. mkdocs strict build clean. --- docs/agent/non-obvious-shapes.md | 4 +- docs/concepts/observability.md | 71 ++++++++++--- docs/model-providers/authoring.md | 171 ++++++++++++++++++++---------- 3 files changed, 174 insertions(+), 72 deletions(-) diff --git a/docs/agent/non-obvious-shapes.md b/docs/agent/non-obvious-shapes.md index f582c5c..b2bd6ec 100644 --- a/docs/agent/non-obvious-shapes.md +++ b/docs/agent/non-obvious-shapes.md @@ -127,7 +127,7 @@ Catching `Exception` works but is too broad; catching one hierarchy misses the o ### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes -OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc., but a custom observer that only cares about user-defined node activity sees them as noise: +OA emits observer events under sentinel node-names for some internal dispatch: `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014) and `openarmature.checkpoint.save` for checkpoint saves (proposal 0010) ride on `NodeEvent` with a sentinel namespace. (LLM provider calls used to follow the same pattern but moved to typed `LlmCompletionEvent` / `LlmFailedEvent` variants in v0.13.0 per proposals 0049 + 0058 — those are filtered by `isinstance` instead.) The sentinel-namespace events let the OTel / Langfuse observers emit checkpoint-migrate spans, etc., but a custom observer that only cares about user-defined node activity sees them as noise: ```python async def __call__(self, event: NodeEvent) -> None: @@ -137,7 +137,7 @@ async def __call__(self, event: NodeEvent) -> None: # … user-node handling ``` -`event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None`: OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. +`event.namespace[0]` is the safest discriminator. Don't try to filter on `current_invocation_id() is None`: OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. ### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field` diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 3b67740..ba50fe9 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -767,7 +767,7 @@ informative for trace readers. Redaction is **not** gated by `disable_llm_payload` and is **not** configurable. Inline image bytes never leave the provider in event form, so custom observers consuming -[`LlmEventPayload`](#publishing-llm-events-for-custom-observers) +[`LlmCompletionEvent` / `LlmFailedEvent`](#consuming-llm-events-in-custom-observers) cannot accidentally leak raw bytes regardless of how they're written. @@ -834,33 +834,74 @@ OTel SDK silently drops `set_attribute` on ended spans. Exceptions raised by an enricher are caught and warned, never propagated. -### Publishing LLM events for custom observers +### Consuming LLM events in custom observers + +`openarmature.graph.events.LlmCompletionEvent` and +`openarmature.graph.events.LlmFailedEvent` are the two typed event +variants any `Provider` implementation emits around a `complete()` +call. Custom observers consume them via type discrimination: + +```python +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent + +async def my_llm_observer(event): + if isinstance(event, LlmCompletionEvent): + # Successful call. Read identity / scoping / outcome + # directly off the typed fields: + # event.model, event.input_messages (already image-redacted), + # event.output_content, event.request_params, event.response_id, + # event.active_prompt, event.usage, event.latency_ms, … + return + if isinstance(event, LlmFailedEvent): + # §7 category exception was raised. Same identity / scoping + # surface as the completion variant, plus three failure- + # specific fields: + # event.error_category — one of the 9 normative §7 categories + # event.error_type — vendor code or upstream class name + # event.error_message — human-readable, may be empty + return +``` + +The two variants are **mutually exclusive on a single `complete()` +call** — implementations MUST NOT emit both for the same call. +Conformance fixture 072 locks this down. The failure variant carries +the same identity + request-side fields as the completion variant, +minus the response-side fields (`response_id`, `response_model`, +`usage`, `output_content`, `finish_reason`) — there was no response +to record. + +A custom `Provider` that wants observers to see the same events +dispatches `LlmCompletionEvent(...)` on success and +`LlmFailedEvent(...)` alongside the §7 category exception on failure +via `current_dispatch()`. See +[Authoring providers](../model-providers/authoring.md) for the full +pattern. + +#### Legacy sentinel-namespace pattern (compatibility surface) `openarmature.observability.LLM_NAMESPACE` and -`openarmature.observability.LlmEventPayload` are part of the public -API. A custom observer subscribing to the dispatch stream can -recognize the LLM-event sentinel namespace and read the typed -payload directly: +`openarmature.observability.LlmEventPayload` remain in the public +API as a documented compatibility surface for custom providers and +observers that haven't migrated to typed events. The bundled +`OpenAIProvider` no longer emits the sentinel `NodeEvent` pair; the +bundled OTel and Langfuse observers no longer recognize it. If +you're writing a downstream observer that needs to interoperate +with custom providers still using the sentinel pattern, the legacy +shape is: ```python from openarmature.observability import LLM_NAMESPACE, LlmEventPayload -async def my_llm_observer(event): +async def legacy_llm_observer(event): if event.namespace != LLM_NAMESPACE: return payload = event.pre_state if not isinstance(payload, LlmEventPayload): return - # payload.model, payload.input_messages (already image-redacted), - # payload.output_content, payload.request_params, - # payload.response_id, payload.active_prompt, ... + # payload.model, payload.input_messages, … ``` -A custom `Provider` that wants to participate in the same span -emission protocol dispatches `NodeEvent(namespace=LLM_NAMESPACE, -pre_state=LlmEventPayload(...))` via `current_dispatch()`. See -[Authoring providers](../model-providers/authoring.md) for the -full pattern. +New code should prefer the typed-event path above. ### Flushing under fast teardown diff --git a/docs/model-providers/authoring.md b/docs/model-providers/authoring.md index 4fc81a4..361b38f 100644 --- a/docs/model-providers/authoring.md +++ b/docs/model-providers/authoring.md @@ -216,71 +216,132 @@ of: raise `StructuredOutputInvalid` on parse or validation failure. Use `validate_response_schema` and `strict_mode_supported` from `openarmature.llm` to share the provider-agnostic boundary checks. -- **Observability spans.** Opt-in `started`/`completed` events - around the wire call so the OTel observer can build LLM spans. - Dispatch via the public `openarmature.observability.LLM_NAMESPACE` - sentinel and the typed `LlmEventPayload`. The sketch below is - what lives around the wire call inside `complete()`; the - `OpenAIProvider`'s `_make_llm_event` helper is the reference - implementation: +- **Observability events.** Opt-in typed-event dispatch around the + wire call so the bundled OTel and Langfuse observers (plus any + custom observer using type discrimination) can build LLM spans + and Generation observations. Dispatch + `openarmature.graph.events.LlmCompletionEvent` on successful + calls and `LlmFailedEvent` alongside any `LlmProviderError` your + adapter raises. The sketch below is the success and failure + emission shape inside `complete()`; the bundled + `OpenAIProvider._build_llm_completion_event` and + `_build_llm_failed_event` are the reference implementations. ```python + import time import uuid - from typing import Any - - from openarmature.graph.events import NodeEvent - from openarmature.observability import LLM_NAMESPACE, LlmEventPayload - from openarmature.observability.correlation import current_dispatch - - - def dispatch_llm_event( - phase: str, - *, - call_id: str, - model: str, - **extra: Any, - ) -> None: - """Emit one half of the started/completed pair. The same - ``call_id`` MUST appear on both events so observers can match - them under concurrency.""" + + from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent + from openarmature.llm.errors import LlmProviderError + from openarmature.observability.correlation import ( + current_attempt_index, + current_branch_name, + current_correlation_id, + current_dispatch, + current_fan_out_index, + current_invocation_id, + current_namespace_prefix, + ) + + + async def complete(self, messages, /, *, tools=None, config=None): dispatch = current_dispatch() - if dispatch is None: - return - dispatch(NodeEvent( - node_name="openarmature.llm.complete", - namespace=LLM_NAMESPACE, - step=-1, - phase=phase, - pre_state=LlmEventPayload( + call_id = str(uuid.uuid4()) # fresh per call (per-attempt under retry) + adapter_start = time.perf_counter() + + # Capture request-side data ONCE so both success and failure + # paths populate the typed event from the same projection. + serialized_messages: list[dict] = [] # image-redacted; see below + request_params = self._project_request_params(config) + request_extras = self._project_request_extras(config) + + try: + serialized_messages = self._serialize_messages(messages) + response = await self._wire_call(messages, tools, config) + except LlmProviderError as exc: + # Alongside the exception — caller-side flow unchanged + # (the exception still raises out of complete()). + if dispatch is not None: + latency_ms = (time.perf_counter() - adapter_start) * 1000.0 + dispatch(LlmFailedEvent( + invocation_id=current_invocation_id() or "", + correlation_id=current_correlation_id(), + node_name=(current_namespace_prefix() or ("",))[-1], + namespace=current_namespace_prefix(), + attempt_index=current_attempt_index(), + fan_out_index=current_fan_out_index(), + branch_name=current_branch_name(), + provider="my-provider", + model=self.model, + latency_ms=latency_ms, + input_messages=serialized_messages, + request_params=request_params, + request_extras=request_extras, + active_prompt=None, + active_prompt_group=None, + call_id=call_id, + error_category=exc.category, + error_type=type(exc).__name__, + error_message=str(exc), + )) + raise + + if dispatch is not None: + latency_ms = (time.perf_counter() - adapter_start) * 1000.0 + dispatch(LlmCompletionEvent( + invocation_id=current_invocation_id() or "", + correlation_id=current_correlation_id(), + node_name=(current_namespace_prefix() or ("",))[-1], + namespace=current_namespace_prefix(), + attempt_index=current_attempt_index(), + fan_out_index=current_fan_out_index(), + branch_name=current_branch_name(), + provider="my-provider", + model=self.model, + response_id=response.response_id, + response_model=response.response_model, + usage=response.usage, + latency_ms=latency_ms, + finish_reason=response.finish_reason, + input_messages=serialized_messages, + output_content=response.message.content or None, + request_params=request_params, + request_extras=request_extras, + active_prompt=None, + active_prompt_group=None, call_id=call_id, - model=model, - genai_system="my-provider", - **extra, - ), - post_state=None, - error=None, - parent_states=(), - )) - - - # Inside Provider.complete(), the call_id is minted once per call: - call_id = str(uuid.uuid4()) - dispatch_llm_event("started", call_id=call_id, model="my-model") - # ... wire call here, populating finish_reason / usage / output ... - dispatch_llm_event( - "completed", - call_id=call_id, - model="my-model", - finish_reason="stop", - ) + )) + return response ``` + Two contracts the bundled provider follows that custom providers + SHOULD match: + + - **Mutual exclusion.** A single `complete()` call emits exactly + one `LlmCompletionEvent` (success) **or** exactly one + `LlmFailedEvent` (failure) — never both. Conformance fixture 072 + locks this down. + - **Exception-flow preservation.** `LlmFailedEvent` is dispatched + **alongside** the §7 exception, not in place of it. The + exception still raises out of `complete()`; caller-side error + handling is unchanged. The typed event is on the observer + queue. + Inline image bytes MUST be redacted in the provider's - serialization step before reaching the payload (see + serialization step before reaching the typed event's + `input_messages` field (see [Observability: Inline image redaction](../concepts/observability.md#inline-image-redaction-always-on)) - so custom observers consuming `LlmEventPayload` cannot leak raw - bytes. + so observers consuming the typed events cannot leak raw bytes. + + **Legacy sentinel-namespace pattern** (`LLM_NAMESPACE` + + `LlmEventPayload` dispatched as a `NodeEvent` pair) remains in + the public API as a documented compatibility surface. The bundled + `OpenAIProvider` retired it in v0.13.0; the bundled OTel and + Langfuse observers no longer recognize it. New providers should + emit typed events directly; the sentinel pattern is preserved for + providers shipped before the v0.13.0 migration that haven't yet + switched. - **Lenient response parsing** under `finish_reason="error"`. Degraded responses surface what they can; tool-call arguments that fail to parse populate `arguments=None` instead of raising. From fdb89b368b797ffa3b909cddde615743bdeee06c Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 9 Jun 2026 19:13:39 -0700 Subject: [PATCH 3/7] Regenerate AGENTS.md for typed-event doc migration The non-obvious-shapes doc migration changed a generator source without regenerating the committed AGENTS.md. Bring it back in sync so the drift guard passes. --- src/openarmature/AGENTS.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index c1e65aa..e93024b 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1456,7 +1456,7 @@ Catching `Exception` works but is too broad; catching one hierarchy misses the o ### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes -OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc., but a custom observer that only cares about user-defined node activity sees them as noise: +OA emits observer events under sentinel node-names for some internal dispatch: `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014) and `openarmature.checkpoint.save` for checkpoint saves (proposal 0010) ride on `NodeEvent` with a sentinel namespace. (LLM provider calls used to follow the same pattern but moved to typed `LlmCompletionEvent` / `LlmFailedEvent` variants in v0.13.0 per proposals 0049 + 0058 — those are filtered by `isinstance` instead.) The sentinel-namespace events let the OTel / Langfuse observers emit checkpoint-migrate spans, etc., but a custom observer that only cares about user-defined node activity sees them as noise: ```python async def __call__(self, event: NodeEvent) -> None: @@ -1466,7 +1466,7 @@ async def __call__(self, event: NodeEvent) -> None: # … user-node handling ``` -`event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None`: OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. +`event.namespace[0]` is the safest discriminator. Don't try to filter on `current_invocation_id() is None`: OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. ### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field` From ebb9672eecdec079d077d0df20c9a4efb5bc4595 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 9 Jun 2026 19:14:42 -0700 Subject: [PATCH 4/7] Extend production-observability example for v0.13.0 Add an LlmFailureTracker observer that consumes the typed LlmFailedEvent and rolls up per-invocation error-category counts, and extend LlmUsageAccumulator to track cached_tokens and report a cache-hit ratio. The persist node now reports both rollups and the OTel formatter surfaces the cache-read attribute. Also drop spec/proposal references and em dashes from the example's comments and walk-through, which carry no meaning for end users reading the code. --- docs/examples/production-observability.md | 120 +++++---- examples/production-observability/main.py | 314 +++++++++++++++------- 2 files changed, 297 insertions(+), 137 deletions(-) diff --git a/docs/examples/production-observability.md b/docs/examples/production-observability.md index d4bf22d..26cb97f 100644 --- a/docs/examples/production-observability.md +++ b/docs/examples/production-observability.md @@ -9,17 +9,18 @@ graph, caller hooks deriving domain-shaped `trace.input` / `trace.output` from State, the built-in `TimingMiddleware` recording per-node duration, multi-tenant caller-supplied metadata propagating to both observers in one `invoke()` call, AND -a third queryable-accumulator observer that a terminal `persist` -node reads at request scope after synchronizing on the deliver -loop with `drain_events_for`. +two queryable-accumulator observers (one for successful-call token +usage incl. cache hits, one for failure-category attribution) that +a terminal `persist` node reads at request scope after +synchronizing on the deliver loop with `drain_events_for`. ## Overview -Two nodes (`respond` then `persist`), one LLM call, three observers +Two nodes (`respond` then `persist`), one LLM call, four observers attached before invoke. The pipeline takes a question, calls the LLM, returns the answer, then synchronizes on the observer queue -and rolls up token cost. The interesting part is the observability -wiring: +and rolls up token cost + failure attribution. The interesting +part is the observability wiring: - `OTelObserver` attached with an `InMemorySpanExporter` (production swaps this for `BatchSpanProcessor` + @@ -84,7 +85,7 @@ sees the same logical events represented two ways. observer call surface doesn't change. - **Queryable accumulator + `drain_events_for`** ([queryable observer pattern](../concepts/observability.md)). - A third observer — `LlmUsageAccumulator` — subscribes to the + A third observer (`LlmUsageAccumulator`) subscribes to the same event stream but only records the typed `LlmCompletionEvent` variant (one event per successful LLM call; outcome fields read directly off the event). It accumulates @@ -102,28 +103,44 @@ sees the same logical events represented two ways. count through State (a workaround that pollutes the state schema with non-pipeline data). - The filter shape is `isinstance(event, LlmCompletionEvent)` — + The filter shape is `isinstance(event, LlmCompletionEvent)`, one isinstance check against the typed event variant on the - observer event union. The provider also dual-emits a sentinel - `NodeEvent` pair during the transition period for backwards - compatibility with older accumulators; this example's - accumulator ignores the sentinel pair because the typed event - carries the same outcome data without the pair-join logic. New - accumulators should follow the isinstance-based filter shape - here; the CHANGELOG tracks when the sentinel emission is - removed. + observer event union. New accumulators should follow this + shape; the typed event carries every outcome field directly, + no namespace-string-match + payload-narrow dance against the + older sentinel-event family. - `LlmCompletionEvent` is success-only by spec design. Failed LLM - calls flow through the exception path and do not emit the typed - event, so `bucket.call_count` reflects successful calls only. - This is the right semantic for a usage accumulator (failed - calls produce no tokens). A pipeline tracking attempt-level - failure rates needs a separate listener — either a custom - observer on the sentinel `NodeEvent` pair or a future - failure-event typed variant if and when that proposal lands. - Production code migrating an existing accumulator from the - sentinel pattern should expect this counting shift if it was - previously counting failure-path events. + `LlmCompletionEvent` is a success-only event. Failed LLM + calls flow through the exception path and emit the parallel + `LlmFailedEvent` variant, so `bucket.call_count` reflects + successful calls only. This is the right semantic for a usage + accumulator (failed calls produce no tokens); the + `LlmFailureTracker` (below) is the listener that owns + attempt-level failure rates. + + The bucket also tracks `usage.cached_tokens` so the persist + node can print a cache-hit ratio. Backends with prefix caching + (vLLM `--enable-prefix-caching`, Anthropic prompt caching, + OpenAI's `prompt_token_usage` cache report when enabled) + populate the field; backends without cache visibility leave it + `None` and the ratio degrades to 0% gracefully. The cache-stat + fields surface both on the typed event's `Usage` and on the OTel + LLM span's `openarmature.llm.cache_read.input_tokens` attribute. +- **Failure-category tracker** + ([typed failure event](../concepts/observability.md)). + A fourth observer (`LlmFailureTracker`) subscribes to + `LlmFailedEvent`, the typed failure-side counterpart to + `LlmCompletionEvent`. The provider emits exactly one of + (`LlmCompletionEvent`, `LlmFailedEvent`) per LLM call, never + both. The tracker maintains a per-invocation + `{error_category: count}` bucket keyed by the nine canonical + category strings (`provider_rate_limit`, `provider_unavailable`, + `provider_invalid_model`, etc.), and the persist node prints + the per-category breakdown alongside the usage rollup. This + closes the symmetric-attribution gap: operators see "this tenant + had 4 successful calls (cost X) plus 1 rate-limit failure" at + request scope, without having to cross-join the OTel exception + spans or wait for a metrics backend to roll up. ## How to run @@ -152,12 +169,13 @@ request id: feature flag:v2-canary [timing] respond: 1234.5ms (success) -[persist] LLM usage: prompt=42, completion=38, total=80 across 1 call(s) +[persist] LLM usage: prompt=42 (cached=0, 0.0% hit), completion=38, total=80 across 1 call(s) +[persist] LLM failures: none answer: The primary objective of Apollo 11 was ... model: gpt-4o-mini-2024-07-18 --- captured OTel spans --- - [openarmature.invocation] 1240.0ms openarmature.graph.entry_node='respond', openarmature.graph.spec_version='0.46.0', openarmature.implementation.name='openarmature-python', openarmature.implementation.version='0.12.0' + [openarmature.invocation] 1240.0ms openarmature.graph.entry_node='respond', openarmature.graph.spec_version='0.53.0', openarmature.implementation.name='openarmature-python', openarmature.implementation.version='0.13.0' [respond] 1235.0ms openarmature.node.name='respond', openarmature.user.tenantId='demo-acme', ... [openarmature.llm.complete] 1200.0ms openarmature.user.tenantId='demo-acme', gen_ai.system='openai', gen_ai.usage.input_tokens=42, ... [persist] 2.0ms openarmature.node.name='persist', openarmature.user.tenantId='demo-acme', ... @@ -184,28 +202,40 @@ Trace id= as `outcome="exception"` with `exception_category="provider_rate_limit"`. - **`[persist] LLM usage: ...`**: emitted by the `persist` node after it drains the deliver loop and reads the - `LlmUsageAccumulator`'s bucket for this invocation. If the drain - times out (slow / hung observer), the persist line is prefixed by - a `[persist] drain incomplete: N events still pending after 2.0s` - surface — the production version of that log would also flip an - SLO-breach metric. + `LlmUsageAccumulator`'s bucket for this invocation. The + `cached=N, X.X% hit` segment is the ratio of cache-read input + tokens to total prompt tokens for the invocation, sourced from + `usage.cached_tokens`. OpenAI's `gpt-4o-mini` (the default + model) reports zero cache hits unless `prompt_token_usage` cache + reporting is explicitly enabled; vLLM with + `--enable-prefix-caching` or Anthropic with prompt caching will + show real cache attribution against repeated prefixes. If the + drain times out (slow / hung observer), the persist line is + prefixed by a `[persist] drain incomplete: N events still + pending after 2.0s` surface, and the production version of that + log would also flip an SLO-breach metric. +- **`[persist] LLM failures: ...`**: emitted by the `persist` + node after reading the `LlmFailureTracker`'s bucket. On a + success-only invocation the line reads `none`; on a run with + retried provider errors it reads e.g. `provider_rate_limit=2, + provider_unavailable=1` with categories ordered noisiest-first. + Because every LLM call emits exactly one of `LlmCompletionEvent` + or `LlmFailedEvent` and never both, the bucket counts attempts + the success-side accumulator did NOT see, which is the right + shape for retry-rate dashboards. - **OTel spans block**: one line per captured span, sorted by start time. The relevant attributes shown are a curated subset for readability; the full attribute set is on each `Span` object - for any reader inspecting them programmatically. The - `openarmature.llm.complete` span name + the `gen_ai.usage.*` - attribute family come from the OTel observer's current - sentinel-`NodeEvent` handler — the OTel and Langfuse observers - have not yet migrated to consuming the typed `LlmCompletionEvent` - variant. Span names and attribute paths may shift when the - observer migration lands; the example's emitted span structure - tracks the current observer behavior. Note three attribute - families worth telling apart: + for any reader inspecting them programmatically. On runs against + a cache-reporting backend the LLM span also carries + `openarmature.llm.cache_read.input_tokens` (the OA-namespace + cache attribute). Note three attribute families worth telling + apart: - The root `openarmature.invocation` span carries `openarmature.graph.spec_version` plus the `openarmature.implementation.name` / `.version` attribution - attributes. These are invocation-span-only (per spec §5.1) — - operators filtering by library version use these. + attributes. These are invocation-span-only; operators + filtering by library version use these. - The `openarmature.user.*` attributes appear on every span, reflecting the cross-cutting propagation from `invoke(metadata=...)`. diff --git a/examples/production-observability/main.py b/examples/production-observability/main.py index 2e904c7..8f82efc 100644 --- a/examples/production-observability/main.py +++ b/examples/production-observability/main.py @@ -13,15 +13,15 @@ **Demonstrates (mapped to shipped features):** -- Dual observers on one graph (proposal 0031 + the no-double-export - posture from the README pitch). Both consume the same NodeEvent - stream independently; nothing in node code knows there are two. +- Dual observers on one graph, with no double-export between them. + Both consume the same NodeEvent stream independently; nothing in + node code knows there are two. - ``trace_input_from_state`` / ``trace_output_from_state`` caller - hooks on ``LangfuseObserver`` (proposal 0043 §8.4.1). The hooks - derive domain dicts (``{"question": ...}`` / ``{"answer": ..., - "model": ...}``) instead of letting the observer dump the raw - State. Production teams use this to keep PII out of trace - payloads while still surfacing operational signal. + hooks on ``LangfuseObserver``. The hooks derive domain dicts + (``{"question": ...}`` / ``{"answer": ..., "model": ...}``) + instead of letting the observer dump the raw State. Production + teams use this to keep PII out of trace payloads while still + surfacing operational signal. - Built-in ``TimingMiddleware`` from ``openarmature.graph`` wrapping the respond node. An ``on_complete`` callback receives a ``TimingRecord(node_name, duration_ms, outcome, @@ -43,7 +43,9 @@ observer call surface doesn't change. - **Queryable accumulator observer + per-invocation drain.** A third observer (``LlmUsageAccumulator``) rolls up LLM token - totals per invocation. A terminal ``persist`` node calls + totals per invocation, including a cache-hit ratio from + ``usage.cached_tokens`` for backends with prefix caching (vLLM, + Anthropic, etc.). A terminal ``persist`` node calls ``await graph.drain_events_for(current_invocation_id(), timeout=2.0)`` to synchronize on the deliver loop, then reads the accumulator's bucket and drops it. Without the drain, the bucket would be @@ -56,6 +58,18 @@ ``Observer`` itself stays a single-callable protocol; the queryable accumulator just exposes its own read methods (``get_bucket`` / ``drop``) that the persist node knows about. +- **Failure-category tracker observer.** A fourth observer + (``LlmFailureTracker``) subscribes to ``LlmFailedEvent``, the + typed failure-side counterpart to ``LlmCompletionEvent``, fired + once per LLM call that fails with a provider error category. The + tracker maintains a per-invocation ``{category: count}`` bucket; + the persist node reads + reports it alongside the usage rollup. + Together the two observers give operators success/failure + attribution at request scope without joining against external + trace storage. The provider emits exactly one of + (``LlmCompletionEvent``, ``LlmFailedEvent``) per LLM call, never + both, so attempt counts derive cleanly as + ``usage.call_count + sum(failure.by_category.values())``. Complementary to the observer-hooks example (three observers side-by-side) and the langfuse-observability example (Langfuse @@ -101,6 +115,7 @@ GraphBuilder, InvocationCompletedEvent, LlmCompletionEvent, + LlmFailedEvent, NodeException, ObserverEvent, State, @@ -164,30 +179,30 @@ class BriefingState(State): # for accumulators. # # The accumulator subscribes to every event but only records the -# typed ``LlmCompletionEvent`` variant — one event per successful LLM -# call, structured outcome fields read directly off the event without -# the namespace-string-match + payload-narrow dance the legacy -# sentinel pattern needed. The provider also dual-emits a sentinel -# ``NodeEvent`` pair during the transition period for backwards -# compatibility with older accumulators; this accumulator ignores -# the sentinel pair because the typed event carries the same outcome -# data without the pair-join logic. New accumulators should follow -# the isinstance-based filter shape here; the CHANGELOG tracks when -# the sentinel emission is removed. +# typed ``LlmCompletionEvent`` variant (one event per successful LLM +# call), structured outcome fields read directly off the event without +# the namespace-string-match + payload-narrow dance older sentinel- +# based filters needed. # -# Per-invocation isolation is by ``LlmCompletionEvent.invocation_id`` -# — read directly off the event, no ContextVar lookup needed. +# Per-invocation isolation is by ``LlmCompletionEvent.invocation_id``, +# read directly off the event, no ContextVar lookup needed. # Concurrent invocations on one observer each get their own bucket. # -# ``LlmCompletionEvent`` is success-only by spec design. Failed LLM -# calls flow through the exception path and do NOT emit the typed -# event, so ``bucket.call_count`` here reflects successful calls -# only. This is the right semantic for a usage accumulator (failed -# calls produce no tokens / cost). A pipeline tracking attempt-level -# failure rates needs a separate listener — either a custom observer -# on the sentinel ``NodeEvent`` pair, or a future -# ``LlmCallFailedEvent`` typed variant if and when that proposal -# lands. +# Cache-stat tracking: the bucket also rolls up ``usage.cached_tokens`` +# when the provider reports it. Backends with prefix caching (vLLM +# ``--enable-prefix-caching``, Anthropic prompt caching, OpenAI's +# ``prompt_token_usage`` cache report when enabled) populate the +# field; backends without cache visibility leave it ``None`` and the +# accumulator records zero. The persist node prints a cache-hit +# ratio so operators see whether prefix caching is paying off at +# request scope without having to cross-join Langfuse rows. +# +# ``LlmCompletionEvent`` is a success-only event. Failed LLM calls +# flow through the exception path and emit the parallel +# ``LlmFailedEvent`` variant (see ``LlmFailureTracker`` below), so +# ``bucket.call_count`` here reflects successful calls only. This is +# the right semantic for a usage accumulator (failed calls produce +# no tokens / cost). @dataclass @@ -195,6 +210,7 @@ class _UsageBucket: prompt_tokens: int = 0 completion_tokens: int = 0 total_tokens: int = 0 + cached_tokens: int = 0 call_count: int = 0 @@ -220,21 +236,21 @@ async def __call__(self, event: ObserverEvent) -> None: if not isinstance(event, LlmCompletionEvent): return # call_count tracks successful LLM calls (the typed event is - # success-only by spec design). Spec contract has "call - # happened" and "usage reported" as INDEPENDENT — a provider - # may legitimately omit usage on a successful call. Create the - # bucket and increment call_count unconditionally so the - # counter reflects all successful calls; gate only the - # token-counting math on usage being populated. + # emitted on success only). "Call happened" and "usage + # reported" are independent; a provider may legitimately omit + # usage on a successful call. Create the bucket and increment + # call_count unconditionally so the counter reflects all + # successful calls; gate only the token-counting math on usage + # being populated. bucket = self._by_invocation.setdefault(event.invocation_id, _UsageBucket()) bucket.call_count += 1 - # The typed event's usage field is nullable per the spec - # contract ("may be null when the provider does not report - # usage"). Python's provider always passes a Usage instance - # (with all-None fields when not reported), but the defensive - # guard keeps the accumulator robust against future providers - # that exercise the null option. Calls without reported usage - # contribute zero tokens (the only honest value we can record). + # The typed event's usage field is nullable (it may be ``None`` + # when the provider does not report usage). Python's provider + # always passes a Usage instance (with all-None fields when not + # reported), but the defensive guard keeps the accumulator + # robust against future providers that exercise the null + # option. Calls without reported usage contribute zero tokens + # (the only honest value we can record). usage = event.usage if usage is None: return @@ -251,9 +267,16 @@ async def __call__(self, event: ObserverEvent) -> None: bucket.total_tokens += usage.total_tokens elif usage.prompt_tokens is not None or usage.completion_tokens is not None: bucket.total_tokens += (usage.prompt_tokens or 0) + (usage.completion_tokens or 0) + # Cache-stat fields are populated only when the provider + # reports them. Backends without prefix-cache visibility + # leave ``cached_tokens`` ``None`` and the bucket records + # zero; the persist node's cache-hit ratio degrades to 0% + # gracefully in that case. + if usage.cached_tokens is not None: + bucket.cached_tokens += usage.cached_tokens # Consumers MUST synchronize on ``drain_events_for`` before - # calling ``get_bucket`` if completeness matters — without the + # calling ``get_bucket`` if completeness matters; without the # drain the deliver loop may still hold pending events whose # tokens have not been added yet. ``None`` is returned when # nothing has been recorded yet (e.g., an invocation with no @@ -263,7 +286,7 @@ def get_bucket(self, invocation_id: str) -> _UsageBucket | None: return self._by_invocation.get(invocation_id) # Bucket lifecycle is two-step. Fast path: a terminal node calls - # ``drop()`` immediately after reading via ``get_bucket()`` — + # ``drop()`` immediately after reading via ``get_bucket()``; # that's the normal case and runs while the invocation is still # active. Backstop: the accumulator's ``__call__`` also drops # any bucket still present when ``InvocationCompletedEvent`` @@ -276,21 +299,91 @@ def drop(self, invocation_id: str) -> None: self._by_invocation.pop(invocation_id, None) +# --------------------------------------------------------------------------- +# Failure tracker (per-invocation LLM error-category rollup) +# --------------------------------------------------------------------------- +# Parallel queryable observer for the failure path. Subscribes to +# ``LlmFailedEvent``, the typed counterpart to ``LlmCompletionEvent``, +# fired exactly once for every LLM call that fails with a provider +# error category. Every LLM call emits either one +# ``LlmCompletionEvent`` (success) or one ``LlmFailedEvent`` +# (failure), never both, so the tracker can count attempt-level +# failures by category without joining against the success-side +# accumulator. +# +# This is the listener the success-side accumulator delegates to: +# ``LlmUsageAccumulator.bucket.call_count`` counts successful calls +# only. Operators wanting attempt-level failure rates (e.g. ``how +# often did this tenant's calls land on a rate-limited provider this +# hour?``) read the failure tracker's bucket alongside the usage +# accumulator's bucket and compute the ratio at request scope. +# +# Bucket shape is a per-category counter: ``{"provider_rate_limit": 2, +# "provider_unavailable": 1, ...}``. ``error_category`` is one of the +# nine canonical category strings, so the dict keys form a stable, +# greppable vocabulary across providers. ``error_type`` (vendor- +# specific code) and ``error_message`` are NOT recorded here; the +# tracker's job is rate / category attribution at request scope, not +# vendor-error forensics; that lives in the OTel + Langfuse spans +# where the full exception detail is captured. + + +@dataclass +class _FailureBucket: + # ``dict[category, count]`` keyed by the canonical + # ``error_category`` strings. Total attempts can be derived as + # ``sum(by_category.values())``; the tracker doesn't carry a + # separate counter for it. + by_category: dict[str, int] + + @classmethod + def empty(cls) -> _FailureBucket: + return cls(by_category={}) + + +class LlmFailureTracker: + """Per-invocation LLM failure-category rollup.""" + + def __init__(self) -> None: + self._by_invocation: dict[str, _FailureBucket] = {} + + async def __call__(self, event: ObserverEvent) -> None: + # Backstop cleanup mirrors LlmUsageAccumulator's pattern so + # late-delivered failure events after a partial drain + # cannot leak a bucket. + if isinstance(event, InvocationCompletedEvent): + self._by_invocation.pop(event.invocation_id, None) + return + if not isinstance(event, LlmFailedEvent): + return + bucket = self._by_invocation.setdefault(event.invocation_id, _FailureBucket.empty()) + bucket.by_category[event.error_category] = bucket.by_category.get(event.error_category, 0) + 1 + + def get_bucket(self, invocation_id: str) -> _FailureBucket | None: + """Read the accumulated failure bucket for an invocation.""" + return self._by_invocation.get(invocation_id) + + def drop(self, invocation_id: str) -> None: + """Release the failure bucket for an invocation.""" + self._by_invocation.pop(invocation_id, None) + + # Module-level singletons make the persist node closure-free and # match how ``_provider_instance`` is handled. In an application # server, these would live on a request-scoped or app-scoped # container instead. _accumulator: LlmUsageAccumulator | None = None +_failure_tracker: LlmFailureTracker | None = None _compiled_graph: CompiledGraph[BriefingState] | None = None # --------------------------------------------------------------------------- # Caller hooks for Langfuse trace.input / trace.output # --------------------------------------------------------------------------- -# Per proposal 0043 §8.4.1, the LangfuseObserver lets callers derive -# domain-shaped trace.input and trace.output from State rather than -# letting the framework dump the raw State object. The hooks fire -# once per invocation: trace_input_from_state on InvocationStartedEvent, +# The LangfuseObserver lets callers derive domain-shaped trace.input +# and trace.output from State rather than letting the framework dump +# the raw State object. The hooks fire once per invocation: +# trace_input_from_state on InvocationStartedEvent, # trace_output_from_state on InvocationCompletedEvent. Production # teams use this to keep PII out of trace payloads while still # surfacing the operational signal a Langfuse UI viewer needs. @@ -354,19 +447,19 @@ async def respond(state: BriefingState) -> dict[str, Any]: } -# Terminal node. State is intentionally unused — this node's job is +# Terminal node. State is intentionally unused: this node's job is # to synchronize on the observer deliver loop and report a derived # rollup, not to read or modify pipeline state. # # ``drain_events_for`` blocks until every event dispatched up to this # point has reached every attached observer. Without it the # accumulator's bucket may still be missing the most-recent LLM -# event's tokens — the deliver loop hasn't processed them yet when -# the node body runs. Snapshot semantic: the drain awaits only +# event's tokens, since the deliver loop hasn't processed them yet +# when the node body runs. Snapshot semantic: the drain awaits only # events dispatched BEFORE the call (this node's own ``started`` # event included), not events that fire after the call begins # (notably this node's own ``completed`` event, which only fires -# after the body returns — that's how the call avoids deadlocking +# after the body returns; that's how the call avoids deadlocking # on itself). # # Default timeout is 5.0s; the demo tightens to 2.0s so a stuck @@ -375,15 +468,15 @@ async def respond(state: BriefingState) -> dict[str, Any]: # lets the caller record an SLO breach and proceed with whatever # data is available, rather than failing the whole invocation. async def persist(_state: BriefingState) -> dict[str, Any]: - """Drain the deliver loop, read the LLM-usage rollup, drop the bucket.""" + """Drain the deliver loop, read the LLM-usage + failure rollups, drop the buckets.""" # Use explicit RuntimeError rather than ``assert`` so the failure # mode stays informative under ``python -O`` (which strips asserts # and would otherwise turn these into silent ``None`` dereferences # at the next attribute access). - if _compiled_graph is None or _accumulator is None: + if _compiled_graph is None or _accumulator is None or _failure_tracker is None: raise RuntimeError( - "persist node requires _compiled_graph and _accumulator to be set " - "before invoke() — see build_graph() for the initialization pattern" + "persist node requires _compiled_graph, _accumulator, and _failure_tracker " + "to be set before invoke(); see build_graph() for the initialization pattern" ) invocation_id = current_invocation_id() if invocation_id is None: @@ -394,21 +487,42 @@ async def persist(_state: BriefingState) -> dict[str, Any]: # gap inline so a reader sees what an incomplete drain looks # like. print(f"[persist] drain incomplete: {summary.undelivered_count} events still pending after 2.0s") - bucket = _accumulator.get_bucket(invocation_id) + # Read both buckets up front so the drop calls run in pairs and + # neither bucket leaks if a later print raises. + usage_bucket = _accumulator.get_bucket(invocation_id) + failure_bucket = _failure_tracker.get_bucket(invocation_id) _accumulator.drop(invocation_id) - if bucket is None: - print("[persist] no LLM usage recorded for this invocation") - return {} + _failure_tracker.drop(invocation_id) # In production, this is where you'd write the canonical # invocation artifact to durable storage: a JSON record with the - # answer + per-invocation token cost + caller metadata + trace - # IDs for cross-system join. The demo prints the rollup so the - # pattern is legible. - print( - f"[persist] LLM usage: prompt={bucket.prompt_tokens}, " - f"completion={bucket.completion_tokens}, total={bucket.total_tokens} " - f"across {bucket.call_count} call(s)" - ) + # answer + per-invocation token cost + cache-hit ratio + failure + # category counts + caller metadata + trace IDs for cross-system + # join. The demo prints the rollups so the pattern is legible. + if usage_bucket is None: + print("[persist] no LLM usage recorded for this invocation") + else: + # Cache-hit ratio is ``cached_tokens / prompt_tokens`` when the + # provider reports cache stats. Backends without cache + # visibility report ``cached_tokens=0``; the ratio degrades + # to 0% gracefully without special-casing. + if usage_bucket.prompt_tokens > 0: + cache_hit_pct = (usage_bucket.cached_tokens / usage_bucket.prompt_tokens) * 100 + else: + cache_hit_pct = 0.0 + print( + f"[persist] LLM usage: prompt={usage_bucket.prompt_tokens} " + f"(cached={usage_bucket.cached_tokens}, {cache_hit_pct:.1f}% hit), " + f"completion={usage_bucket.completion_tokens}, " + f"total={usage_bucket.total_tokens} " + f"across {usage_bucket.call_count} call(s)" + ) + if failure_bucket is None or not failure_bucket.by_category: + print("[persist] LLM failures: none") + else: + # Sort by count descending so the noisiest category leads. + ordered = sorted(failure_bucket.by_category.items(), key=lambda kv: (-kv[1], kv[0])) + summary_str = ", ".join(f"{cat}={n}" for cat, n in ordered) + print(f"[persist] LLM failures: {summary_str}") return {} @@ -428,18 +542,20 @@ def build_graph() -> CompiledGraph[BriefingState]: (RetryMiddleware lives in the fan-out-with-retry / parallel- branches examples; this one's scope is observability). - The ``LlmUsageAccumulator`` is constructed, attached to the - compiled graph, and registered to the module-level singletons - so the persist node (which reads from globals to stay closure- - free) can reach it without help from the caller. The factory - is self-contained — ``graph = build_graph(); await graph.invoke(...)`` - works on its own. OTel + Langfuse observers are NOT attached - here; those are observability-stack choices made at the call - site, and ``main()`` attaches them after build_graph() returns. + The ``LlmUsageAccumulator`` and ``LlmFailureTracker`` are + constructed, attached to the compiled graph, and registered to + the module-level singletons so the persist node (which reads + from globals to stay closure-free) can reach them without help + from the caller. The factory is self-contained: ``graph = + build_graph(); await graph.invoke(...)`` works on its own. OTel + + Langfuse observers are NOT attached here; those are + observability-stack choices made at the call site, and + ``main()`` attaches them after build_graph() returns. """ - global _accumulator, _compiled_graph + global _accumulator, _failure_tracker, _compiled_graph timing = TimingMiddleware(node_name="respond", on_complete=_emit_timing) _accumulator = LlmUsageAccumulator() + _failure_tracker = LlmFailureTracker() _compiled_graph = ( GraphBuilder(BriefingState) .add_node("respond", respond, middleware=[timing]) @@ -450,6 +566,7 @@ def build_graph() -> CompiledGraph[BriefingState]: .compile() ) _compiled_graph.attach_observer(_accumulator) + _compiled_graph.attach_observer(_failure_tracker) return _compiled_graph @@ -467,7 +584,8 @@ def build_graph() -> CompiledGraph[BriefingState]: # Caller hooks attach to LangfuseObserver via constructor kwargs. # ``disable_llm_payload=False`` opts in to capturing the input # messages + output content on Generation observations so the demo -# output is meaningful; default-True is the spec privacy posture. +# output is meaningful; the default-True is the privacy-preserving +# setting. def _build_otel_observer(exporter: InMemorySpanExporter) -> OTelObserver: @@ -505,10 +623,10 @@ def _build_langfuse_observer(client: InMemoryLangfuseClient) -> LangfuseObserver # have ingested. -# Invocation-span-only attributes (spec 5.1). Surface these only on -# the root ``openarmature.invocation`` span line; inner spans don't -# carry them (they're invocation-level constants, not cross-cutting -# 5.6 attributes). +# Invocation-level attributes. Surface these only on the root +# ``openarmature.invocation`` span line; inner spans don't carry them +# (they're invocation-level constants, not cross-cutting per-node +# attributes). _INVOCATION_SPAN_KEYS = ( "openarmature.graph.entry_node", "openarmature.graph.spec_version", @@ -516,10 +634,21 @@ def _build_langfuse_observer(client: InMemoryLangfuseClient) -> LangfuseObserver "openarmature.implementation.version", ) -# Per-node + cross-cutting attributes (5.6 + GenAI semconv). Surface +# Per-node + cross-cutting attributes. The ``gen_ai.*`` family +# follows the OpenTelemetry GenAI semantic conventions. Surface # these on inner-node spans only; they propagate to the invocation # span too but showing them there is redundant once they appear on # every node line below. +# +# ``openarmature.llm.cache_read.input_tokens`` is the OA-namespace +# cache-stat attribute. Lands on the LLM span only and only when the +# provider reports cache hits. Backends with prefix caching (vLLM +# ``--enable-prefix-caching``, Anthropic prompt caching, OpenAI's +# ``prompt_token_usage`` cache report when enabled) populate it; +# OpenAI's default ``gpt-4o-mini`` configuration leaves it absent. +# The formatter omits the entry when absent rather than showing +# ``None`` so a reader instantly sees whether prefix caching is +# paying off in the observed run. _INNER_SPAN_KEYS = ( "openarmature.node.name", "openarmature.user.tenantId", @@ -528,6 +657,7 @@ def _build_langfuse_observer(client: InMemoryLangfuseClient) -> LangfuseObserver "gen_ai.system", "gen_ai.usage.input_tokens", "gen_ai.usage.output_tokens", + "openarmature.llm.cache_read.input_tokens", ) @@ -536,11 +666,11 @@ def _format_otel_spans(spans: list[ReadableSpan]) -> str: The ``openarmature.invocation`` root span closes on observer ``shutdown()`` and surfaces only its invocation-level - attributes (spec 5.1 — entry_node, spec_version, implementation - name + version). Inner-node spans surface the cross-cutting - caller metadata + GenAI semconv attributes; printing them on - the invocation line too would just repeat data shown three - more times below. + attributes (entry_node, spec_version, implementation name + + version). Inner-node spans surface the cross-cutting caller + metadata + ``gen_ai.*`` attributes; printing them on the + invocation line too would just repeat data shown three more + times below. """ if not spans: return " (no spans captured)" @@ -564,8 +694,8 @@ def _format_langfuse_trace(trace: LangfuseTrace) -> str: Mirrors what the Langfuse production UI renders for the same invocation: trace.input / trace.output (sourced via the caller - hooks), top-level metadata (caller-supplied + spec keys), and - the Observation tree underneath. + hooks), top-level metadata (caller-supplied + framework-reserved + keys), and the Observation tree underneath. """ lines: list[str] = [] lines.append(f"Trace id={trace.id}") @@ -615,7 +745,7 @@ async def main() -> None: # observability-stack observers on top. graph = build_graph() # Keep the OTel observer reachable so we can ``shutdown()`` it - # after drain — the root ``openarmature.invocation`` span only + # after drain; the root ``openarmature.invocation`` span only # closes on shutdown, and the in-memory exporter only surfaces # closed spans through ``get_finished_spans()``. Production # deployments do the same dance at process exit. From f539f3771a3d968515f426110df2e6981e1454f8 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 9 Jun 2026 19:18:00 -0700 Subject: [PATCH 5/7] Drop spec and proposal references from examples Example comments and docstrings quoted proposal numbers and spec section refs that have no meaning to end users reading the code. Reword them to describe only the implementation behavior. --- examples/chat-with-multimodal/main.py | 4 ++-- examples/tool-use/main.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/chat-with-multimodal/main.py b/examples/chat-with-multimodal/main.py index e49d18a..1dbdaae 100644 --- a/examples/chat-with-multimodal/main.py +++ b/examples/chat-with-multimodal/main.py @@ -8,8 +8,8 @@ chat-history shape. Turns 1, 2, 4 are text-only. **Demonstrates:** ChatPrompt + ContentSegment (system + user) + -PlaceholderSegment for chat-history injection (proposal 0046, -spec v0.38.0). PromptManager.render with the `placeholders` kwarg. +PlaceholderSegment for chat-history injection. PromptManager.render +with the `placeholders` kwarg. Multi-turn message threading through state with the `append` reducer; the conversation history grows over turns and feeds back into render() on each turn. The same chat template carries an diff --git a/examples/tool-use/main.py b/examples/tool-use/main.py index 01ed273..72f26ea 100644 --- a/examples/tool-use/main.py +++ b/examples/tool-use/main.py @@ -29,9 +29,9 @@ schema (or ``None`` only under ``finish_reason="error"``). - The dispatcher node parses each ``ToolCall``, runs the matching local Python function, and appends one - ``ToolMessage(content=..., tool_call_id=...)`` per call. Spec - requires the ``tool_call_id`` round-trip exactly so the model can - pair its requests with the responses. + ``ToolMessage(content=..., tool_call_id=...)`` per call. The + ``tool_call_id`` must round-trip exactly so the model can pair + its requests with the responses. - The loop is just a conditional edge on the graph: ``call_llm`` → ``dispatch_tools`` → back to ``call_llm`` when the model wants more tools, or → ``present`` when it's done. No special "agent From 77528dd41ae29a5463e297567bed053962b29510 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 9 Jun 2026 19:37:02 -0700 Subject: [PATCH 6/7] Add tests for production-observability observers The examples smoke test only proves the demo loads and its build_graph() compiles. Cover the two queryable observers the production-observability example ships: cache-token accumulation and the derived cache-hit ratio, failure-category counting, mutual exclusion between the success and failure events, the per-invocation bucket cleanup, and the OTel cache-read attribute. The persist-output check drives the real persist node offline. --- ...t_production_observability_accumulators.py | 256 ++++++++++++++++++ 1 file changed, 256 insertions(+) create mode 100644 tests/test_production_observability_accumulators.py diff --git a/tests/test_production_observability_accumulators.py b/tests/test_production_observability_accumulators.py new file mode 100644 index 0000000..f7035f3 --- /dev/null +++ b/tests/test_production_observability_accumulators.py @@ -0,0 +1,256 @@ +"""Behavioral regression tests for the production-observability example's +queryable observers. + +``test_examples_smoke.py`` only proves the example loads and its +``build_graph()`` compiles. This file goes one level deeper into the two +queryable-observer classes the example ships, locking the logic a +happy-path live run never reaches: + +- ``LlmUsageAccumulator`` accumulating ``usage.cached_tokens`` and the + cache-hit ratio the persist node derives from it (a real OpenAI run + reports zero cached tokens, so the ratio is always 0.0% there). +- ``LlmFailureTracker`` counting failures by category (a successful run + produces an empty bucket and prints "none"). +- Mutual exclusion between the success and failure events: each observer + ignores the other's event type, so the two never double-count. +- The per-invocation bucket cleanup on ``InvocationCompletedEvent``. +- The OTel formatter surfacing the cache-read span attribute. + +The classes live in the example module, so the example is loaded with +``runpy.run_path`` (matching the smoke test). ``runpy`` returns a copy of +the executed namespace; the module's own functions hold the live dict via +``__globals__``, and ``build_graph()`` / ``persist()`` read and write +module-level singletons there, so the fixture hands back that live +namespace rather than the returned copy. +""" + +from __future__ import annotations + +import runpy +from pathlib import Path +from types import SimpleNamespace +from typing import Any + +import pytest + +# The example imports opentelemetry-sdk and langfuse record types at module +# top; skip cleanly when the extras aren't installed. +pytest.importorskip("opentelemetry.sdk.trace") +pytest.importorskip("langfuse") + +from openarmature.graph import ( # noqa: E402 + InvocationCompletedEvent, + LlmCompletionEvent, + LlmFailedEvent, +) +from openarmature.llm import Usage # noqa: E402 +from openarmature.observability.correlation import ( # noqa: E402 + _reset_invocation_id, + _set_invocation_id, +) + +EXAMPLES_DIR = Path(__file__).parent.parent / "examples" + + +@pytest.fixture +def example_ns() -> dict[str, Any]: + """Load the production-observability example and return its live + module namespace (the dict the module's functions actually read and + write, not ``runpy``'s returned copy).""" + main_py = EXAMPLES_DIR / "production-observability" / "main.py" + # A fresh run_path each call keeps tests isolated: build_graph() + # mutates module-level singletons, and a fresh namespace per test + # means those mutations don't leak. Reach the live namespace through + # a function's __globals__ rather than runpy's returned copy. + returned = runpy.run_path(str(main_py), run_name="__not_main__") + return returned["build_graph"].__globals__ + + +def _usage( + *, + prompt: int | None = None, + completion: int | None = None, + total: int | None = None, + cached: int | None = None, +) -> Usage: + return Usage( + prompt_tokens=prompt, + completion_tokens=completion, + total_tokens=total, + cached_tokens=cached, + ) + + +def _completion(invocation_id: str, usage: Usage) -> LlmCompletionEvent: + return LlmCompletionEvent( + invocation_id=invocation_id, + correlation_id="corr", + node_name="respond", + namespace=("respond",), + attempt_index=0, + fan_out_index=None, + branch_name=None, + provider="openai", + model="gpt-4o-mini", + response_id="resp", + response_model="gpt-4o-mini-2024-07-18", + usage=usage, + latency_ms=12.3, + finish_reason="stop", + input_messages=[], + output_content="ok", + request_params={}, + request_extras={}, + active_prompt=None, + active_prompt_group=None, + call_id="call", + ) + + +def _failure(invocation_id: str, category: str) -> LlmFailedEvent: + return LlmFailedEvent( + invocation_id=invocation_id, + correlation_id="corr", + node_name="respond", + namespace=("respond",), + attempt_index=0, + fan_out_index=None, + branch_name=None, + provider="openai", + model="gpt-4o-mini", + latency_ms=5.0, + input_messages=[], + request_params={}, + request_extras={}, + active_prompt=None, + active_prompt_group=None, + call_id="call", + error_category=category, + error_message="boom", + ) + + +def _completed(invocation_id: str) -> InvocationCompletedEvent: + return InvocationCompletedEvent( + final_state=None, + status="completed", + final_node="persist", + invocation_id=invocation_id, + correlation_id="corr", + ) + + +# --- LlmUsageAccumulator ------------------------------------------------- + + +async def test_usage_accumulator_accumulates_cache_tokens(example_ns: dict[str, Any]) -> None: + acc = example_ns["LlmUsageAccumulator"]() + await acc(_completion("inv", _usage(prompt=100, completion=40, total=140, cached=30))) + await acc(_completion("inv", _usage(prompt=50, completion=20, total=60, cached=10))) + bucket = acc.get_bucket("inv") + assert ( + bucket.prompt_tokens, + bucket.completion_tokens, + bucket.total_tokens, + bucket.cached_tokens, + bucket.call_count, + ) == (150, 60, 200, 40, 2) + + +async def test_usage_accumulator_tolerates_null_cache(example_ns: dict[str, Any]) -> None: + acc = example_ns["LlmUsageAccumulator"]() + await acc(_completion("inv", _usage(prompt=10, completion=5, total=15, cached=None))) + assert acc.get_bucket("inv").cached_tokens == 0 + + +async def test_usage_accumulator_ignores_failure_event(example_ns: dict[str, Any]) -> None: + acc = example_ns["LlmUsageAccumulator"]() + await acc(_failure("inv", "provider_rate_limit")) + assert acc.get_bucket("inv") is None + + +async def test_usage_accumulator_drops_bucket_on_completion(example_ns: dict[str, Any]) -> None: + acc = example_ns["LlmUsageAccumulator"]() + await acc(_completion("inv", _usage(prompt=1, completion=1, total=2, cached=0))) + await acc(_completed("inv")) + assert acc.get_bucket("inv") is None + + +# --- LlmFailureTracker --------------------------------------------------- + + +async def test_failure_tracker_counts_by_category(example_ns: dict[str, Any]) -> None: + tracker = example_ns["LlmFailureTracker"]() + await tracker(_failure("inv", "provider_rate_limit")) + await tracker(_failure("inv", "provider_unavailable")) + await tracker(_failure("inv", "provider_rate_limit")) + assert dict(tracker.get_bucket("inv").by_category) == { + "provider_rate_limit": 2, + "provider_unavailable": 1, + } + + +async def test_failure_tracker_ignores_completion_event(example_ns: dict[str, Any]) -> None: + tracker = example_ns["LlmFailureTracker"]() + await tracker(_completion("inv", _usage(prompt=1, completion=1, total=2, cached=0))) + assert tracker.get_bucket("inv") is None + + +async def test_failure_tracker_drops_bucket_on_completion(example_ns: dict[str, Any]) -> None: + tracker = example_ns["LlmFailureTracker"]() + await tracker(_failure("inv", "provider_rate_limit")) + await tracker(_completed("inv")) + assert tracker.get_bucket("inv") is None + + +# --- persist node output ------------------------------------------------- + + +async def test_persist_reports_cache_ratio_and_failure_breakdown( + example_ns: dict[str, Any], capsys: pytest.CaptureFixture[str] +) -> None: + # build_graph() compiles the graph and installs the two accumulators + # as module-level singletons that persist() reads. drain_events_for + # returns an empty summary for an invocation with no active worker, + # so persist() runs to completion offline without a live invoke(). + example_ns["build_graph"]() + acc = example_ns["_accumulator"] + tracker = example_ns["_failure_tracker"] + inv = "inv-persist" + await acc(_completion(inv, _usage(prompt=100, completion=40, total=140, cached=30))) + await tracker(_failure(inv, "provider_rate_limit")) + await tracker(_failure(inv, "provider_unavailable")) + await tracker(_failure(inv, "provider_rate_limit")) + + state = example_ns["BriefingState"](question="q") + token = _set_invocation_id(inv) + try: + await example_ns["persist"](state) + finally: + _reset_invocation_id(token) + + out = capsys.readouterr().out + assert ( + "[persist] LLM usage: prompt=100 (cached=30, 30.0% hit), " + "completion=40, total=140 across 1 call(s)" in out + ) + assert "[persist] LLM failures: provider_rate_limit=2, provider_unavailable=1" in out + + +# --- OTel span formatter ------------------------------------------------- + + +def test_otel_formatter_surfaces_cache_read_attribute(example_ns: dict[str, Any]) -> None: + # Stand-in for a ReadableSpan: the formatter reads name / attributes / + # start_time / end_time only. + span = SimpleNamespace( + name="openarmature.llm.complete", + attributes={ + "gen_ai.system": "openai", + "openarmature.llm.cache_read.input_tokens": 12, + }, + start_time=0, + end_time=1_000_000, + ) + rendered = example_ns["_format_otel_spans"]([span]) + assert "openarmature.llm.cache_read.input_tokens=12" in rendered From 20163de73cd103433bd8475cc52e4753f63665f4 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 9 Jun 2026 19:45:21 -0700 Subject: [PATCH 7/7] Guard legacy LLM observer snippet with NodeEvent check The legacy sentinel-namespace observer example accessed event.namespace / event.pre_state without narrowing to NodeEvent. A real observer receives the full ObserverEvent union, where variants like InvocationCompletedEvent have no namespace, so the snippet would raise AttributeError. Add an isinstance(event, NodeEvent) guard so the copy-paste example is correct. --- docs/concepts/observability.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index ba50fe9..1974993 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -890,9 +890,12 @@ with custom providers still using the sentinel pattern, the legacy shape is: ```python +from openarmature.graph.events import NodeEvent from openarmature.observability import LLM_NAMESPACE, LlmEventPayload async def legacy_llm_observer(event): + if not isinstance(event, NodeEvent): + return if event.namespace != LLM_NAMESPACE: return payload = event.pre_state