diff --git a/CHANGELOG.md b/CHANGELOG.md index 613754c..e8c1d89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,22 +8,20 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ### Added -- **Implicit prefix-cache wire-byte stability** (proposal 0047, spec v0.39.0). The OpenAI Chat Completions wire body is now byte-stable across equivalent OA inputs — equivalent calls produce byte-identical request bodies regardless of dict insertion order at every user-supplied-dict boundary (tool definitions including the top-level `function` record + the `parameters` JSON Schema, `response_format.json_schema.schema`, `RuntimeConfig` extras, `tool_call.arguments` JSON encoding). A new `_canonicalize_dict_keys` helper recursively sorts dict keys at every nesting level while preserving caller-supplied array ordering (the spec's split between "object keys MUST be sorted" and "array order MUST be preserved per caller-supplied order"). A top-level belt-and-suspenders canonicalization pass over the assembled body catches anything the per-field passes miss. Combined with the existing `Response.usage.cached_tokens` / `cache_creation_tokens` fields sourced from `prompt_tokens_details` (v0.12.0) and the OTel observer's `openarmature.llm.cache_read.input_tokens` + `openarmature.llm.cache_creation.input_tokens` attributes (also v0.12.0), this closes proposal 0047 end-to-end. Prompt-management §13 *Cross-variable substring stability* is satisfied by the existing Jinja2 `StrictUndefined` render path; pinned by a new test. Scope is the Chat Completions endpoint only — the OpenAI Responses API endpoint and the Anthropic / Gemini wire-format mappings are deferred (the providers aren't implemented in python today). +- **Implicit prefix-cache wire-byte stability** (proposal 0047, spec v0.39.0). 
Closes proposal 0047 end-to-end across three pieces all landing in v0.13.0: (1) `Response.usage.cached_tokens` / `cache_creation_tokens` fields sourced from the OpenAI `prompt_tokens_details` payload (PR #136); (2) the OTel observer emits `openarmature.llm.cache_read.input_tokens` and optional `openarmature.llm.cache_creation.input_tokens` when the corresponding usage field is populated (PR #140); (3) the OpenAI Chat Completions wire body is now byte-stable across equivalent OA inputs — equivalent calls produce byte-identical request bodies regardless of dict insertion order at every user-supplied-dict boundary (tool definitions including the top-level `function` record + the `parameters` JSON Schema, `response_format.json_schema.schema`, `RuntimeConfig` extras, `tool_call.arguments` JSON encoding) via a new `_canonicalize_dict_keys` helper that recursively sorts dict keys at every nesting level while preserving caller-supplied array ordering, plus a top-level belt-and-suspenders canonicalization pass over the assembled body (PR #145). Prompt-management §13 *Cross-variable substring stability* is satisfied by the existing Jinja2 `StrictUndefined` render path; pinned by a new test. Scope is the Chat Completions endpoint only — the OpenAI Responses API endpoint and the Anthropic / Gemini wire-format mappings are deferred (the providers aren't implemented in python today). - **`LlmFailedEvent` typed event variant** (proposal 0058, spec v0.53.0). Carves LLM provider failures into a spec-normatively-typed event variant alongside `LlmCompletionEvent`. 17 mirrored identity / scoping / request-side fields + 3 failure-specific fields (`error_category` always-present from the llm-provider §7 normative category enumeration; optional `error_type` for vendor-specific detail or upstream exception class name; always-present `error_message`). 
`OpenAIProvider.complete()` emits the typed event alongside the §7 exception on both raise paths — adapter-caught provider exceptions AND pre-send validation raises. Caller-side exception flow unchanged; the exception still raises out of `complete()`. Mutually exclusive with `LlmCompletionEvent` on the same call. Both bundled observers (OTel + Langfuse) consume `LlmFailedEvent` directly: same `openarmature.llm.complete` span / Generation shape as the success path with ERROR status / level + `openarmature.error.category` attribute (OTel) / `error_category` as statusMessage (Langfuse), `start_time` back-dated by `latency_ms` so the failure duration reflects the time-to-raise. +- **`LlmCompletionEvent` extended with proposal 0057 request-side fields** (spec v0.51.0). The typed event now carries `input_messages`, `output_content`, `request_params`, `request_extras`, `active_prompt`, `active_prompt_group`, `call_id`, and `response_model` alongside the existing v0.49.0 fields. `request_id` renamed to `response_id` per the proposal's response-side naming. Inline image bytes in `input_messages` stay redacted per observability §5.5.5 — the OpenAI provider reuses the existing message-serialization helper for the projection. Observer-side privacy gates (OTel `disable_llm_payload`, Langfuse equivalents) apply at rendering, symmetric with the §5.5.1 span attribute path. ### Changed - **Sentinel-namespace `NodeEvent` emission for LLM events retired entirely from `OpenAIProvider`** (proposal 0058 cleanup). The provider no longer dispatches the `("openarmature.llm.complete",)`-namespaced `NodeEvent`s on either outcome path; both success and failure flow through their respective typed variants exclusively. The `_make_llm_event` helper is removed. 
External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` for success and `isinstance(event, LlmFailedEvent)` for failure to keep receiving LLM-call notifications. `LlmEventPayload` and `LLM_NAMESPACE` remain in `openarmature.observability.llm_event` as a documented compatibility surface for custom providers that haven't migrated; neither is referenced by the bundled provider or observers anymore. -- **Pinned spec advances from v0.51.0 to v0.53.0** (absorbs proposals 0023 + 0058). Proposal 0023 (canonical state reducers) ships in spec v0.52.0 but is not implemented this cycle — `conformance.toml` marks 0023 as `not-yet`; fixtures 034–038 stay parser-deferred. +- **Pinned spec advances from v0.46.0 to v0.53.0** across the v0.13.0 cycle. Absorbs four implemented proposals (0047 — implicit prefix-cache wire-byte stability; 0049 — typed `LlmCompletionEvent`; 0057 — `LlmCompletionEvent` request-side field-set extension; 0058 — typed `LlmFailedEvent`) plus 0023 (canonical state reducers, v0.52.0) carried as `not-yet` in the manifest. Pin journey: v0.46.0 → v0.51.0 (PR #141 absorbs 0057) → v0.53.0 (PR #144 absorbs 0058; spec v0.52.0's 0023 entry rides along as `not-yet`). Fixtures 034–038 (0023) stay parser-deferred. +- **`tool_call.arguments` JSON encoding now uses `sort_keys=True`** (proposal 0047 §8 byte-stability requirement for caller-supplied dicts JSON-encoded into a string field). Functionally equivalent — the encoded string parses to the same dict — but byte-different from the previous insertion-order encoding. Downstream consumers that snapshot wire bodies (golden-file tests, audit logging, recorded fixtures) will see byte-different `tool_calls[].function.arguments` strings across this upgrade for any call whose argument dict was emitted in non-sorted insertion order before. 
- **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. The §5.5 attribute set and §8.4 Generation metadata are unchanged. (Failure paths land on `LlmFailedEvent` later in the same cycle — see the proposal 0058 entry above.) - **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. (The failure-path sentinel emission is retired entirely later in the same cycle — see the proposal 0058 entry above.) - **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter handles v4 Langfuse SDK quirks transparently: `Langfuse.start_observation()` does NOT accept `start_time`, so back-dated generations are routed through the private `_otel_tracer.start_span(name=..., start_time=int_ns)` API (mirroring the SDK's own `create_event` precedent) and the resulting OTel span is wrapped in `LangfuseGeneration` directly; the non-back-dated path still uses `start_observation`. 
`LangfuseSpan.end()` is typed `Optional[int]` (nanoseconds), so the adapter converts the Protocol's `datetime` surface to int nanoseconds before forwarding. The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions. - **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips. -### Added - -- **`LlmCompletionEvent` extended with proposal 0057 request-side fields** (spec v0.51.0). The typed event now carries `input_messages`, `output_content`, `request_params`, `request_extras`, `active_prompt`, `active_prompt_group`, `call_id`, and `response_model` alongside the existing v0.49.0 fields. `request_id` renamed to `response_id` per the proposal's response-side naming. Inline image bytes in `input_messages` stay redacted per observability §5.5.5 — the OpenAI provider reuses the existing message-serialization helper for the projection. Observer-side privacy gates (OTel `disable_llm_payload`, Langfuse equivalents) apply at rendering, symmetric with the §5.5.1 span attribute path. - ## [0.12.0] — 2026-06-05 Observability release. The pinned spec advances from v0.38.0 to v0.46.0, absorbing eight accepted proposals (0047-0054). 
Three ship as fully implemented this cycle: proposal 0048 grows a read-symmetric `get_invocation_metadata()` API + a §9 *Queryable observer pattern* concept doc section; proposal 0052 puts `openarmature.implementation.name` + `.version` attribution attributes on every OTel invocation span + every Langfuse Trace; proposal 0054 ships `CompiledGraph.drain_events_for(invocation_id, *, timeout)` as the architectural pair to 0048's §9.4 accumulator lifecycle. Two ship as textual-only acks (0051 Langfuse trace I/O caveat; 0053 §3.4 shared-parent boundary clarification). One Fixed: the retry middleware now resets the invocation-metadata ContextVar between attempts per §3.4. The production-observability example grows the queryable accumulator + drain_events_for pattern end-to-end so the new APIs have a runnable demo. diff --git a/conformance.toml b/conformance.toml index 4d31243..587505c 100644 --- a/conformance.toml +++ b/conformance.toml @@ -266,33 +266,38 @@ status = "implemented" since = "0.11.0" # Spec v0.39.0 (proposal 0047). Implicit prefix-cache wire-byte -# stability. Cross-capability proposal landed in v0.13.0 across -# three pieces: (1) ``Response.usage`` cache-stat fields -# (``cached_tokens`` / ``cache_creation_tokens``) sourced from the -# OpenAI ``prompt_tokens_details`` payload, with conditional emission +# stability. 
Cross-capability proposal landed end-to-end in the +# v0.13.0 cycle across three pieces, all post-v0.12.0: +# (1) ``Response.usage`` cache-stat fields (``cached_tokens`` / +# ``cache_creation_tokens``) sourced from the OpenAI +# ``prompt_tokens_details`` payload, with conditional emission # preserved (absent-vs-zero distinction stays observable) — landed -# in the v0.12.0 cycle as the proposal's payload-side prerequisite; +# in PR #136 as the proposal's payload-side prerequisite; # (2) OTel observer emits ``openarmature.llm.cache_read.input_tokens`` # (and optional ``openarmature.llm.cache_creation.input_tokens``) -# when the corresponding usage field is populated — also v0.12.0; -# (3) §8.1 intra-impl wire-byte canonicalization in the OpenAI -# adapter — landed here. The canonicalizer recursively sorts dict -# keys at every nesting level while preserving caller-supplied -# array order, applied at the four user-input boundaries +# when the corresponding usage field is populated — landed in +# PR #140; (3) §8.1 intra-impl wire-byte canonicalization in the +# OpenAI adapter — landed in PR #145. The canonicalizer recursively +# sorts dict keys at every nesting level while preserving caller- +# supplied array order, applied at the four user-input boundaries # (``tool.parameters`` / ``tool.function`` record top-level per # spec Q5, ``response_format.json_schema.schema``, ``RuntimeConfig`` # extras, ``tool_call.arguments`` JSON encoding) plus a top-level -# belt-and-suspenders pass over the assembled request body. Scope -# is the Chat Completions endpoint only; the OpenAI Responses API -# endpoint is deferred to a future cycle (no python consumer -# today). Prompt-management §13 cross-variable substring stability -# is satisfied by the existing Jinja2 ``StrictUndefined`` render -# path; pinned by ``tests/unit/test_prompts.py:: +# belt-and-suspenders pass over the assembled request body. 
+# Downstream-observable wire-byte shift on +# ``tool_call.arguments``: the encoded string now uses +# ``sort_keys=True`` (functionally equivalent — parses to the same +# dict — but byte-different for golden-file / audit-snapshot +# consumers). Scope is the Chat Completions endpoint only; the +# OpenAI Responses API endpoint is deferred to a future cycle (no +# python consumer today). Prompt-management §13 cross-variable +# substring stability is satisfied by the existing Jinja2 +# ``StrictUndefined`` render path; pinned by +# ``tests/unit/test_prompts.py:: # test_cross_variable_substring_stability_text_prompt`` and # ``test_cross_variable_substring_stability_chat_prompt``. -# Anthropic / Gemini -# wire-byte conformance fixtures stay deferred — neither provider -# is implemented in python today. +# Anthropic / Gemini wire-byte conformance fixtures stay deferred +# — neither provider is implemented in python today. [proposals."0047"] status = "implemented" since = "0.13.0" @@ -344,16 +349,24 @@ status = "implemented" since = "0.12.0" # Spec v0.41.0 (proposal 0049). Typed LLM Completion Event — first -# typed event variant on the observer event union. Shipped in -# v0.13.0: provider dual-emits the typed event alongside the sentinel -# NodeEvent pair (success-only per spec scope); LlmCompletionEvent -# carries identity/scoping/outcome fields per the spec field table. -# Conformance fixtures 050-056 activated by the typed_event_collector -# harness directive. The OTel + Langfuse observers continue to drive -# their §5.5 / §8.4.4 surface off the sentinel NodeEvent pair during -# the dual-emit transition window; type-discrimination migration -# lands once the follow-on request-side-fields extension (proposal -# 0057) ships. +# typed event variant on the observer event union. 
Shipped fully in +# v0.13.0 across PRs #141 (typed-event definition + provider +# emission + 0057 field-set extension), #142 (OTel observer migration +# to type discrimination), #143 (Langfuse observer migration + +# success-side sentinel emission dropped), and #144 (0058 typed +# LlmFailedEvent + sentinel-namespace NodeEvent emission for LLM +# events retired entirely from the bundled OpenAIProvider). +# LlmCompletionEvent carries identity/scoping/outcome fields per +# the spec field table. Both bundled observers (OTel + Langfuse) +# consume the typed events via isinstance discrimination on both +# outcome paths. Conformance fixtures 050-056 activated by the +# typed_event_collector harness directive. Fixtures 057-068 +# (proposal 0057 request-side fields) and 069-073 (proposal 0058 +# typed failure event) stay parser-deferred pending the harness's +# typed_event_collector directive schema catch-up + the event_counts +# list directive introduced by fixture 071; behavior pinned by +# unit tests in tests/unit/test_llm_provider.py + +# test_observability_otel.py + test_observability_langfuse.py. [proposals."0049"] status = "implemented" since = "0.13.0" diff --git a/docs/agent/non-obvious-shapes.md b/docs/agent/non-obvious-shapes.md index f582c5c..b2bd6ec 100644 --- a/docs/agent/non-obvious-shapes.md +++ b/docs/agent/non-obvious-shapes.md @@ -127,7 +127,7 @@ Catching `Exception` works but is too broad; catching one hierarchy misses the o ### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes -OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). 
These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc., but a custom observer that only cares about user-defined node activity sees them as noise: +OA emits observer events under sentinel node-names for some internal dispatch: `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014) and `openarmature.checkpoint.save` for checkpoint saves (proposal 0010) ride on `NodeEvent` with a sentinel namespace. (LLM provider calls used to follow the same pattern but moved to typed `LlmCompletionEvent` / `LlmFailedEvent` variants in v0.13.0 per proposals 0049 + 0058 — those are filtered by `isinstance` instead.) The sentinel-namespace events let the OTel / Langfuse observers emit checkpoint-migrate spans, etc., but a custom observer that only cares about user-defined node activity sees them as noise: ```python async def __call__(self, event: NodeEvent) -> None: @@ -137,7 +137,7 @@ async def __call__(self, event: NodeEvent) -> None: # … user-node handling ``` -`event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None`: OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. +`event.namespace[0]` is the safest discriminator. Don't try to filter on `current_invocation_id() is None`: OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. 
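The two filtering styles described above (type discrimination for LLM events, a namespace-prefix check for the remaining sentinel events) can be combined in one routing function. The following is a minimal sketch, not the library's implementation: the three dataclasses are hypothetical stand-ins for the real `openarmature.graph.events` types so the snippet runs on its own, and the `"openarmature."` prefix test on `namespace[0]` is an assumption about how the sentinel namespaces are spelled.

```python
from dataclasses import dataclass


# Hypothetical stand-ins so the sketch runs without openarmature installed.
# The real classes carry many more fields; only `namespace` mirrors the
# attribute discussed in this section.
@dataclass
class NodeEvent:
    namespace: tuple


@dataclass
class LlmCompletionEvent:
    model: str


@dataclass
class LlmFailedEvent:
    error_category: str


def classify(event: object) -> str:
    """Return the bucket a custom observer would route this event to."""
    # Typed LLM variants are matched by type, never by namespace.
    if isinstance(event, LlmCompletionEvent):
        return "llm-success"
    if isinstance(event, LlmFailedEvent):
        return "llm-failure"
    if isinstance(event, NodeEvent):
        # Assumption: OA-internal sentinel events use a first namespace
        # segment that starts with "openarmature.".
        if event.namespace and event.namespace[0].startswith("openarmature."):
            return "internal"
        return "user-node"
    return "unknown"
```

An observer that only cares about user nodes would return early for every bucket except `"user-node"`; checking the typed variants first keeps the namespace test from ever running against an event that has no `namespace` attribute.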
### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field` diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 3b67740..1974993 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -767,7 +767,7 @@ informative for trace readers. Redaction is **not** gated by `disable_llm_payload` and is **not** configurable. Inline image bytes never leave the provider in event form, so custom observers consuming -[`LlmEventPayload`](#publishing-llm-events-for-custom-observers) +[`LlmCompletionEvent` / `LlmFailedEvent`](#consuming-llm-events-in-custom-observers) cannot accidentally leak raw bytes regardless of how they're written. @@ -834,33 +834,77 @@ OTel SDK silently drops `set_attribute` on ended spans. Exceptions raised by an enricher are caught and warned, never propagated. -### Publishing LLM events for custom observers +### Consuming LLM events in custom observers + +`openarmature.graph.events.LlmCompletionEvent` and +`openarmature.graph.events.LlmFailedEvent` are the two typed event +variants any `Provider` implementation emits around a `complete()` +call. Custom observers consume them via type discrimination: + +```python +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent + +async def my_llm_observer(event): + if isinstance(event, LlmCompletionEvent): + # Successful call. Read identity / scoping / outcome + # directly off the typed fields: + # event.model, event.input_messages (already image-redacted), + # event.output_content, event.request_params, event.response_id, + # event.active_prompt, event.usage, event.latency_ms, … + return + if isinstance(event, LlmFailedEvent): + # §7 category exception was raised. 
Same identity / scoping + # surface as the completion variant, plus three failure- + # specific fields: + # event.error_category — one of the 9 normative §7 categories + # event.error_type — vendor code or upstream class name + # event.error_message — human-readable, may be empty + return +``` + +The two variants are **mutually exclusive on a single `complete()` +call** — implementations MUST NOT emit both for the same call. +Conformance fixture 072 locks this down. The failure variant carries +the same identity + request-side fields as the completion variant, +minus the response-side fields (`response_id`, `response_model`, +`usage`, `output_content`, `finish_reason`) — there was no response +to record. + +A custom `Provider` that wants observers to see the same events +dispatches `LlmCompletionEvent(...)` on success and +`LlmFailedEvent(...)` alongside the §7 category exception on failure +via `current_dispatch()`. See +[Authoring providers](../model-providers/authoring.md) for the full +pattern. + +#### Legacy sentinel-namespace pattern (compatibility surface) `openarmature.observability.LLM_NAMESPACE` and -`openarmature.observability.LlmEventPayload` are part of the public -API. A custom observer subscribing to the dispatch stream can -recognize the LLM-event sentinel namespace and read the typed -payload directly: +`openarmature.observability.LlmEventPayload` remain in the public +API as a documented compatibility surface for custom providers and +observers that haven't migrated to typed events. The bundled +`OpenAIProvider` no longer emits the sentinel `NodeEvent` pair; the +bundled OTel and Langfuse observers no longer recognize it. 
If +you're writing a downstream observer that needs to interoperate +with custom providers still using the sentinel pattern, the legacy +shape is: ```python +from openarmature.graph.events import NodeEvent from openarmature.observability import LLM_NAMESPACE, LlmEventPayload -async def my_llm_observer(event): +async def legacy_llm_observer(event): + if not isinstance(event, NodeEvent): + return if event.namespace != LLM_NAMESPACE: return payload = event.pre_state if not isinstance(payload, LlmEventPayload): return - # payload.model, payload.input_messages (already image-redacted), - # payload.output_content, payload.request_params, - # payload.response_id, payload.active_prompt, ... + # payload.model, payload.input_messages, … ``` -A custom `Provider` that wants to participate in the same span -emission protocol dispatches `NodeEvent(namespace=LLM_NAMESPACE, -pre_state=LlmEventPayload(...))` via `current_dispatch()`. See -[Authoring providers](../model-providers/authoring.md) for the -full pattern. +New code should prefer the typed-event path above. ### Flushing under fast teardown diff --git a/docs/examples/production-observability.md b/docs/examples/production-observability.md index d4bf22d..26cb97f 100644 --- a/docs/examples/production-observability.md +++ b/docs/examples/production-observability.md @@ -9,17 +9,18 @@ graph, caller hooks deriving domain-shaped `trace.input` / `trace.output` from State, the built-in `TimingMiddleware` recording per-node duration, multi-tenant caller-supplied metadata propagating to both observers in one `invoke()` call, AND -a third queryable-accumulator observer that a terminal `persist` -node reads at request scope after synchronizing on the deliver -loop with `drain_events_for`. +two queryable-accumulator observers (one for successful-call token +usage incl. cache hits, one for failure-category attribution) that +a terminal `persist` node reads at request scope after +synchronizing on the deliver loop with `drain_events_for`. 
## Overview -Two nodes (`respond` then `persist`), one LLM call, three observers +Two nodes (`respond` then `persist`), one LLM call, four observers attached before invoke. The pipeline takes a question, calls the LLM, returns the answer, then synchronizes on the observer queue -and rolls up token cost. The interesting part is the observability -wiring: +and rolls up token cost + failure attribution. The interesting +part is the observability wiring: - `OTelObserver` attached with an `InMemorySpanExporter` (production swaps this for `BatchSpanProcessor` + @@ -84,7 +85,7 @@ sees the same logical events represented two ways. observer call surface doesn't change. - **Queryable accumulator + `drain_events_for`** ([queryable observer pattern](../concepts/observability.md)). - A third observer — `LlmUsageAccumulator` — subscribes to the + A third observer (`LlmUsageAccumulator`) subscribes to the same event stream but only records the typed `LlmCompletionEvent` variant (one event per successful LLM call; outcome fields read directly off the event). It accumulates @@ -102,28 +103,44 @@ sees the same logical events represented two ways. count through State (a workaround that pollutes the state schema with non-pipeline data). - The filter shape is `isinstance(event, LlmCompletionEvent)` — + The filter shape is `isinstance(event, LlmCompletionEvent)`, one isinstance check against the typed event variant on the - observer event union. The provider also dual-emits a sentinel - `NodeEvent` pair during the transition period for backwards - compatibility with older accumulators; this example's - accumulator ignores the sentinel pair because the typed event - carries the same outcome data without the pair-join logic. New - accumulators should follow the isinstance-based filter shape - here; the CHANGELOG tracks when the sentinel emission is - removed. + observer event union. 
New accumulators should follow this + shape; the typed event carries every outcome field directly, + no namespace-string-match + payload-narrow dance against the + older sentinel-event family. - `LlmCompletionEvent` is success-only by spec design. Failed LLM - calls flow through the exception path and do not emit the typed - event, so `bucket.call_count` reflects successful calls only. - This is the right semantic for a usage accumulator (failed - calls produce no tokens). A pipeline tracking attempt-level - failure rates needs a separate listener — either a custom - observer on the sentinel `NodeEvent` pair or a future - failure-event typed variant if and when that proposal lands. - Production code migrating an existing accumulator from the - sentinel pattern should expect this counting shift if it was - previously counting failure-path events. + `LlmCompletionEvent` is a success-only event. Failed LLM + calls flow through the exception path and emit the parallel + `LlmFailedEvent` variant, so `bucket.call_count` reflects + successful calls only. This is the right semantic for a usage + accumulator (failed calls produce no tokens); the + `LlmFailureTracker` (below) is the listener that owns + attempt-level failure rates. + + The bucket also tracks `usage.cached_tokens` so the persist + node can print a cache-hit ratio. Backends with prefix caching + (vLLM `--enable-prefix-caching`, Anthropic prompt caching, + OpenAI's `prompt_token_usage` cache report when enabled) + populate the field; backends without cache visibility leave it + `None` and the ratio degrades to 0% gracefully. The cache-stat + fields surface both on the typed event's `Usage` and on the OTel + LLM span's `openarmature.llm.cache_read.input_tokens` attribute. +- **Failure-category tracker** + ([typed failure event](../concepts/observability.md)). + A fourth observer (`LlmFailureTracker`) subscribes to + `LlmFailedEvent`, the typed failure-side counterpart to + `LlmCompletionEvent`. 
The provider emits exactly one of + (`LlmCompletionEvent`, `LlmFailedEvent`) per LLM call, never + both. The tracker maintains a per-invocation + `{error_category: count}` bucket keyed by the nine canonical + category strings (`provider_rate_limit`, `provider_unavailable`, + `provider_invalid_model`, etc.), and the persist node prints + the per-category breakdown alongside the usage rollup. This + closes the symmetric-attribution gap: operators see "this tenant + had 4 successful calls (cost X) plus 1 rate-limit failure" at + request scope, without having to cross-join the OTel exception + spans or wait for a metrics backend to roll up. ## How to run @@ -152,12 +169,13 @@ request id: feature flag:v2-canary [timing] respond: 1234.5ms (success) -[persist] LLM usage: prompt=42, completion=38, total=80 across 1 call(s) +[persist] LLM usage: prompt=42 (cached=0, 0.0% hit), completion=38, total=80 across 1 call(s) +[persist] LLM failures: none answer: The primary objective of Apollo 11 was ... model: gpt-4o-mini-2024-07-18 --- captured OTel spans --- - [openarmature.invocation] 1240.0ms openarmature.graph.entry_node='respond', openarmature.graph.spec_version='0.46.0', openarmature.implementation.name='openarmature-python', openarmature.implementation.version='0.12.0' + [openarmature.invocation] 1240.0ms openarmature.graph.entry_node='respond', openarmature.graph.spec_version='0.53.0', openarmature.implementation.name='openarmature-python', openarmature.implementation.version='0.13.0' [respond] 1235.0ms openarmature.node.name='respond', openarmature.user.tenantId='demo-acme', ... [openarmature.llm.complete] 1200.0ms openarmature.user.tenantId='demo-acme', gen_ai.system='openai', gen_ai.usage.input_tokens=42, ... [persist] 2.0ms openarmature.node.name='persist', openarmature.user.tenantId='demo-acme', ... @@ -184,28 +202,40 @@ Trace id= as `outcome="exception"` with `exception_category="provider_rate_limit"`. 
- **`[persist] LLM usage: ...`**: emitted by the `persist` node after it drains the deliver loop and reads the - `LlmUsageAccumulator`'s bucket for this invocation. If the drain - times out (slow / hung observer), the persist line is prefixed by - a `[persist] drain incomplete: N events still pending after 2.0s` - surface — the production version of that log would also flip an - SLO-breach metric. + `LlmUsageAccumulator`'s bucket for this invocation. The + `cached=N, X.X% hit` segment is the ratio of cache-read input + tokens to total prompt tokens for the invocation, sourced from + `usage.cached_tokens`. OpenAI's `gpt-4o-mini` (the default + model) reports zero cache hits unless `prompt_tokens_details` cache + reporting is explicitly enabled; vLLM with + `--enable-prefix-caching` or Anthropic with prompt caching will + show real cache attribution against repeated prefixes. If the + drain times out (slow / hung observer), the persist line is + prefixed by a `[persist] drain incomplete: N events still + pending after 2.0s` surface, and the production version of that + log would also flip an SLO-breach metric. +- **`[persist] LLM failures: ...`**: emitted by the `persist` + node after reading the `LlmFailureTracker`'s bucket. On a + success-only invocation the line reads `none`; on a run with + retried provider errors it reads e.g. `provider_rate_limit=2, + provider_unavailable=1` with categories ordered noisiest-first. + Because every LLM call emits exactly one of `LlmCompletionEvent` + or `LlmFailedEvent` and never both, the bucket counts attempts + the success-side accumulator did NOT see, which is the right + shape for retry-rate dashboards. - **OTel spans block**: one line per captured span, sorted by start time. The relevant attributes shown are a curated subset for readability; the full attribute set is on each `Span` object - for any reader inspecting them programmatically. 
The - `openarmature.llm.complete` span name + the `gen_ai.usage.*` - attribute family come from the OTel observer's current - sentinel-`NodeEvent` handler — the OTel and Langfuse observers - have not yet migrated to consuming the typed `LlmCompletionEvent` - variant. Span names and attribute paths may shift when the - observer migration lands; the example's emitted span structure - tracks the current observer behavior. Note three attribute - families worth telling apart: + for any reader inspecting them programmatically. On runs against + a cache-reporting backend the LLM span also carries + `openarmature.llm.cache_read.input_tokens` (the OA-namespace + cache attribute). Note three attribute families worth telling + apart: - The root `openarmature.invocation` span carries `openarmature.graph.spec_version` plus the `openarmature.implementation.name` / `.version` attribution - attributes. These are invocation-span-only (per spec §5.1) — - operators filtering by library version use these. + attributes. These are invocation-span-only; operators + filtering by library version use these. - The `openarmature.user.*` attributes appear on every span, reflecting the cross-cutting propagation from `invoke(metadata=...)`. diff --git a/docs/model-providers/authoring.md b/docs/model-providers/authoring.md index 4fc81a4..361b38f 100644 --- a/docs/model-providers/authoring.md +++ b/docs/model-providers/authoring.md @@ -216,71 +216,132 @@ of: raise `StructuredOutputInvalid` on parse or validation failure. Use `validate_response_schema` and `strict_mode_supported` from `openarmature.llm` to share the provider-agnostic boundary checks. -- **Observability spans.** Opt-in `started`/`completed` events - around the wire call so the OTel observer can build LLM spans. - Dispatch via the public `openarmature.observability.LLM_NAMESPACE` - sentinel and the typed `LlmEventPayload`. 
The sketch below is - what lives around the wire call inside `complete()`; the - `OpenAIProvider`'s `_make_llm_event` helper is the reference - implementation: +- **Observability events.** Opt-in typed-event dispatch around the + wire call so the bundled OTel and Langfuse observers (plus any + custom observer using type discrimination) can build LLM spans + and Generation observations. Dispatch + `openarmature.graph.events.LlmCompletionEvent` on successful + calls and `LlmFailedEvent` alongside any `LlmProviderError` your + adapter raises. The sketch below is the success and failure + emission shape inside `complete()`; the bundled + `OpenAIProvider._build_llm_completion_event` and + `_build_llm_failed_event` are the reference implementations. ```python + import time import uuid - from typing import Any - - from openarmature.graph.events import NodeEvent - from openarmature.observability import LLM_NAMESPACE, LlmEventPayload - from openarmature.observability.correlation import current_dispatch - - - def dispatch_llm_event( - phase: str, - *, - call_id: str, - model: str, - **extra: Any, - ) -> None: - """Emit one half of the started/completed pair. 
The same - ``call_id`` MUST appear on both events so observers can match - them under concurrency.""" + + from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent + from openarmature.llm.errors import LlmProviderError + from openarmature.observability.correlation import ( + current_attempt_index, + current_branch_name, + current_correlation_id, + current_dispatch, + current_fan_out_index, + current_invocation_id, + current_namespace_prefix, + ) + + + async def complete(self, messages, /, *, tools=None, config=None): dispatch = current_dispatch() - if dispatch is None: - return - dispatch(NodeEvent( - node_name="openarmature.llm.complete", - namespace=LLM_NAMESPACE, - step=-1, - phase=phase, - pre_state=LlmEventPayload( + call_id = str(uuid.uuid4()) # fresh per call (per-attempt under retry) + adapter_start = time.perf_counter() + + # Capture request-side data ONCE so both success and failure + # paths populate the typed event from the same projection. + serialized_messages: list[dict] = [] # image-redacted; see below + request_params = self._project_request_params(config) + request_extras = self._project_request_extras(config) + + try: + serialized_messages = self._serialize_messages(messages) + response = await self._wire_call(messages, tools, config) + except LlmProviderError as exc: + # Alongside the exception — caller-side flow unchanged + # (the exception still raises out of complete()). 
+ if dispatch is not None: + latency_ms = (time.perf_counter() - adapter_start) * 1000.0 + dispatch(LlmFailedEvent( + invocation_id=current_invocation_id() or "", + correlation_id=current_correlation_id(), + node_name=(current_namespace_prefix() or ("",))[-1], + namespace=current_namespace_prefix(), + attempt_index=current_attempt_index(), + fan_out_index=current_fan_out_index(), + branch_name=current_branch_name(), + provider="my-provider", + model=self.model, + latency_ms=latency_ms, + input_messages=serialized_messages, + request_params=request_params, + request_extras=request_extras, + active_prompt=None, + active_prompt_group=None, + call_id=call_id, + error_category=exc.category, + error_type=type(exc).__name__, + error_message=str(exc), + )) + raise + + if dispatch is not None: + latency_ms = (time.perf_counter() - adapter_start) * 1000.0 + dispatch(LlmCompletionEvent( + invocation_id=current_invocation_id() or "", + correlation_id=current_correlation_id(), + node_name=(current_namespace_prefix() or ("",))[-1], + namespace=current_namespace_prefix(), + attempt_index=current_attempt_index(), + fan_out_index=current_fan_out_index(), + branch_name=current_branch_name(), + provider="my-provider", + model=self.model, + response_id=response.response_id, + response_model=response.response_model, + usage=response.usage, + latency_ms=latency_ms, + finish_reason=response.finish_reason, + input_messages=serialized_messages, + output_content=response.message.content or None, + request_params=request_params, + request_extras=request_extras, + active_prompt=None, + active_prompt_group=None, call_id=call_id, - model=model, - genai_system="my-provider", - **extra, - ), - post_state=None, - error=None, - parent_states=(), - )) - - - # Inside Provider.complete(), the call_id is minted once per call: - call_id = str(uuid.uuid4()) - dispatch_llm_event("started", call_id=call_id, model="my-model") - # ... wire call here, populating finish_reason / usage / output ... 
- dispatch_llm_event( - "completed", - call_id=call_id, - model="my-model", - finish_reason="stop", - ) + )) + return response ``` + Two contracts the bundled provider follows that custom providers + SHOULD match: + + - **Mutual exclusion.** A single `complete()` call emits exactly + one `LlmCompletionEvent` (success) **or** exactly one + `LlmFailedEvent` (failure) — never both. Conformance fixture 072 + locks this down. + - **Exception-flow preservation.** `LlmFailedEvent` is dispatched + **alongside** the §7 exception, not in place of it. The + exception still raises out of `complete()`; caller-side error + handling is unchanged. The typed event is on the observer + queue. + Inline image bytes MUST be redacted in the provider's - serialization step before reaching the payload (see + serialization step before reaching the typed event's + `input_messages` field (see [Observability: Inline image redaction](../concepts/observability.md#inline-image-redaction-always-on)) - so custom observers consuming `LlmEventPayload` cannot leak raw - bytes. + so observers consuming the typed events cannot leak raw bytes. + + **Legacy sentinel-namespace pattern** (`LLM_NAMESPACE` + + `LlmEventPayload` dispatched as a `NodeEvent` pair) remains in + the public API as a documented compatibility surface. The bundled + `OpenAIProvider` retired it in v0.13.0; the bundled OTel and + Langfuse observers no longer recognize it. New providers should + emit typed events directly; the sentinel pattern is preserved for + providers shipped before the v0.13.0 migration that haven't yet + switched. - **Lenient response parsing** under `finish_reason="error"`. Degraded responses surface what they can; tool-call arguments that fail to parse populate `arguments=None` instead of raising. 
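The lenient-parsing rule above can be sketched as a small helper. This is a minimal illustration under stated assumptions, not the bundled provider's code: the name `parse_tool_arguments` and its signature are hypothetical, and a real adapter would likely map the strict-path failure onto its own error type rather than re-raising the raw decoding error.

```python
import json
from typing import Any, Optional


def parse_tool_arguments(
    raw_arguments: Optional[str], *, finish_reason: str
) -> Optional[dict[str, Any]]:
    """Decode a tool call's JSON arguments (hypothetical helper).

    Under ``finish_reason="error"`` a decoding failure yields ``None``
    so the degraded response can still surface; under any other finish
    reason the failure propagates to the caller.
    """
    try:
        parsed = json.loads(raw_arguments)  # TypeError when raw_arguments is None
        if not isinstance(parsed, dict):
            raise ValueError("tool-call arguments must decode to a JSON object")
        return parsed
    except (TypeError, ValueError):  # json.JSONDecodeError subclasses ValueError
        if finish_reason == "error":
            return None
        raise
```

Returning `None` rather than an empty dict keeps "could not parse" distinguishable from "the model passed no arguments", which matters to any caller that dispatches tools from the result.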
diff --git a/examples/chat-with-multimodal/main.py b/examples/chat-with-multimodal/main.py index e49d18a..1dbdaae 100644 --- a/examples/chat-with-multimodal/main.py +++ b/examples/chat-with-multimodal/main.py @@ -8,8 +8,8 @@ chat-history shape. Turns 1, 2, 4 are text-only. **Demonstrates:** ChatPrompt + ContentSegment (system + user) + -PlaceholderSegment for chat-history injection (proposal 0046, -spec v0.38.0). PromptManager.render with the `placeholders` kwarg. +PlaceholderSegment for chat-history injection. PromptManager.render +with the `placeholders` kwarg. Multi-turn message threading through state with the `append` reducer; the conversation history grows over turns and feeds back into render() on each turn. The same chat template carries an diff --git a/examples/production-observability/main.py b/examples/production-observability/main.py index 2e904c7..8f82efc 100644 --- a/examples/production-observability/main.py +++ b/examples/production-observability/main.py @@ -13,15 +13,15 @@ **Demonstrates (mapped to shipped features):** -- Dual observers on one graph (proposal 0031 + the no-double-export - posture from the README pitch). Both consume the same NodeEvent - stream independently; nothing in node code knows there are two. +- Dual observers on one graph, with no double-export between them. + Both consume the same NodeEvent stream independently; nothing in + node code knows there are two. - ``trace_input_from_state`` / ``trace_output_from_state`` caller - hooks on ``LangfuseObserver`` (proposal 0043 §8.4.1). The hooks - derive domain dicts (``{"question": ...}`` / ``{"answer": ..., - "model": ...}``) instead of letting the observer dump the raw - State. Production teams use this to keep PII out of trace - payloads while still surfacing operational signal. + hooks on ``LangfuseObserver``. The hooks derive domain dicts + (``{"question": ...}`` / ``{"answer": ..., "model": ...}``) + instead of letting the observer dump the raw State. 
Production + teams use this to keep PII out of trace payloads while still + surfacing operational signal. - Built-in ``TimingMiddleware`` from ``openarmature.graph`` wrapping the respond node. An ``on_complete`` callback receives a ``TimingRecord(node_name, duration_ms, outcome, @@ -43,7 +43,9 @@ observer call surface doesn't change. - **Queryable accumulator observer + per-invocation drain.** A third observer (``LlmUsageAccumulator``) rolls up LLM token - totals per invocation. A terminal ``persist`` node calls + totals per invocation, including a cache-hit ratio from + ``usage.cached_tokens`` for backends with prefix caching (vLLM, + Anthropic, etc.). A terminal ``persist`` node calls ``await graph.drain_events_for(current_invocation_id(), timeout=2.0)`` to synchronize on the deliver loop, then reads the accumulator's bucket and drops it. Without the drain, the bucket would be @@ -56,6 +58,18 @@ ``Observer`` itself stays a single-callable protocol; the queryable accumulator just exposes its own read methods (``get_bucket`` / ``drop``) that the persist node knows about. +- **Failure-category tracker observer.** A fourth observer + (``LlmFailureTracker``) subscribes to ``LlmFailedEvent``, the + typed failure-side counterpart to ``LlmCompletionEvent``, fired + once per LLM call that fails with a provider error category. The + tracker maintains a per-invocation ``{category: count}`` bucket; + the persist node reads + reports it alongside the usage rollup. + Together the two observers give operators success/failure + attribution at request scope without joining against external + trace storage. The provider emits exactly one of + (``LlmCompletionEvent``, ``LlmFailedEvent``) per LLM call, never + both, so attempt counts derive cleanly as + ``usage.call_count + sum(failure.by_category.values())``. 
Complementary to the observer-hooks example (three observers side-by-side) and the langfuse-observability example (Langfuse @@ -101,6 +115,7 @@ GraphBuilder, InvocationCompletedEvent, LlmCompletionEvent, + LlmFailedEvent, NodeException, ObserverEvent, State, @@ -164,30 +179,30 @@ class BriefingState(State): # for accumulators. # # The accumulator subscribes to every event but only records the -# typed ``LlmCompletionEvent`` variant — one event per successful LLM -# call, structured outcome fields read directly off the event without -# the namespace-string-match + payload-narrow dance the legacy -# sentinel pattern needed. The provider also dual-emits a sentinel -# ``NodeEvent`` pair during the transition period for backwards -# compatibility with older accumulators; this accumulator ignores -# the sentinel pair because the typed event carries the same outcome -# data without the pair-join logic. New accumulators should follow -# the isinstance-based filter shape here; the CHANGELOG tracks when -# the sentinel emission is removed. +# typed ``LlmCompletionEvent`` variant (one event per successful LLM +# call), structured outcome fields read directly off the event without +# the namespace-string-match + payload-narrow dance older sentinel- +# based filters needed. # -# Per-invocation isolation is by ``LlmCompletionEvent.invocation_id`` -# — read directly off the event, no ContextVar lookup needed. +# Per-invocation isolation is by ``LlmCompletionEvent.invocation_id``, +# read directly off the event, no ContextVar lookup needed. # Concurrent invocations on one observer each get their own bucket. # -# ``LlmCompletionEvent`` is success-only by spec design. Failed LLM -# calls flow through the exception path and do NOT emit the typed -# event, so ``bucket.call_count`` here reflects successful calls -# only. This is the right semantic for a usage accumulator (failed -# calls produce no tokens / cost). 
A pipeline tracking attempt-level -# failure rates needs a separate listener — either a custom observer -# on the sentinel ``NodeEvent`` pair, or a future -# ``LlmCallFailedEvent`` typed variant if and when that proposal -# lands. +# Cache-stat tracking: the bucket also rolls up ``usage.cached_tokens`` +# when the provider reports it. Backends with prefix caching (vLLM +# ``--enable-prefix-caching``, Anthropic prompt caching, OpenAI's +# automatic prompt caching on prompts of 1024+ tokens) populate the +# field; backends without cache visibility leave it ``None`` and the +# accumulator records zero. The persist node prints a cache-hit +# ratio so operators see whether prefix caching is paying off at +# request scope without having to cross-join Langfuse rows. +# +# ``LlmCompletionEvent`` is a success-only event. Failed LLM calls +# flow through the exception path and emit the parallel +# ``LlmFailedEvent`` variant (see ``LlmFailureTracker`` below), so +# ``bucket.call_count`` here reflects successful calls only. This is +# the right semantic for a usage accumulator (failed calls produce +# no tokens / cost). @dataclass @@ -195,6 +210,7 @@ class _UsageBucket: prompt_tokens: int = 0 completion_tokens: int = 0 total_tokens: int = 0 + cached_tokens: int = 0 call_count: int = 0 @@ -220,21 +236,21 @@ async def __call__(self, event: ObserverEvent) -> None: if not isinstance(event, LlmCompletionEvent): return # call_count tracks successful LLM calls (the typed event is - # success-only by spec design). Spec contract has "call - # happened" and "usage reported" as INDEPENDENT — a provider - # may legitimately omit usage on a successful call. Create the - # bucket and increment call_count unconditionally so the - # counter reflects all successful calls; gate only the - # token-counting math on usage being populated. + # emitted on success only). "Call happened" and "usage + # reported" are independent; a provider may legitimately omit + # usage on a successful call. Create the bucket and increment
Create the bucket and increment + # call_count unconditionally so the counter reflects all + # successful calls; gate only the token-counting math on usage + # being populated. bucket = self._by_invocation.setdefault(event.invocation_id, _UsageBucket()) bucket.call_count += 1 - # The typed event's usage field is nullable per the spec - # contract ("may be null when the provider does not report - # usage"). Python's provider always passes a Usage instance - # (with all-None fields when not reported), but the defensive - # guard keeps the accumulator robust against future providers - # that exercise the null option. Calls without reported usage - # contribute zero tokens (the only honest value we can record). + # The typed event's usage field is nullable (it may be ``None`` + # when the provider does not report usage). Python's provider + # always passes a Usage instance (with all-None fields when not + # reported), but the defensive guard keeps the accumulator + # robust against future providers that exercise the null + # option. Calls without reported usage contribute zero tokens + # (the only honest value we can record). usage = event.usage if usage is None: return @@ -251,9 +267,16 @@ async def __call__(self, event: ObserverEvent) -> None: bucket.total_tokens += usage.total_tokens elif usage.prompt_tokens is not None or usage.completion_tokens is not None: bucket.total_tokens += (usage.prompt_tokens or 0) + (usage.completion_tokens or 0) + # Cache-stat fields are populated only when the provider + # reports them. Backends without prefix-cache visibility + # leave ``cached_tokens`` ``None`` and the bucket records + # zero; the persist node's cache-hit ratio degrades to 0% + # gracefully in that case. 
+ if usage.cached_tokens is not None: + bucket.cached_tokens += usage.cached_tokens # Consumers MUST synchronize on ``drain_events_for`` before - # calling ``get_bucket`` if completeness matters — without the + # calling ``get_bucket`` if completeness matters; without the # drain the deliver loop may still hold pending events whose # tokens have not been added yet. ``None`` is returned when # nothing has been recorded yet (e.g., an invocation with no @@ -263,7 +286,7 @@ def get_bucket(self, invocation_id: str) -> _UsageBucket | None: return self._by_invocation.get(invocation_id) # Bucket lifecycle is two-step. Fast path: a terminal node calls - # ``drop()`` immediately after reading via ``get_bucket()`` — + # ``drop()`` immediately after reading via ``get_bucket()``; # that's the normal case and runs while the invocation is still # active. Backstop: the accumulator's ``__call__`` also drops # any bucket still present when ``InvocationCompletedEvent`` @@ -276,21 +299,91 @@ def drop(self, invocation_id: str) -> None: self._by_invocation.pop(invocation_id, None) +# --------------------------------------------------------------------------- +# Failure tracker (per-invocation LLM error-category rollup) +# --------------------------------------------------------------------------- +# Parallel queryable observer for the failure path. Subscribes to +# ``LlmFailedEvent``, the typed counterpart to ``LlmCompletionEvent``, +# fired exactly once for every LLM call that fails with a provider +# error category. Every LLM call emits either one +# ``LlmCompletionEvent`` (success) or one ``LlmFailedEvent`` +# (failure), never both, so the tracker can count attempt-level +# failures by category without joining against the success-side +# accumulator. +# +# This is the listener the success-side accumulator delegates to: +# ``LlmUsageAccumulator.bucket.call_count`` counts successful calls +# only. Operators wanting attempt-level failure rates (e.g. 
``how +# often did this tenant's calls land on a rate-limited provider this +# hour?``) read the failure tracker's bucket alongside the usage +# accumulator's bucket and compute the ratio at request scope. +# +# Bucket shape is a per-category counter: ``{"provider_rate_limit": 2, +# "provider_unavailable": 1, ...}``. ``error_category`` is one of the +# nine canonical category strings, so the dict keys form a stable, +# greppable vocabulary across providers. ``error_type`` (vendor- +# specific code) and ``error_message`` are NOT recorded here; the +# tracker's job is rate / category attribution at request scope, not +# vendor-error forensics; that lives in the OTel + Langfuse spans +# where the full exception detail is captured. + + +@dataclass +class _FailureBucket: + # ``dict[category, count]`` keyed by the canonical + # ``error_category`` strings. Total attempts can be derived as + # ``sum(by_category.values())``; the tracker doesn't carry a + # separate counter for it. + by_category: dict[str, int] + + @classmethod + def empty(cls) -> _FailureBucket: + return cls(by_category={}) + + +class LlmFailureTracker: + """Per-invocation LLM failure-category rollup.""" + + def __init__(self) -> None: + self._by_invocation: dict[str, _FailureBucket] = {} + + async def __call__(self, event: ObserverEvent) -> None: + # Backstop cleanup mirrors LlmUsageAccumulator's pattern so + # late-delivered failure events after a partial drain + # cannot leak a bucket. 
+ if isinstance(event, InvocationCompletedEvent): + self._by_invocation.pop(event.invocation_id, None) + return + if not isinstance(event, LlmFailedEvent): + return + bucket = self._by_invocation.setdefault(event.invocation_id, _FailureBucket.empty()) + bucket.by_category[event.error_category] = bucket.by_category.get(event.error_category, 0) + 1 + + def get_bucket(self, invocation_id: str) -> _FailureBucket | None: + """Read the accumulated failure bucket for an invocation.""" + return self._by_invocation.get(invocation_id) + + def drop(self, invocation_id: str) -> None: + """Release the failure bucket for an invocation.""" + self._by_invocation.pop(invocation_id, None) + + # Module-level singletons make the persist node closure-free and # match how ``_provider_instance`` is handled. In an application # server, these would live on a request-scoped or app-scoped # container instead. _accumulator: LlmUsageAccumulator | None = None +_failure_tracker: LlmFailureTracker | None = None _compiled_graph: CompiledGraph[BriefingState] | None = None # --------------------------------------------------------------------------- # Caller hooks for Langfuse trace.input / trace.output # --------------------------------------------------------------------------- -# Per proposal 0043 §8.4.1, the LangfuseObserver lets callers derive -# domain-shaped trace.input and trace.output from State rather than -# letting the framework dump the raw State object. The hooks fire -# once per invocation: trace_input_from_state on InvocationStartedEvent, +# The LangfuseObserver lets callers derive domain-shaped trace.input +# and trace.output from State rather than letting the framework dump +# the raw State object. The hooks fire once per invocation: +# trace_input_from_state on InvocationStartedEvent, # trace_output_from_state on InvocationCompletedEvent. Production # teams use this to keep PII out of trace payloads while still # surfacing the operational signal a Langfuse UI viewer needs. 
@@ -354,19 +447,19 @@ async def respond(state: BriefingState) -> dict[str, Any]: } -# Terminal node. State is intentionally unused — this node's job is +# Terminal node. State is intentionally unused: this node's job is # to synchronize on the observer deliver loop and report a derived # rollup, not to read or modify pipeline state. # # ``drain_events_for`` blocks until every event dispatched up to this # point has reached every attached observer. Without it the # accumulator's bucket may still be missing the most-recent LLM -# event's tokens — the deliver loop hasn't processed them yet when -# the node body runs. Snapshot semantic: the drain awaits only +# event's tokens, since the deliver loop hasn't processed them yet +# when the node body runs. Snapshot semantic: the drain awaits only # events dispatched BEFORE the call (this node's own ``started`` # event included), not events that fire after the call begins # (notably this node's own ``completed`` event, which only fires -# after the body returns — that's how the call avoids deadlocking +# after the body returns; that's how the call avoids deadlocking # on itself). # # Default timeout is 5.0s; the demo tightens to 2.0s so a stuck @@ -375,15 +468,15 @@ async def respond(state: BriefingState) -> dict[str, Any]: # lets the caller record an SLO breach and proceed with whatever # data is available, rather than failing the whole invocation. async def persist(_state: BriefingState) -> dict[str, Any]: - """Drain the deliver loop, read the LLM-usage rollup, drop the bucket.""" + """Drain the deliver loop, read the LLM-usage + failure rollups, drop the buckets.""" # Use explicit RuntimeError rather than ``assert`` so the failure # mode stays informative under ``python -O`` (which strips asserts # and would otherwise turn these into silent ``None`` dereferences # at the next attribute access). 
- if _compiled_graph is None or _accumulator is None: + if _compiled_graph is None or _accumulator is None or _failure_tracker is None: raise RuntimeError( - "persist node requires _compiled_graph and _accumulator to be set " - "before invoke() — see build_graph() for the initialization pattern" + "persist node requires _compiled_graph, _accumulator, and _failure_tracker " + "to be set before invoke(); see build_graph() for the initialization pattern" ) invocation_id = current_invocation_id() if invocation_id is None: @@ -394,21 +487,42 @@ async def persist(_state: BriefingState) -> dict[str, Any]: # gap inline so a reader sees what an incomplete drain looks # like. print(f"[persist] drain incomplete: {summary.undelivered_count} events still pending after 2.0s") - bucket = _accumulator.get_bucket(invocation_id) + # Read both buckets up front so the drop calls run in pairs and + # neither bucket leaks if a later print raises. + usage_bucket = _accumulator.get_bucket(invocation_id) + failure_bucket = _failure_tracker.get_bucket(invocation_id) _accumulator.drop(invocation_id) - if bucket is None: - print("[persist] no LLM usage recorded for this invocation") - return {} + _failure_tracker.drop(invocation_id) # In production, this is where you'd write the canonical # invocation artifact to durable storage: a JSON record with the - # answer + per-invocation token cost + caller metadata + trace - # IDs for cross-system join. The demo prints the rollup so the - # pattern is legible. - print( - f"[persist] LLM usage: prompt={bucket.prompt_tokens}, " - f"completion={bucket.completion_tokens}, total={bucket.total_tokens} " - f"across {bucket.call_count} call(s)" - ) + # answer + per-invocation token cost + cache-hit ratio + failure + # category counts + caller metadata + trace IDs for cross-system + # join. The demo prints the rollups so the pattern is legible. 
+ if usage_bucket is None: + print("[persist] no LLM usage recorded for this invocation") + else: + # Cache-hit ratio is ``cached_tokens / prompt_tokens`` when the + # provider reports cache stats. Backends without cache + # visibility report ``cached_tokens=0``; the ratio degrades + # to 0% gracefully without special-casing. + if usage_bucket.prompt_tokens > 0: + cache_hit_pct = (usage_bucket.cached_tokens / usage_bucket.prompt_tokens) * 100 + else: + cache_hit_pct = 0.0 + print( + f"[persist] LLM usage: prompt={usage_bucket.prompt_tokens} " + f"(cached={usage_bucket.cached_tokens}, {cache_hit_pct:.1f}% hit), " + f"completion={usage_bucket.completion_tokens}, " + f"total={usage_bucket.total_tokens} " + f"across {usage_bucket.call_count} call(s)" + ) + if failure_bucket is None or not failure_bucket.by_category: + print("[persist] LLM failures: none") + else: + # Sort by count descending so the noisiest category leads. + ordered = sorted(failure_bucket.by_category.items(), key=lambda kv: (-kv[1], kv[0])) + summary_str = ", ".join(f"{cat}={n}" for cat, n in ordered) + print(f"[persist] LLM failures: {summary_str}") return {} @@ -428,18 +542,20 @@ def build_graph() -> CompiledGraph[BriefingState]: (RetryMiddleware lives in the fan-out-with-retry / parallel- branches examples; this one's scope is observability). - The ``LlmUsageAccumulator`` is constructed, attached to the - compiled graph, and registered to the module-level singletons - so the persist node (which reads from globals to stay closure- - free) can reach it without help from the caller. The factory - is self-contained — ``graph = build_graph(); await graph.invoke(...)`` - works on its own. OTel + Langfuse observers are NOT attached - here; those are observability-stack choices made at the call - site, and ``main()`` attaches them after build_graph() returns. 
+ The ``LlmUsageAccumulator`` and ``LlmFailureTracker`` are + constructed, attached to the compiled graph, and registered to + the module-level singletons so the persist node (which reads + from globals to stay closure-free) can reach them without help + from the caller. The factory is self-contained: ``graph = + build_graph(); await graph.invoke(...)`` works on its own. OTel + + Langfuse observers are NOT attached here; those are + observability-stack choices made at the call site, and + ``main()`` attaches them after build_graph() returns. """ - global _accumulator, _compiled_graph + global _accumulator, _failure_tracker, _compiled_graph timing = TimingMiddleware(node_name="respond", on_complete=_emit_timing) _accumulator = LlmUsageAccumulator() + _failure_tracker = LlmFailureTracker() _compiled_graph = ( GraphBuilder(BriefingState) .add_node("respond", respond, middleware=[timing]) @@ -450,6 +566,7 @@ def build_graph() -> CompiledGraph[BriefingState]: .compile() ) _compiled_graph.attach_observer(_accumulator) + _compiled_graph.attach_observer(_failure_tracker) return _compiled_graph @@ -467,7 +584,8 @@ def build_graph() -> CompiledGraph[BriefingState]: # Caller hooks attach to LangfuseObserver via constructor kwargs. # ``disable_llm_payload=False`` opts in to capturing the input # messages + output content on Generation observations so the demo -# output is meaningful; default-True is the spec privacy posture. +# output is meaningful; the default-True is the privacy-preserving +# setting. def _build_otel_observer(exporter: InMemorySpanExporter) -> OTelObserver: @@ -505,10 +623,10 @@ def _build_langfuse_observer(client: InMemoryLangfuseClient) -> LangfuseObserver # have ingested. -# Invocation-span-only attributes (spec 5.1). Surface these only on -# the root ``openarmature.invocation`` span line; inner spans don't -# carry them (they're invocation-level constants, not cross-cutting -# 5.6 attributes). +# Invocation-level attributes. 
Surface these only on the root +# ``openarmature.invocation`` span line; inner spans don't carry them +# (they're invocation-level constants, not cross-cutting per-node +# attributes). _INVOCATION_SPAN_KEYS = ( "openarmature.graph.entry_node", "openarmature.graph.spec_version", @@ -516,10 +634,21 @@ def _build_langfuse_observer(client: InMemoryLangfuseClient) -> LangfuseObserver "openarmature.implementation.version", ) -# Per-node + cross-cutting attributes (5.6 + GenAI semconv). Surface +# Per-node + cross-cutting attributes. The ``gen_ai.*`` family +# follows the OpenTelemetry GenAI semantic conventions. Surface # these on inner-node spans only; they propagate to the invocation # span too but showing them there is redundant once they appear on # every node line below. +# +# ``openarmature.llm.cache_read.input_tokens`` is the OA-namespace +# cache-stat attribute. Lands on the LLM span only and only when the +# provider reports cache hits. Backends with prefix caching (vLLM +# ``--enable-prefix-caching``, Anthropic prompt caching, OpenAI's +# automatic prompt caching on prompts of 1024+ tokens) populate it; +# a short ``gpt-4o-mini`` prompt like this demo's leaves it absent. +# The formatter omits the entry when absent rather than showing +# ``None`` so a reader instantly sees whether prefix caching is +# paying off in the observed run. _INNER_SPAN_KEYS = ( "openarmature.node.name", "openarmature.user.tenantId", @@ -528,6 +657,7 @@ def _build_langfuse_observer(client: InMemoryLangfuseClient) -> LangfuseObserver "gen_ai.system", "gen_ai.usage.input_tokens", "gen_ai.usage.output_tokens", + "openarmature.llm.cache_read.input_tokens", ) @@ -536,11 +666,11 @@ def _format_otel_spans(spans: list[ReadableSpan]) -> str: The ``openarmature.invocation`` root span closes on observer ``shutdown()`` and surfaces only its invocation-level - attributes (spec 5.1 — entry_node, spec_version, implementation - name + version).
Inner-node spans surface the cross-cutting - caller metadata + GenAI semconv attributes; printing them on - the invocation line too would just repeat data shown three - more times below. + attributes (entry_node, spec_version, implementation name + + version). Inner-node spans surface the cross-cutting caller + metadata + ``gen_ai.*`` attributes; printing them on the + invocation line too would just repeat data shown three more + times below. """ if not spans: return " (no spans captured)" @@ -564,8 +694,8 @@ def _format_langfuse_trace(trace: LangfuseTrace) -> str: Mirrors what the Langfuse production UI renders for the same invocation: trace.input / trace.output (sourced via the caller - hooks), top-level metadata (caller-supplied + spec keys), and - the Observation tree underneath. + hooks), top-level metadata (caller-supplied + framework-reserved + keys), and the Observation tree underneath. """ lines: list[str] = [] lines.append(f"Trace id={trace.id}") @@ -615,7 +745,7 @@ async def main() -> None: # observability-stack observers on top. graph = build_graph() # Keep the OTel observer reachable so we can ``shutdown()`` it - # after drain — the root ``openarmature.invocation`` span only + # after drain; the root ``openarmature.invocation`` span only # closes on shutdown, and the in-memory exporter only surfaces # closed spans through ``get_finished_spans()``. Production # deployments do the same dance at process exit. diff --git a/examples/tool-use/main.py b/examples/tool-use/main.py index 01ed273..72f26ea 100644 --- a/examples/tool-use/main.py +++ b/examples/tool-use/main.py @@ -29,9 +29,9 @@ schema (or ``None`` only under ``finish_reason="error"``). - The dispatcher node parses each ``ToolCall``, runs the matching local Python function, and appends one - ``ToolMessage(content=..., tool_call_id=...)`` per call. Spec - requires the ``tool_call_id`` round-trip exactly so the model can - pair its requests with the responses. 
+ ``ToolMessage(content=..., tool_call_id=...)`` per call. The + ``tool_call_id`` must round-trip exactly so the model can pair + its requests with the responses. - The loop is just a conditional edge on the graph: ``call_llm`` → ``dispatch_tools`` → back to ``call_llm`` when the model wants more tools, or → ``present`` when it's done. No special "agent diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index c1e65aa..e93024b 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1456,7 +1456,7 @@ Catching `Exception` works but is too broad; catching one hierarchy misses the o ### Filter `openarmature.*`-namespaced events when your observer only cares about user nodes -OA emits observer events under sentinel node-names for its own internal dispatch: `openarmature.llm.complete` for LLM provider calls (proposal 0024), `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014), `openarmature.checkpoint.save` for checkpoint saves (proposal 0010). These events let the OTel / Langfuse observers emit LLM-provider spans, checkpoint-migrate spans, etc., but a custom observer that only cares about user-defined node activity sees them as noise: +OA emits observer events under sentinel node-names for some internal dispatch: `openarmature.checkpoint.migrate` for state-migration runs (proposal 0014) and `openarmature.checkpoint.save` for checkpoint saves (proposal 0010) ride on `NodeEvent` with a sentinel namespace. (LLM provider calls used to follow the same pattern but moved to typed `LlmCompletionEvent` / `LlmFailedEvent` variants in v0.13.0 per proposals 0049 + 0058 — those are filtered by `isinstance` instead.) 
The sentinel-namespace events let the OTel / Langfuse observers emit checkpoint-migrate spans, etc., but a custom observer that only cares about user-defined node activity sees them as noise: ```python async def __call__(self, event: NodeEvent) -> None: @@ -1466,7 +1466,7 @@ async def __call__(self, event: NodeEvent) -> None: # … user-node handling ``` -`event.namespace[0]` is the safest discriminator (the leaf `event.node_name` would also work for LLM events but won't match the checkpoint sentinels since those repurpose `node_name` differently). Don't try to filter on `current_invocation_id() is None`: OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. +`event.namespace[0]` is the safest discriminator. Don't try to filter on `current_invocation_id() is None`: OA-internal events are dispatched within the same invocation context as user-node events, so `invocation_id` is set for both; the namespace-prefix check is the stable contract. ### Fan-out subgraphs that emit `list[X]` per instance produce `list[list[X]]` at `target_field` diff --git a/tests/test_production_observability_accumulators.py b/tests/test_production_observability_accumulators.py new file mode 100644 index 0000000..f7035f3 --- /dev/null +++ b/tests/test_production_observability_accumulators.py @@ -0,0 +1,256 @@ +"""Behavioral regression tests for the production-observability example's +queryable observers. + +``test_examples_smoke.py`` only proves the example loads and its +``build_graph()`` compiles. This file goes one level deeper into the two +queryable-observer classes the example ships, locking the logic a +happy-path live run never reaches: + +- ``LlmUsageAccumulator`` accumulating ``usage.cached_tokens`` and the + cache-hit ratio the persist node derives from it (a real OpenAI run + reports zero cached tokens, so the ratio is always 0.0% there). 
+- ``LlmFailureTracker`` counting failures by category (a successful run + produces an empty bucket and prints "none"). +- Mutual exclusion between the success and failure events: each observer + ignores the other's event type, so the two never double-count. +- The per-invocation bucket cleanup on ``InvocationCompletedEvent``. +- The OTel formatter surfacing the cache-read span attribute. + +The classes live in the example module, so the example is loaded with +``runpy.run_path`` (matching the smoke test). ``runpy`` returns a copy of +the executed namespace; the module's own functions hold the live dict via +``__globals__``, and ``build_graph()`` / ``persist()`` read and write +module-level singletons there, so the fixture hands back that live +namespace rather than the returned copy. +""" + +from __future__ import annotations + +import runpy +from pathlib import Path +from types import SimpleNamespace +from typing import Any + +import pytest + +# The example imports opentelemetry-sdk and langfuse record types at module +# top; skip cleanly when the extras aren't installed. +pytest.importorskip("opentelemetry.sdk.trace") +pytest.importorskip("langfuse") + +from openarmature.graph import ( # noqa: E402 + InvocationCompletedEvent, + LlmCompletionEvent, + LlmFailedEvent, +) +from openarmature.llm import Usage # noqa: E402 +from openarmature.observability.correlation import ( # noqa: E402 + _reset_invocation_id, + _set_invocation_id, +) + +EXAMPLES_DIR = Path(__file__).parent.parent / "examples" + + +@pytest.fixture +def example_ns() -> dict[str, Any]: + """Load the production-observability example and return its live + module namespace (the dict the module's functions actually read and + write, not ``runpy``'s returned copy).""" + main_py = EXAMPLES_DIR / "production-observability" / "main.py" + # A fresh run_path each call keeps tests isolated: build_graph() + # mutates module-level singletons, and a fresh namespace per test + # means those mutations don't leak. 
Reach the live namespace through + # a function's __globals__ rather than runpy's returned copy. + returned = runpy.run_path(str(main_py), run_name="__not_main__") + return returned["build_graph"].__globals__ + + +def _usage( + *, + prompt: int | None = None, + completion: int | None = None, + total: int | None = None, + cached: int | None = None, +) -> Usage: + return Usage( + prompt_tokens=prompt, + completion_tokens=completion, + total_tokens=total, + cached_tokens=cached, + ) + + +def _completion(invocation_id: str, usage: Usage) -> LlmCompletionEvent: + return LlmCompletionEvent( + invocation_id=invocation_id, + correlation_id="corr", + node_name="respond", + namespace=("respond",), + attempt_index=0, + fan_out_index=None, + branch_name=None, + provider="openai", + model="gpt-4o-mini", + response_id="resp", + response_model="gpt-4o-mini-2024-07-18", + usage=usage, + latency_ms=12.3, + finish_reason="stop", + input_messages=[], + output_content="ok", + request_params={}, + request_extras={}, + active_prompt=None, + active_prompt_group=None, + call_id="call", + ) + + +def _failure(invocation_id: str, category: str) -> LlmFailedEvent: + return LlmFailedEvent( + invocation_id=invocation_id, + correlation_id="corr", + node_name="respond", + namespace=("respond",), + attempt_index=0, + fan_out_index=None, + branch_name=None, + provider="openai", + model="gpt-4o-mini", + latency_ms=5.0, + input_messages=[], + request_params={}, + request_extras={}, + active_prompt=None, + active_prompt_group=None, + call_id="call", + error_category=category, + error_message="boom", + ) + + +def _completed(invocation_id: str) -> InvocationCompletedEvent: + return InvocationCompletedEvent( + final_state=None, + status="completed", + final_node="persist", + invocation_id=invocation_id, + correlation_id="corr", + ) + + +# --- LlmUsageAccumulator ------------------------------------------------- + + +async def test_usage_accumulator_accumulates_cache_tokens(example_ns: dict[str, Any]) -> 
None: + acc = example_ns["LlmUsageAccumulator"]() + await acc(_completion("inv", _usage(prompt=100, completion=40, total=140, cached=30))) + await acc(_completion("inv", _usage(prompt=50, completion=20, total=60, cached=10))) + bucket = acc.get_bucket("inv") + assert ( + bucket.prompt_tokens, + bucket.completion_tokens, + bucket.total_tokens, + bucket.cached_tokens, + bucket.call_count, + ) == (150, 60, 200, 40, 2) + + +async def test_usage_accumulator_tolerates_null_cache(example_ns: dict[str, Any]) -> None: + acc = example_ns["LlmUsageAccumulator"]() + await acc(_completion("inv", _usage(prompt=10, completion=5, total=15, cached=None))) + assert acc.get_bucket("inv").cached_tokens == 0 + + +async def test_usage_accumulator_ignores_failure_event(example_ns: dict[str, Any]) -> None: + acc = example_ns["LlmUsageAccumulator"]() + await acc(_failure("inv", "provider_rate_limit")) + assert acc.get_bucket("inv") is None + + +async def test_usage_accumulator_drops_bucket_on_completion(example_ns: dict[str, Any]) -> None: + acc = example_ns["LlmUsageAccumulator"]() + await acc(_completion("inv", _usage(prompt=1, completion=1, total=2, cached=0))) + await acc(_completed("inv")) + assert acc.get_bucket("inv") is None + + +# --- LlmFailureTracker --------------------------------------------------- + + +async def test_failure_tracker_counts_by_category(example_ns: dict[str, Any]) -> None: + tracker = example_ns["LlmFailureTracker"]() + await tracker(_failure("inv", "provider_rate_limit")) + await tracker(_failure("inv", "provider_unavailable")) + await tracker(_failure("inv", "provider_rate_limit")) + assert dict(tracker.get_bucket("inv").by_category) == { + "provider_rate_limit": 2, + "provider_unavailable": 1, + } + + +async def test_failure_tracker_ignores_completion_event(example_ns: dict[str, Any]) -> None: + tracker = example_ns["LlmFailureTracker"]() + await tracker(_completion("inv", _usage(prompt=1, completion=1, total=2, cached=0))) + assert 
tracker.get_bucket("inv") is None + + +async def test_failure_tracker_drops_bucket_on_completion(example_ns: dict[str, Any]) -> None: + tracker = example_ns["LlmFailureTracker"]() + await tracker(_failure("inv", "provider_rate_limit")) + await tracker(_completed("inv")) + assert tracker.get_bucket("inv") is None + + +# --- persist node output ------------------------------------------------- + + +async def test_persist_reports_cache_ratio_and_failure_breakdown( + example_ns: dict[str, Any], capsys: pytest.CaptureFixture[str] +) -> None: + # build_graph() compiles the graph and installs the two accumulators + # as module-level singletons that persist() reads. drain_events_for + # returns an empty summary for an invocation with no active worker, + # so persist() runs to completion offline without a live invoke(). + example_ns["build_graph"]() + acc = example_ns["_accumulator"] + tracker = example_ns["_failure_tracker"] + inv = "inv-persist" + await acc(_completion(inv, _usage(prompt=100, completion=40, total=140, cached=30))) + await tracker(_failure(inv, "provider_rate_limit")) + await tracker(_failure(inv, "provider_unavailable")) + await tracker(_failure(inv, "provider_rate_limit")) + + state = example_ns["BriefingState"](question="q") + token = _set_invocation_id(inv) + try: + await example_ns["persist"](state) + finally: + _reset_invocation_id(token) + + out = capsys.readouterr().out + assert ( + "[persist] LLM usage: prompt=100 (cached=30, 30.0% hit), " + "completion=40, total=140 across 1 call(s)" in out + ) + assert "[persist] LLM failures: provider_rate_limit=2, provider_unavailable=1" in out + + +# --- OTel span formatter ------------------------------------------------- + + +def test_otel_formatter_surfaces_cache_read_attribute(example_ns: dict[str, Any]) -> None: + # Stand-in for a ReadableSpan: the formatter reads name / attributes / + # start_time / end_time only. 
+ span = SimpleNamespace( + name="openarmature.llm.complete", + attributes={ + "gen_ai.system": "openai", + "openarmature.llm.cache_read.input_tokens": 12, + }, + start_time=0, + end_time=1_000_000, + ) + rendered = example_ns["_format_otel_spans"]([span]) + assert "openarmature.llm.cache_read.input_tokens=12" in rendered