
Commit 5e98f4e

Migrate LlmUsageAccumulator to typed event filter (#138)
* Migrate LlmUsageAccumulator to typed event filter

Switch the reference accumulator's filter from the 4-stage NodeEvent + namespace + phase + payload-narrow check to a single isinstance(event, LlmCompletionEvent) call. Field access moves from event.pre_state.X to event.usage.X (reusing the same Usage shape the sentinel payload mirrored). Per-invocation bucketing reads event.invocation_id directly off the typed event instead of going through current_invocation_id(), removing one level of indirection, which the typed-event surface makes possible.

Add a defensive guard against a None usage record. The spec types the field as nullable; Python's provider always passes a Usage instance today, but the guard keeps the accumulator robust against future providers that exercise the null option.

Document the success-only semantic of LlmCompletionEvent and its effect on bucket.call_count: failed LLM calls flow through the exception path and do not emit the typed event, so call_count now reflects successful calls only. Production code migrating an existing accumulator from the sentinel pattern should expect this counting shift if it was previously counting failure-path events. A pipeline tracking attempt-level failure rates needs a separate listener (sentinel NodeEvent pair, or a future failure-event typed variant if that proposal lands).

Add a disclaimer to the docs walkthrough that the captured OTel span name and gen_ai.usage.* attribute family still come from the sentinel handler; the OTel + Langfuse observers have not yet migrated to consuming the typed event. Span names and attribute paths may shift when the observer migration lands.

Reframe comment and docs language to describe the architectural state without pinning specific release numbers. Forward-looking promises and historical version pins both age poorly; the CHANGELOG is the authoritative reference for cutoff timing.

* Count successful LLM calls without reported usage

Address PR review feedback: the early return when usage is None was tying call_count to "successful AND reported usage" rather than just "successful." The spec contract treats the two as independent: providers may legitimately omit usage on a successful call.

Restructure so that bucket creation and the call_count increment happen unconditionally for any LlmCompletionEvent, with the usage-None guard gating only the token-counting math. Successful calls without reported usage now count toward call_count and contribute zero tokens (the only honest value we can record).

A smoke test confirms that a sequence of one event with usage populated and one event with usage=None produces call_count=2, with token totals matching the populated event only.

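The counting rule described in the commit message can be exercised in isolation. The sketch below is not the repository's code: `Usage`, `LlmCompletionEvent`, `UsageBucket`, and `UsageAccumulator` are hypothetical stand-ins that model only the fields the commit message mentions, and `__call__` is synchronous here for brevity (the example's observer is `async`). It replays the smoke-test sequence: one event with usage populated, one with `usage=None`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Usage:
    """Hypothetical stand-in for the library's usage record."""
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None
    total_tokens: Optional[int] = None


@dataclass
class LlmCompletionEvent:
    """Hypothetical stand-in: only the two fields the accumulator reads."""
    invocation_id: str
    usage: Optional[Usage] = None


@dataclass
class UsageBucket:
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    call_count: int = 0


class UsageAccumulator:
    def __init__(self) -> None:
        self._by_invocation: dict[str, UsageBucket] = {}

    def __call__(self, event: object) -> None:
        # Single typed filter: anything that is not a completion event is ignored.
        if not isinstance(event, LlmCompletionEvent):
            return
        # Count the call before looking at usage, so a successful call
        # without reported usage still increments call_count.
        bucket = self._by_invocation.setdefault(event.invocation_id, UsageBucket())
        bucket.call_count += 1
        usage = event.usage
        if usage is None:
            return
        prompt, completion = usage.prompt_tokens, usage.completion_tokens
        if prompt is not None:
            bucket.prompt_tokens += prompt
        if completion is not None:
            bucket.completion_tokens += completion
        # Prefer the reported total; otherwise derive it from the parts.
        if usage.total_tokens is not None:
            bucket.total_tokens += usage.total_tokens
        elif prompt is not None or completion is not None:
            bucket.total_tokens += (prompt or 0) + (completion or 0)

    def get_bucket(self, invocation_id: str) -> Optional[UsageBucket]:
        return self._by_invocation.get(invocation_id)


acc = UsageAccumulator()
acc(LlmCompletionEvent("inv-1", Usage(prompt_tokens=12, completion_tokens=30)))
acc(LlmCompletionEvent("inv-1", usage=None))  # successful call, no usage reported
acc("some other event")                       # filtered out by the isinstance check
bucket = acc.get_bucket("inv-1")
print(bucket.call_count, bucket.prompt_tokens, bucket.completion_tokens, bucket.total_tokens)
# prints: 2 12 30 42
```

With the earlier ordering (return on `usage is None` before touching the bucket), the same sequence would report a call count of 1, which is the behavior the review feedback objected to.
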
1 parent 3307b5b commit 5e98f4e

2 files changed

Lines changed: 100 additions & 49 deletions


docs/examples/production-observability.md

Lines changed: 40 additions & 8 deletions
@@ -85,10 +85,11 @@ sees the same logical events represented two ways.
 - **Queryable accumulator + `drain_events_for`**
 ([queryable observer pattern](../concepts/observability.md)).
 A third observer — `LlmUsageAccumulator` — subscribes to the
-same event stream but only records the LLM-namespace events
-carrying an `LlmEventPayload`. It accumulates per-invocation
-token totals in memory, indexed by `current_invocation_id()`.
-The terminal `persist` node calls
+same event stream but only records the typed
+`LlmCompletionEvent` variant (one event per successful LLM call;
+outcome fields read directly off the event). It accumulates
+per-invocation token totals in memory, indexed by
+`event.invocation_id`. The terminal `persist` node calls
 `await graph.drain_events_for(current_invocation_id(), timeout=2.0)`
 to synchronize on the deliver loop, then reads the accumulator's
 bucket and drops it. Without the drain, the bucket might be
@@ -97,8 +98,32 @@ sees the same logical events represented two ways.
 a single-callable shape; the accumulator just exposes its own
 read methods (`get_bucket` / `drop`) that the persist node knows
 about. This is the canonical shape for per-invocation cost
-attribution at request scope, replacing the round-trip-through-
-State workarounds that pre-v0.12.0 deployments used.
+attribution at request scope, in place of routing every token
+count through State (a workaround that pollutes the state
+schema with non-pipeline data).
+
+The filter shape is `isinstance(event, LlmCompletionEvent)`
+one isinstance check against the typed event variant on the
+observer event union. The provider also dual-emits a sentinel
+`NodeEvent` pair during the transition period for backwards
+compatibility with older accumulators; this example's
+accumulator ignores the sentinel pair because the typed event
+carries the same outcome data without the pair-join logic. New
+accumulators should follow the isinstance-based filter shape
+here; the CHANGELOG tracks when the sentinel emission is
+removed.
+
+`LlmCompletionEvent` is success-only by spec design. Failed LLM
+calls flow through the exception path and do not emit the typed
+event, so `bucket.call_count` reflects successful calls only.
+This is the right semantic for a usage accumulator (failed
+calls produce no tokens). A pipeline tracking attempt-level
+failure rates needs a separate listener — either a custom
+observer on the sentinel `NodeEvent` pair or a future
+failure-event typed variant if and when that proposal lands.
+Production code migrating an existing accumulator from the
+sentinel pattern should expect this counting shift if it was
+previously counting failure-path events.

 ## How to run

@@ -167,8 +192,15 @@ Trace id=<uuid>
 - **OTel spans block**: one line per captured span, sorted by
 start time. The relevant attributes shown are a curated subset
 for readability; the full attribute set is on each `Span` object
-for any reader inspecting them programmatically. Note three
-attribute families worth telling apart:
+for any reader inspecting them programmatically. The
+`openarmature.llm.complete` span name + the `gen_ai.usage.*`
+attribute family come from the OTel observer's current
+sentinel-`NodeEvent` handler — the OTel and Langfuse observers
+have not yet migrated to consuming the typed `LlmCompletionEvent`
+variant. Span names and attribute paths may shift when the
+observer migration lands; the example's emitted span structure
+tracks the current observer behavior. Note three attribute
+families worth telling apart:
 - The root `openarmature.invocation` span carries
 `openarmature.graph.spec_version` plus the
 `openarmature.implementation.name` / `.version` attribution

examples/production-observability/main.py

Lines changed: 60 additions & 41 deletions
@@ -49,9 +49,10 @@
 bucket and drops it. Without the drain, the bucket would be
 missing the most recent LLM event's tokens (the deliver loop
 hasn't reached them yet). This is the canonical shape for
-per-invocation cost attribution at request scope, replacing the
-round-trip-through-State workarounds that pre-v0.12.0 deployments
-used. The pattern is convention-only at the observer level:
+per-invocation cost attribution at request scope, in place of
+routing every token count through State (a workaround pattern
+that pollutes the state schema with non-pipeline data). The
+pattern is convention-only at the observer level:
 ``Observer`` itself stays a single-callable protocol; the
 queryable accumulator just exposes its own read methods
 (``get_bucket`` / ``drop``) that the persist node knows about.
@@ -99,7 +100,7 @@
     CompiledGraph,
     GraphBuilder,
     InvocationCompletedEvent,
-    NodeEvent,
+    LlmCompletionEvent,
     NodeException,
     ObserverEvent,
     State,
@@ -112,7 +113,6 @@
     SystemMessage,
     UserMessage,
 )
-from openarmature.observability import LLM_NAMESPACE, LlmEventPayload
 from openarmature.observability.correlation import current_invocation_id
 from openarmature.observability.langfuse import (
     InMemoryLangfuseClient,
@@ -163,13 +163,31 @@ class BriefingState(State):
 # consume. Convention only; openarmature does not ship a base class
 # for accumulators.
 #
-# The accumulator subscribes to every event but only records the LLM-
-# namespace ones (provider-emitted ``openarmature.llm.complete`` event
-# pair carrying an LlmEventPayload on ``pre_state``). Per-invocation
-# isolation is by ``current_invocation_id()`` — read inside the
-# observer callback from the worker's Context, populated by the
-# engine at worker create time. Concurrent invocations on one
-# observer each get their own bucket.
+# The accumulator subscribes to every event but only records the
+# typed ``LlmCompletionEvent`` variant — one event per successful LLM
+# call, structured outcome fields read directly off the event without
+# the namespace-string-match + payload-narrow dance the legacy
+# sentinel pattern needed. The provider also dual-emits a sentinel
+# ``NodeEvent`` pair during the transition period for backwards
+# compatibility with older accumulators; this accumulator ignores
+# the sentinel pair because the typed event carries the same outcome
+# data without the pair-join logic. New accumulators should follow
+# the isinstance-based filter shape here; the CHANGELOG tracks when
+# the sentinel emission is removed.
+#
+# Per-invocation isolation is by ``LlmCompletionEvent.invocation_id``
+# — read directly off the event, no ContextVar lookup needed.
+# Concurrent invocations on one observer each get their own bucket.
+#
+# ``LlmCompletionEvent`` is success-only by spec design. Failed LLM
+# calls flow through the exception path and do NOT emit the typed
+# event, so ``bucket.call_count`` here reflects successful calls
+# only. This is the right semantic for a usage accumulator (failed
+# calls produce no tokens / cost). A pipeline tracking attempt-level
+# failure rates needs a separate listener — either a custom observer
+# on the sentinel ``NodeEvent`` pair, or a future
+# ``LlmCallFailedEvent`` typed variant if and when that proposal
+# lands.


 @dataclass
@@ -199,39 +217,40 @@ async def __call__(self, event: ObserverEvent) -> None:
         if isinstance(event, InvocationCompletedEvent):
             self._by_invocation.pop(event.invocation_id, None)
             return
-        if not isinstance(event, NodeEvent):
-            return
-        if event.namespace != LLM_NAMESPACE:
-            return
-        # Only the completed half of the pair carries the token counts.
-        if event.phase != "completed":
+        if not isinstance(event, LlmCompletionEvent):
             return
-        if not isinstance(event.pre_state, LlmEventPayload):
-            return
-        # NodeEvent doesn't carry invocation_id on the dataclass;
-        # observers read it from the ContextVar, which the
-        # deliver-loop worker's Context carries from the engine task
-        # at worker create-time (per-invocation worker, per-invocation
-        # Context).
-        invocation_id = current_invocation_id()
-        if invocation_id is None:
+        # call_count tracks successful LLM calls (the typed event is
+        # success-only by spec design). Spec contract has "call
+        # happened" and "usage reported" as INDEPENDENT — a provider
+        # may legitimately omit usage on a successful call. Create the
+        # bucket and increment call_count unconditionally so the
+        # counter reflects all successful calls; gate only the
+        # token-counting math on usage being populated.
+        bucket = self._by_invocation.setdefault(event.invocation_id, _UsageBucket())
+        bucket.call_count += 1
+        # The typed event's usage field is nullable per the spec
+        # contract ("may be null when the provider does not report
+        # usage"). Python's provider always passes a Usage instance
+        # (with all-None fields when not reported), but the defensive
+        # guard keeps the accumulator robust against future providers
+        # that exercise the null option. Calls without reported usage
+        # contribute zero tokens (the only honest value we can record).
+        usage = event.usage
+        if usage is None:
             return
-        payload = event.pre_state
-        bucket = self._by_invocation.setdefault(invocation_id, _UsageBucket())
-        if payload.prompt_tokens is not None:
-            bucket.prompt_tokens += payload.prompt_tokens
-        if payload.completion_tokens is not None:
-            bucket.completion_tokens += payload.completion_tokens
+        if usage.prompt_tokens is not None:
+            bucket.prompt_tokens += usage.prompt_tokens
+        if usage.completion_tokens is not None:
+            bucket.completion_tokens += usage.completion_tokens
         # Prefer the provider-reported total when present; otherwise
         # derive from prompt + completion when at least one is known.
-        # A payload with all three None (rare; provider didn't report
-        # usage at all) contributes zero, which is the only honest
-        # value we can record.
-        if payload.total_tokens is not None:
-            bucket.total_tokens += payload.total_tokens
-        elif payload.prompt_tokens is not None or payload.completion_tokens is not None:
-            bucket.total_tokens += (payload.prompt_tokens or 0) + (payload.completion_tokens or 0)
-        bucket.call_count += 1
+        # A usage record with all three None (rare; provider didn't
+        # report counts at all) contributes zero, which is the only
+        # honest value we can record.
+        if usage.total_tokens is not None:
+            bucket.total_tokens += usage.total_tokens
+        elif usage.prompt_tokens is not None or usage.completion_tokens is not None:
+            bucket.total_tokens += (usage.prompt_tokens or 0) + (usage.completion_tokens or 0)

     # Consumers MUST synchronize on ``drain_events_for`` before
     # calling ``get_bucket`` if completeness matters — without the
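
The drain-before-read requirement in the trailing comment is easier to see with a toy delivery loop. The sketch below makes no claim about how `drain_events_for` is implemented: it uses only the standard library, with `asyncio.Queue.join()` as an assumed stand-in for the drain and a plain integer as a stand-in for the accumulator's bucket. It shows that a read taken right after emission can miss events that a background worker has not delivered yet.

```python
import asyncio


async def main() -> tuple[int, int]:
    queue = asyncio.Queue()
    total = 0  # stand-in for the accumulator's per-invocation bucket

    async def deliver_loop() -> None:
        # Stand-in for a background delivery worker: each queued event
        # reaches the "accumulator" some time after it was emitted.
        nonlocal total
        while True:
            tokens = await queue.get()
            total += tokens
            queue.task_done()

    worker = asyncio.create_task(deliver_loop())

    # Three "LLM calls" emit their token counts. The worker has not had
    # a chance to run yet, because this coroutine has not awaited.
    for tokens in (10, 20, 12):
        queue.put_nowait(tokens)

    read_without_drain = total  # taken before delivery: incomplete

    # Bounded wait for delivery to catch up, analogous to timeout=2.0.
    await asyncio.wait_for(queue.join(), timeout=2.0)
    read_after_drain = total

    worker.cancel()
    return read_without_drain, read_after_drain


before, after = asyncio.run(main())
print(before, after)
# prints: 0 42
```

In this toy the undrained read misses every event; in a real pipeline the shortfall would more likely be the most recent event or two, which is harder to notice and is the case the comment warns about.
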
