Skip to content

Commit 644af66

Browse files
Emit OA-namespace cache attributes on the LLM span (#140)
* Emit OA-namespace cache attributes on the LLM span Wire the §5.5.3.1 cache-attribute emission from proposal 0047 into the existing OTel observer's sentinel NodeEvent handler: - Extend LlmEventPayload with cached_tokens and cache_creation_tokens, defaulting to None to preserve the absent-vs-reported-zero distinction the spec mandates. - Have the OpenAI provider's _make_llm_event populate both fields from Response.usage (the fields PR 136 of 0047 added). The cache_creation_tokens field stays None for the OpenAI-compat mapping per spec §8.1.2 but the wiring is symmetric for future providers that source it. - In the OTel observer's _handle_llm_event completed phase, emit openarmature.llm.cache_read.input_tokens when cached_tokens is not None and openarmature.llm.cache_creation.input_tokens when cache_creation_tokens is not None. The conditional emission honors the §5.5.3 convention: absent (None) means the provider did not report; 0 means the provider reported zero hits. Cover the new behavior at three levels: - Four OTel observer unit tests drive the cache-attribute emission through synthetic sentinel started/completed NodeEvent pairs: cache hit, reported zero, absent, both fields populated. - Two provider-side unit tests verify _make_llm_event populates the payload's cache fields from Response.usage at the provider-payload boundary, so a regression here surfaces independently of the observer rendering layer. - Three conformance fixtures (040, 041, 042) activated via a new _run_llm_cache_fixture handler. The handler builds a single-LLM- call graph, captures the response, and asserts on response_usage + llm_span_attributes + llm_span_attributes_absent expectations. The dispatcher uses a set-membership check matching the precedent set by _run_llm_payload_fixture. The conformance.toml 0047 status stays at not-yet — wire-byte canonicalization and prompt-management substring stability are still ahead in the v0.13.0 cycle. * Close OpenAIProvider httpx clients in typed-event runners Address PR review feedback: _run_llm_cache_fixture_case constructed an OpenAIProvider but never awaited aclose(), leaking the underlying httpx.AsyncClient connection pool. Fixture 005 and 038 runners elsewhere in this file close the provider in a finally block; the new cache-fixture runner now follows the same convention. Extending the fix to the typed-event runners introduced in PR #139 (_build_simple_llm_graph, _run_typed_event_fanout_case, _run_typed_event_branches_case) — same bug class, same file, CoPilot missed them in the original review. _build_simple_llm_graph now returns (graph, state_cls, provider) so the caller can close the provider in a finally; the fan-out and parallel-branches runners close their inline-constructed providers symmetrically. No behavior change for fixture pass/fail outcomes; the leak was warning-level rather than failure-inducing. Full suite stays at 1191 pass.
1 parent e8906da commit 644af66

6 files changed

Lines changed: 380 additions & 34 deletions

File tree

src/openarmature/llm/providers/openai.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,6 +1457,8 @@ def _make_llm_event(
14571457
prompt_tokens=usage.prompt_tokens if usage is not None else None,
14581458
completion_tokens=usage.completion_tokens if usage is not None else None,
14591459
total_tokens=usage.total_tokens if usage is not None else None,
1460+
cached_tokens=usage.cached_tokens if usage is not None else None,
1461+
cache_creation_tokens=usage.cache_creation_tokens if usage is not None else None,
14601462
error_type=error_type,
14611463
error_message=error_message,
14621464
error_category=error_category,

src/openarmature/observability/llm_event.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,16 @@ class LlmEventPayload(BaseModel):
7979
prompt_tokens: int | None = None
8080
completion_tokens: int | None = None
8181
total_tokens: int | None = None
82+
# Cache-stat fields sourced from Response.usage per spec proposal
83+
# 0047 (§5.5.3.1 OA-namespace cache attributes). Absent (None)
84+
# when the provider does not report cache stats; set to 0 when
85+
# the provider reports zero hits (the "reported miss" case,
86+
# distinct from absent). The OTel observer emits the
87+
# openarmature.llm.cache_read.input_tokens span attribute when
88+
# cached_tokens is populated; same conditional for
89+
# cache_creation.
90+
cached_tokens: int | None = None
91+
cache_creation_tokens: int | None = None
8292
# error_category is the canonical llm-provider §7 category
8393
# (provider_unavailable, etc.) when the failed exception carried
8494
# one — the provider caller doesn't have a graph-engine §4

src/openarmature/observability/otel/observer.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,6 +1199,20 @@ def _handle_llm_event(self, event: NodeEvent) -> None:
11991199
span.set_attribute("openarmature.llm.usage.completion_tokens", payload.completion_tokens)
12001200
if payload.total_tokens is not None:
12011201
span.set_attribute("openarmature.llm.usage.total_tokens", payload.total_tokens)
1202+
# Spec proposal 0047 §5.5.3.1: OA-namespace cache attributes.
1203+
# Conditional emission per the §5.5.3 convention — the
1204+
# absent-vs-zero distinction is preserved: absent (None)
1205+
# means the provider did not report cache stats; 0 means
1206+
# the provider reported zero hits. OA-namespace per the
1207+
# stable-only upstream adoption policy because the upstream
1208+
# OTel GenAI cache attributes are at Development status.
1209+
if payload.cached_tokens is not None:
1210+
span.set_attribute("openarmature.llm.cache_read.input_tokens", payload.cached_tokens)
1211+
if payload.cache_creation_tokens is not None:
1212+
span.set_attribute(
1213+
"openarmature.llm.cache_creation.input_tokens",
1214+
payload.cache_creation_tokens,
1215+
)
12021216
# §5.5.3 GenAI semconv response attributes (gated by
12031217
# ``disable_genai_semconv``). Tokens mirror the baseline
12041218
# OA-prefixed usage attributes; finish_reasons wraps the

tests/conformance/test_observability.py

Lines changed: 192 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,13 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
121121
# gen_ai.*) MUST raise at the ``invoke()`` boundary before
122122
# any work begins. Two cases (one per reserved prefix).
123123
"028-caller-metadata-namespace-rejection",
124+
# v0.41.0 — proposal 0047 (§5.5.3.1 OA-namespace cache
125+
# attributes). Three fixtures cover cache-hit emission (040),
126+
# absence (041 — no prompt_tokens_details on the wire), and
127+
# reported-zero (042 — distinct from absent).
128+
"040-llm-cache-attribute-emission",
129+
"041-llm-cache-attribute-absence",
130+
"042-llm-cache-attribute-reported-zero",
124131
# v0.41.0 — proposal 0049 (typed LlmCompletionEvent variant on
125132
# the observer event union). Seven fixtures exercise dispatch
126133
# shape (050), type discrimination (051), opt-in caller
@@ -205,6 +212,12 @@ async def test_observability_fixture(fixture_path: Path) -> None:
205212
await _run_fixture_028(spec)
206213
elif fixture_id == "038-otel-parallel-branches-dispatch-span":
207214
await _run_fixture_038(spec)
215+
elif fixture_id in {
216+
"040-llm-cache-attribute-emission",
217+
"041-llm-cache-attribute-absence",
218+
"042-llm-cache-attribute-reported-zero",
219+
}:
220+
await _run_llm_cache_fixture(spec)
208221
elif fixture_id == "050-llm-completion-event-dispatch":
209222
await _run_fixture_050(spec)
210223
elif fixture_id == "051-llm-completion-event-type-discrimination":
@@ -2776,12 +2789,13 @@ def _build_simple_llm_graph(
27762789
case: Mapping[str, Any],
27772790
*,
27782791
populate_caller_metadata: bool,
2779-
) -> tuple[Any, type[Any]]:
2792+
) -> tuple[Any, type[Any], Any]:
27802793
"""Build a single-node graph that calls the LLM provider against a
27812794
mock transport. Matches the simple entry → ask → END pattern used
27822795
by fixtures 050, 051, 052, 053, 056. Returns ``(compiled_graph,
2783-
state_cls)`` so the caller can construct State instances without
2784-
re-deriving the class.
2796+
state_cls, provider)`` — the caller owns the provider's lifecycle
2797+
and MUST call ``await provider.aclose()`` after invoke completes
2798+
to release the underlying httpx.AsyncClient connection pool.
27852799
"""
27862800
import json
27872801

@@ -2837,7 +2851,7 @@ async def ask_body(_s: Any) -> dict[str, str]:
28372851
builder = (
28382852
GraphBuilder(state_cls).add_node(entry_name, ask_body).add_edge(entry_name, END).set_entry(entry_name)
28392853
)
2840-
return builder.compile(), state_cls
2854+
return builder.compile(), state_cls, provider
28412855

28422856

28432857
def _make_state_instance(case: Mapping[str, Any], state_cls: type[Any]) -> Any:
@@ -3154,6 +3168,138 @@ async def __call__(self, event: Any) -> None:
31543168
self.events.append(event)
31553169

31563170

3171+
async def _run_llm_cache_fixture(spec: Mapping[str, Any]) -> None:
3172+
"""Run the proposal 0047 §5.5.3.1 cache-attribute fixtures (040,
3173+
041, 042). All three share the same simple-shape graph and assert
3174+
on ``Response.usage`` cache fields plus the LLM provider span's
3175+
``openarmature.llm.cache_read.input_tokens`` /
3176+
``openarmature.llm.cache_creation.input_tokens`` attribute set.
3177+
"""
3178+
cases = cast("list[dict[str, Any]]", spec["cases"])
3179+
for case in cases:
3180+
case_name = cast("str", case["name"])
3181+
try:
3182+
await _run_llm_cache_fixture_case(case)
3183+
except AssertionError as e:
3184+
raise AssertionError(f"case {case_name!r}: {e}") from e
3185+
3186+
3187+
async def _run_llm_cache_fixture_case(case: Mapping[str, Any]) -> None:
3188+
"""Build a simple LLM-calling graph, capture the response, and
3189+
assert on response_usage + llm_span_attributes /
3190+
llm_span_attributes_absent expectations.
3191+
"""
3192+
import json
3193+
3194+
import httpx
3195+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor # noqa: PLC0415
3196+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # noqa: PLC0415
3197+
InMemorySpanExporter,
3198+
)
3199+
3200+
from openarmature.graph import END, GraphBuilder
3201+
from openarmature.llm import OpenAIProvider, UserMessage
3202+
from openarmature.llm.response import Response
3203+
from openarmature.observability.otel import OTelObserver
3204+
3205+
from .adapter import build_state_cls
3206+
3207+
mock_responses = list(cast("list[dict[str, Any]]", case.get("mock_llm") or []))
3208+
3209+
def _handler(_request: httpx.Request) -> httpx.Response:
3210+
if not mock_responses:
3211+
raise AssertionError("mock_llm queue exhausted")
3212+
spec_resp = mock_responses.pop(0)
3213+
body = cast("dict[str, Any]", spec_resp.get("body") or {})
3214+
return httpx.Response(
3215+
int(spec_resp.get("status", 200)),
3216+
content=json.dumps(body).encode("utf-8"),
3217+
headers={"Content-Type": "application/json"},
3218+
)
3219+
3220+
provider = OpenAIProvider(
3221+
base_url="http://mock-llm.test",
3222+
model=_mock_model_from_first_response(case) or "test-model",
3223+
api_key="test",
3224+
transport=httpx.MockTransport(_handler),
3225+
)
3226+
3227+
state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"])
3228+
state_cls = build_state_cls("LlmCacheFixtureState", state_fields)
3229+
3230+
nodes = cast("dict[str, Any]", case["nodes"])
3231+
entry_name = cast("str", case["entry"])
3232+
calls_llm_spec = cast("dict[str, Any]", nodes[entry_name]["calls_llm"])
3233+
stores_in = cast("str", calls_llm_spec.get("stores_response_in", "answer"))
3234+
messages_spec = cast("list[dict[str, str]]", calls_llm_spec.get("messages", []))
3235+
messages = [UserMessage(content=m["content"]) for m in messages_spec if m.get("role") == "user"]
3236+
3237+
captured_responses: list[Response] = []
3238+
3239+
async def ask_body(_s: Any) -> dict[str, str]:
3240+
response = await provider.complete(messages)
3241+
captured_responses.append(response)
3242+
return {stores_in: response.message.content or ""}
3243+
3244+
builder = (
3245+
GraphBuilder(state_cls).add_node(entry_name, ask_body).add_edge(entry_name, END).set_entry(entry_name)
3246+
)
3247+
graph = builder.compile()
3248+
3249+
exporter = InMemorySpanExporter()
3250+
observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter))
3251+
graph.attach_observer(observer)
3252+
try:
3253+
await graph.invoke(state_cls())
3254+
finally:
3255+
await graph.drain()
3256+
observer.shutdown()
3257+
# OpenAIProvider owns an httpx.AsyncClient; closing it releases
3258+
# the connection pool. Matches the convention used by fixture
3259+
# 005 / 038 runners elsewhere in this file.
3260+
await provider.aclose()
3261+
3262+
expected = cast("dict[str, Any]", case["expected"])
3263+
3264+
# ---- Response.usage assertion
3265+
expected_usage = cast("dict[str, Any] | None", expected.get("response_usage"))
3266+
if expected_usage is not None:
3267+
# The cache-attribute fixtures (040/041/042) are single-LLM-call
3268+
# by shape — one ``ask`` node, one mocked response. A future
3269+
# fixture extending to multi-call would need this assertion to
3270+
# loop over captured_responses rather than indexing [0].
3271+
assert len(captured_responses) == 1, (
3272+
f"response_usage assertion expects exactly one LLM call; captured {len(captured_responses)}"
3273+
)
3274+
actual_usage = captured_responses[0].usage
3275+
for field_name, expected_value in expected_usage.items():
3276+
actual = getattr(actual_usage, field_name)
3277+
assert actual == expected_value, (
3278+
f"response_usage.{field_name}: expected {expected_value!r}, got {actual!r}"
3279+
)
3280+
3281+
# ---- LLM span attribute assertions
3282+
llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"]
3283+
assert len(llm_spans) == 1, f"expected exactly one LLM provider span; got {len(llm_spans)}"
3284+
llm_span_attrs = dict(llm_spans[0].attributes or {})
3285+
3286+
expected_attrs = cast("dict[str, Any] | None", expected.get("llm_span_attributes"))
3287+
if expected_attrs is not None:
3288+
for attr_name, expected_value in expected_attrs.items():
3289+
actual = llm_span_attrs.get(attr_name)
3290+
assert actual == expected_value, (
3291+
f"llm_span_attributes[{attr_name!r}]: expected {expected_value!r}, got {actual!r}"
3292+
)
3293+
3294+
absent_attrs = cast("list[str] | None", expected.get("llm_span_attributes_absent"))
3295+
if absent_attrs is not None:
3296+
for attr_name in absent_attrs:
3297+
assert attr_name not in llm_span_attrs, (
3298+
f"llm_span_attributes_absent: {attr_name!r} unexpectedly present "
3299+
f"with value {llm_span_attrs[attr_name]!r}"
3300+
)
3301+
3302+
31573303
async def _run_typed_event_fixture_case(
31583304
case: Mapping[str, Any],
31593305
*,
@@ -3171,36 +3317,44 @@ async def _run_typed_event_fixture_case(
31713317
the same surface.
31723318
"""
31733319
collectors, populate_caller_metadata = _parse_typed_observers(case)
3174-
graph, state_cls = _build_simple_llm_graph(case, populate_caller_metadata=populate_caller_metadata)
3175-
extra: _AllEventsCollector | None = None
3176-
if expect_failure and not any(c.filter_event_type is None for c in collectors.values()):
3177-
extra = _AllEventsCollector()
3178-
final, exc = await _invoke_typed_fixture(case, collectors, graph, state_cls, extra_observer=extra)
3179-
3180-
expected = cast("dict[str, Any]", case.get("expected") or {})
3181-
if expect_failure:
3182-
assert exc is not None, "failure-path fixture expected an exception"
3183-
node_completed = cast("dict[str, Any] | None", expected.get("node_completed_event_carries_error"))
3184-
if node_completed:
3185-
# Source for the assertion: an unfiltered named collector
3186-
# when present, otherwise the failure-path-only extra
3187-
# ``_AllEventsCollector``.
3188-
unfiltered_named = next((c for c in collectors.values() if c.filter_event_type is None), None)
3189-
source = (
3190-
unfiltered_named.events
3191-
if unfiltered_named is not None
3192-
else (extra.events if extra is not None else [])
3193-
)
3194-
_assert_node_completed_event_carries_error(source, node_completed)
3195-
else:
3196-
if final is None:
3197-
raise AssertionError("expected a non-None final state on success path")
3198-
observer_expectations = cast("dict[str, Any]", expected.get("observers") or {})
3199-
for name, expectations in observer_expectations.items():
3200-
collector = collectors.get(name)
3201-
if collector is None:
3202-
raise AssertionError(f"fixture references unknown observer {name!r}")
3203-
_assert_observer_expectations(name, collector, cast("dict[str, Any]", expectations))
3320+
graph, state_cls, provider = _build_simple_llm_graph(
3321+
case, populate_caller_metadata=populate_caller_metadata
3322+
)
3323+
try:
3324+
extra: _AllEventsCollector | None = None
3325+
if expect_failure and not any(c.filter_event_type is None for c in collectors.values()):
3326+
extra = _AllEventsCollector()
3327+
final, exc = await _invoke_typed_fixture(case, collectors, graph, state_cls, extra_observer=extra)
3328+
3329+
expected = cast("dict[str, Any]", case.get("expected") or {})
3330+
if expect_failure:
3331+
assert exc is not None, "failure-path fixture expected an exception"
3332+
node_completed = cast("dict[str, Any] | None", expected.get("node_completed_event_carries_error"))
3333+
if node_completed:
3334+
# Source for the assertion: an unfiltered named collector
3335+
# when present, otherwise the failure-path-only extra
3336+
# ``_AllEventsCollector``.
3337+
unfiltered_named = next((c for c in collectors.values() if c.filter_event_type is None), None)
3338+
source = (
3339+
unfiltered_named.events
3340+
if unfiltered_named is not None
3341+
else (extra.events if extra is not None else [])
3342+
)
3343+
_assert_node_completed_event_carries_error(source, node_completed)
3344+
else:
3345+
if final is None:
3346+
raise AssertionError("expected a non-None final state on success path")
3347+
observer_expectations = cast("dict[str, Any]", expected.get("observers") or {})
3348+
for name, expectations in observer_expectations.items():
3349+
collector = collectors.get(name)
3350+
if collector is None:
3351+
raise AssertionError(f"fixture references unknown observer {name!r}")
3352+
_assert_observer_expectations(name, collector, cast("dict[str, Any]", expectations))
3353+
finally:
3354+
# _build_simple_llm_graph hands ownership of the provider's
3355+
# httpx.AsyncClient to the runner; close it to release the
3356+
# connection pool.
3357+
await provider.aclose()
32043358

32053359

32063360
async def _run_fixture_050(spec: Mapping[str, Any]) -> None:
@@ -3375,6 +3529,8 @@ async def _ask_body(_s: Any) -> dict[str, str]:
33753529
for handle in handles:
33763530
handle.remove()
33773531
await outer_compiled.drain()
3532+
# Release the underlying httpx.AsyncClient connection pool.
3533+
await provider.aclose()
33783534

33793535
expected = cast("dict[str, Any]", case.get("expected") or {})
33803536
observer_expectations = cast("dict[str, Any]", expected.get("observers") or {})
@@ -3477,6 +3633,8 @@ async def _body(_s: Any, _msgs: Any = msgs, _stores: str = stores_in) -> dict[st
34773633
for handle in handles:
34783634
handle.remove()
34793635
await outer_compiled.drain()
3636+
# Release the underlying httpx.AsyncClient connection pool.
3637+
await provider.aclose()
34803638

34813639
expected = cast("dict[str, Any]", case.get("expected") or {})
34823640
observer_expectations = cast("dict[str, Any]", expected.get("observers") or {})

0 commit comments

Comments
 (0)