diff --git a/CHANGELOG.md b/CHANGELOG.md index c7043dd..0a14405 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,15 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ## [Unreleased] +### Changed + +- **OTel observer drives the `openarmature.llm.complete` span lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the span in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so the span duration reflects the actual adapter-boundary measurement rather than dispatcher queue delay. Failure-path spans continue to fire from the sentinel `NodeEvent` pair (the typed event is success-only per the proposal). The §5.5 attribute set is unchanged. Dual-emit window: the provider still emits both the sentinel pair AND the typed event during v0.13.0; the sentinel pair drops in v0.15.0. +- **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips. + +### Added + +- **`LlmCompletionEvent` extended with proposal 0057 request-side fields** (spec v0.51.0). The typed event now carries `input_messages`, `output_content`, `request_params`, `request_extras`, `active_prompt`, `active_prompt_group`, `call_id`, and `response_model` alongside the existing v0.49.0 fields. `request_id` renamed to `response_id` per the proposal's response-side naming. Inline image bytes in `input_messages` stay redacted per observability §5.5.5 — the OpenAI provider reuses the existing message-serialization helper for the projection. Observer-side privacy gates (OTel `disable_llm_payload`, Langfuse equivalents) apply at rendering, symmetric with the §5.5.1 span attribute path. + ## [0.12.0] — 2026-06-05 Observability release. The pinned spec advances from v0.38.0 to v0.46.0, absorbing eight accepted proposals (0047-0054). Three ship as fully implemented this cycle: proposal 0048 grows a read-symmetric `get_invocation_metadata()` API + a §9 *Queryable observer pattern* concept doc section; proposal 0052 puts `openarmature.implementation.name` + `.version` attribution attributes on every OTel invocation span + every Langfuse Trace; proposal 0054 ships `CompiledGraph.drain_events_for(invocation_id, *, timeout)` as the architectural pair to 0048's §9.4 accumulator lifecycle. Two ship as textual-only acks (0051 Langfuse trace I/O caveat; 0053 §3.4 shared-parent boundary clarification). One Fixed: the retry middleware now resets the invocation-metadata ContextVar between attempts per §3.4. The production-observability example grows the queryable accumulator + drain_events_for pattern end-to-end so the new APIs have a runnable demo. diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index ccb9176..fa97273 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -539,9 +539,13 @@ class LlmCompletionEvent: lifetime, unique within the run. Distinct from ``response_id``. - ``caller_invocation_metadata``: optional snapshot of caller- - supplied invocation metadata at LLM-call time. Populated - only when the provider's opt-in flag is set (per-language - mechanism); default ``None``. + supplied invocation metadata at LLM-call time. Spec-defined as + OPTIONAL; the python OpenAIProvider populates it by default so + the bundled OTel/Langfuse observers can emit the §5.6 + ``openarmature.user.`` span-attribute family without an + extra opt-in. Pass ``populate_caller_metadata=False`` to suppress + the snapshot. Future non-OpenAI providers MAY default to + ``None``. """ invocation_id: str diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 7959d6c..7a6a054 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -160,7 +160,7 @@ def __init__( force_prompt_augmentation_fallback: bool = False, genai_system: str = "openai", readiness_probe: Literal["models", "chat_completions", "both"] = "chat_completions", - populate_caller_metadata: bool = False, + populate_caller_metadata: bool = True, ) -> None: self.base_url = _validate_and_normalize_base_url(base_url) self.model = model @@ -194,12 +194,13 @@ def __init__( ) self._readiness_probe = readiness_probe # Proposal 0049's caller_invocation_metadata field is OPTIONAL - # on the typed LlmCompletionEvent: default absent, populated - # only when the consumer opts in. The per-language opt-in - # mechanism is constructor-knob here so the provider can decide - # at emission time without engine-level observer introspection. - # Off by default to avoid bloating every event with potentially- - # large metadata snapshots when nothing downstream consumes them. + # on the typed LlmCompletionEvent. The python implementation + # defaults the opt-in to True because the bundled OTel and + # Langfuse observers read the field to populate caller-metadata + # span attributes (§5.6); leaving it off by default would + # silently strip those attributes after the typed-event + # migration. Pass ``populate_caller_metadata=False`` to suppress + # the snapshot when no downstream consumer needs it. self._populate_caller_metadata = populate_caller_metadata self._headers: dict[str, str] = {"Content-Type": "application/json"} if api_key is not None: diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index b654de0..18bdb63 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -75,6 +75,7 @@ from __future__ import annotations import json +import time from collections.abc import Callable, Mapping, Sequence from dataclasses import dataclass, field from typing import Any, cast @@ -316,7 +317,6 @@ class _InvState: the same correlation_id) don't collide.""" open_spans: dict[_StackKey, _OpenSpan] = field(default_factory=dict[_StackKey, _OpenSpan]) - open_llm_spans: dict[str, _OpenSpan] = field(default_factory=dict[str, _OpenSpan]) subgraph_spans: dict[tuple[str, ...], _OpenSpan] = field(default_factory=dict[tuple[str, ...], _OpenSpan]) detached_roots: dict[tuple[str, ...], _OpenSpan] = field(default_factory=dict[tuple[str, ...], _OpenSpan]) fan_out_instance_root_prefixes: set[tuple[str, ...]] = field(default_factory=set[tuple[str, ...]]) @@ -408,9 +408,10 @@ class OTelObserver: construction time below that). - ``attribute_enrichers``: optional sequence of callables run just before the observer ends each span. Each receives the live - :class:`Span` plus the :class:`NodeEvent` that triggered the - close (or ``None`` on synthetic close sites). Exceptions are - caught and warned; never propagated. + :class:`Span` plus the :class:`NodeEvent` or + :class:`LlmCompletionEvent` that triggered the close (or + ``None`` on synthetic close sites). Exceptions are caught and + warned; never propagated. - ``spec_version``: string surfaced as ``openarmature.graph.spec_version`` on the invocation span. - ``implementation_name``: string surfaced as @@ -459,7 +460,7 @@ class OTelObserver: # span.end() the observer issues. NodeEvent is None on synthetic # close sites (subgraph dispatch, detached root, fan-out instance, # invocation span, shutdown drain). - attribute_enrichers: Sequence[Callable[[Span, NodeEvent | None], None]] = () + attribute_enrichers: Sequence[Callable[[Span, NodeEvent | LlmCompletionEvent | None], None]] = () # Read from the package's ``__spec_version__`` (one of the three # places the spec version is pinned per CLAUDE.md). Bumping the # spec submodule + the two version fields automatically updates @@ -530,7 +531,7 @@ def __post_init__(self) -> None: # warned, never propagated to the dispatch worker. # ``event`` is None on synthetic close sites (subgraph dispatch, # detached root, fan-out instance, invocation span, orphan drain). - def _run_enrichers(self, span: Span, event: NodeEvent | None) -> None: + def _run_enrichers(self, span: Span, event: NodeEvent | LlmCompletionEvent | None) -> None: """Invoke configured enrichers against ``span`` before ``span.end()`` is called.""" if not self.attribute_enrichers: @@ -583,23 +584,25 @@ async def __call__( # before any node-specific logic runs. if isinstance(event, InvocationStartedEvent | InvocationCompletedEvent): return - # Proposal 0049 typed LlmCompletionEvent: ignored during the - # dual-emit window — the OTel mapping continues to drive its - # §5.5 LLM span lifecycle off the sentinel NodeEvent pair the - # provider emits alongside the typed event. Migration to type - # discrimination lands in a subsequent PR; this early-return - # keeps the observer Protocol-compatible without changing - # behavior. + # Proposal 0049 typed LlmCompletionEvent (success path). + # Drives the openarmature.llm.complete span lifecycle for + # successful provider calls. Failures don't emit this variant; + # they flow through the sentinel-pair error path below. if isinstance(event, LlmCompletionEvent): + if not self.disable_llm_spans: + self._handle_typed_llm_completion(event) return if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return - # LLM provider events use a sentinel namespace so we can route - # them to the dedicated §5.5 span path. + # LLM provider sentinel events: success-path is a no-op (the + # typed event handler above owns the span). Failure-path + # completed events open + close an error span; sentinel-started + # and successful-completed are no-ops here. Dual-emit window + # closes in v0.15.0 per the CHANGELOG note pinned to v0.13.0. if event.namespace == _LLM_NAMESPACE: if not self.disable_llm_spans: - self._handle_llm_event(event) + self._handle_llm_error_event(event) return if event.phase == "checkpoint_saved": self._emit_checkpoint_save_span(event) @@ -1073,14 +1076,22 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: span.end() # LLM provider span per observability §5.5 — parented to the - # calling node's span via the calling-node identity carried on - # the LlmEventPayload (namespace_prefix + attempt_index + - # fan_out_index). Lookup hits the per-invocation_id open_spans - # so concurrent fan-out instances each find their own calling - # node, not a sibling's. + # calling node's span via the calling-node identity carried on the + # event (namespace + attempt_index + fan_out_index + branch_name). + # Lookup hits the per-invocation_id open_spans so concurrent fan-out + # instances each find their own calling node, not a sibling's. + # + # v0.13.0 (proposal 0049 + 0057): the success-path span lifecycle is + # driven by the typed LlmCompletionEvent — opened and closed in one + # shot at typed-event arrival, with start_time back-dated by + # latency_ms so the span duration matches the adapter-boundary + # measurement. The error-path span is still driven by the sentinel + # NodeEvent pair the provider emits (LlmCompletionEvent is success- + # only per proposal 0049 §3 alternative 3). Dual-emit window closes + # in v0.15.0 — the sentinel pair will then drop entirely. # - # v0.17.0 attribute set (proposal 0024): - # - Baseline openarmature.llm.* attributes (preserved) + # v0.17.0 attribute set (proposal 0024) preserved unchanged: + # - Baseline openarmature.llm.* attributes # - §5.5.1 payload (input.messages, output.content, # request.extras) gated by disable_llm_payload # - §5.5.2 gen_ai.request.* request params @@ -1088,184 +1099,256 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: # - §5.5.4 opt-out flags # - §5.5.5 truncation contract on payload attributes # - # Prompt-identity attributes come from the LlmEventPayload - # active_prompt / active_prompt_group snapshots taken at dispatch - # time — NOT the ContextVar. The dispatch worker's task-local - # Context doesn't see node-body ContextVar writes. - def _handle_llm_event(self, event: NodeEvent) -> None: - """Build and close the ``openarmature.llm.complete`` span for an - LLM provider event pair.""" + # Prompt-identity attributes come from the active_prompt / + # active_prompt_group snapshots taken at dispatch time — NOT the + # ContextVar. The dispatch worker's task-local Context doesn't see + # node-body ContextVar writes. + def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: + """Open + close the ``openarmature.llm.complete`` span from the + typed LlmCompletionEvent (success path).""" + # Mid-call metadata augmentation can't reach this span: the + # typed event arrives only after complete() returns, and the + # span is back-dated past any augmentation event that fired + # while the call was in flight. Since complete() is awaited, + # node bodies can't actually run augmentation mid-call, so + # this is theoretical only — but it does mean the snapshot + # on the event is what the span reflects, not a later view. from openarmature.observability.correlation import ( current_correlation_id, current_invocation_id, ) - if not isinstance(event.pre_state, LlmEventPayload): - # Defensive — callers other than the OpenAIProvider hook - # shouldn't dispatch through the LLM_NAMESPACE sentinel. - return invocation_id = current_invocation_id() if invocation_id is None: return inv_state = self._inv_state_for(invocation_id) - payload = event.pre_state - if event.phase == "started": - parent_ctx = self._resolve_llm_parent(inv_state, invocation_id, payload) - attrs: dict[str, Any] = {"openarmature.llm.model": payload.model} - cid = current_correlation_id() - if cid is not None: - attrs["openarmature.correlation_id"] = cid - _apply_caller_metadata(attrs, payload.caller_invocation_metadata) - # Prompt-identity attributes: sourced from the dispatch- - # time snapshot on the payload. Reading the ContextVar - # here would return None because the dispatch worker - # task's Context was snapshotted at ``invoke()`` entry, - # before any node body opened a ``with_active_prompt`` - # block. - active_prompt = payload.active_prompt - if active_prompt is not None: - attrs["openarmature.prompt.name"] = active_prompt.name - attrs["openarmature.prompt.version"] = active_prompt.version - attrs["openarmature.prompt.label"] = active_prompt.label - attrs["openarmature.prompt.template_hash"] = active_prompt.template_hash - attrs["openarmature.prompt.rendered_hash"] = active_prompt.rendered_hash - active_group = payload.active_prompt_group - if active_group is not None: - attrs["openarmature.prompt.group_name"] = active_group.group_name - # §5.5.2 + §5.5.3 GenAI semconv attributes (gated by - # ``disable_genai_semconv``). Emit gen_ai.system, - # gen_ai.request.model (mirrors openarmature.llm.model), - # and per-set gen_ai.request.* params (only fields the - # caller supplied — absence is meaningful). - if not self.disable_genai_semconv: - attrs["gen_ai.system"] = payload.genai_system - attrs["gen_ai.request.model"] = payload.model - request_params = payload.request_params or {} - if "temperature" in request_params: - attrs["gen_ai.request.temperature"] = request_params["temperature"] - if "max_tokens" in request_params: - attrs["gen_ai.request.max_tokens"] = request_params["max_tokens"] - if "top_p" in request_params: - attrs["gen_ai.request.top_p"] = request_params["top_p"] - if "seed" in request_params: - attrs["gen_ai.request.seed"] = request_params["seed"] - # Three new request-param attrs from proposal 0032 - # (spec v0.24.0). The §8.4.3 Langfuse mapping picks - # these up by inclusion via the gen_ai.request.* → - # generation.modelParameters. rule with no - # §8 edit. - if "frequency_penalty" in request_params: - attrs["gen_ai.request.frequency_penalty"] = request_params["frequency_penalty"] - if "presence_penalty" in request_params: - attrs["gen_ai.request.presence_penalty"] = request_params["presence_penalty"] - if "stop_sequences" in request_params: - attrs["gen_ai.request.stop_sequences"] = request_params["stop_sequences"] - # §5.5.1 payload attributes (gated by ``disable_llm_payload``). - # ``input.messages`` and ``request.extras`` go on the started - # span; ``output.content`` lands on the completed branch. - if not self.disable_llm_payload: - if payload.input_messages: - serialized = _serialize_for_attribute(payload.input_messages) - attrs["openarmature.llm.input.messages"] = _truncate_for_attribute( - serialized, self.payload_max_bytes - ) - if payload.request_extras: - serialized_extras = _serialize_for_attribute(payload.request_extras) - attrs["openarmature.llm.request.extras"] = _truncate_for_attribute( - serialized_extras, self.payload_max_bytes - ) - span = self._tracer.start_span( - name="openarmature.llm.complete", - context=cast("Any", parent_ctx), - kind=SpanKind.CLIENT, - attributes=attrs, - ) - inv_state.open_llm_spans[payload.call_id] = _OpenSpan( - span=span, - fan_out_index_chain=event.fan_out_index_chain, - branch_name_chain=event.branch_name_chain, - ) - elif event.phase == "completed": - open_span = inv_state.open_llm_spans.pop(payload.call_id, None) - if open_span is None: - return - span = open_span.span - # Baseline §5.5 attributes (preserved from v0.7.0). - if payload.finish_reason is not None: - span.set_attribute("openarmature.llm.finish_reason", payload.finish_reason) - if payload.prompt_tokens is not None: - span.set_attribute("openarmature.llm.usage.prompt_tokens", payload.prompt_tokens) - if payload.completion_tokens is not None: - span.set_attribute("openarmature.llm.usage.completion_tokens", payload.completion_tokens) - if payload.total_tokens is not None: - span.set_attribute("openarmature.llm.usage.total_tokens", payload.total_tokens) - # Spec proposal 0047 §5.5.3.1: OA-namespace cache attributes. - # Conditional emission per the §5.5.3 convention — the - # absent-vs-zero distinction is preserved: absent (None) - # means the provider did not report cache stats; 0 means - # the provider reported zero hits. OA-namespace per the - # stable-only upstream adoption policy because the upstream - # OTel GenAI cache attributes are at Development status. - if payload.cached_tokens is not None: - span.set_attribute("openarmature.llm.cache_read.input_tokens", payload.cached_tokens) - if payload.cache_creation_tokens is not None: + # Back-date start_time using latency_ms so the span's duration + # reflects the actual adapter-boundary measurement rather than + # dispatcher queue delay. When latency is missing, fall back to + # a zero-duration span at end_time. + end_time_ns = time.time_ns() + if event.latency_ms is not None: + start_time_ns = end_time_ns - int(event.latency_ms * 1_000_000) + else: + start_time_ns = end_time_ns + parent_ctx = self._resolve_llm_parent( + inv_state, + invocation_id, + calling_namespace_prefix=event.namespace, + calling_attempt_index=event.attempt_index, + calling_fan_out_index=event.fan_out_index, + calling_branch_name=event.branch_name, + ) + attrs: dict[str, Any] = {"openarmature.llm.model": event.model} + cid = current_correlation_id() + if cid is not None: + attrs["openarmature.correlation_id"] = cid + # Asymmetric guard with _handle_llm_error_event below: the typed + # event types ``caller_invocation_metadata`` as ``Mapping | None`` + # while LlmEventPayload defaults to an empty mapping (never None). + # Don't "normalize" the two paths without also normalizing the + # source types. + if event.caller_invocation_metadata is not None: + _apply_caller_metadata(attrs, event.caller_invocation_metadata) + active_prompt = event.active_prompt + if active_prompt is not None: + attrs["openarmature.prompt.name"] = active_prompt.name + attrs["openarmature.prompt.version"] = active_prompt.version + attrs["openarmature.prompt.label"] = active_prompt.label + attrs["openarmature.prompt.template_hash"] = active_prompt.template_hash + attrs["openarmature.prompt.rendered_hash"] = active_prompt.rendered_hash + active_group = event.active_prompt_group + if active_group is not None: + attrs["openarmature.prompt.group_name"] = active_group.group_name + if not self.disable_genai_semconv: + attrs["gen_ai.system"] = event.provider + attrs["gen_ai.request.model"] = event.model + request_params = event.request_params or {} + if "temperature" in request_params: + attrs["gen_ai.request.temperature"] = request_params["temperature"] + if "max_tokens" in request_params: + attrs["gen_ai.request.max_tokens"] = request_params["max_tokens"] + if "top_p" in request_params: + attrs["gen_ai.request.top_p"] = request_params["top_p"] + if "seed" in request_params: + attrs["gen_ai.request.seed"] = request_params["seed"] + if "frequency_penalty" in request_params: + attrs["gen_ai.request.frequency_penalty"] = request_params["frequency_penalty"] + if "presence_penalty" in request_params: + attrs["gen_ai.request.presence_penalty"] = request_params["presence_penalty"] + if "stop_sequences" in request_params: + attrs["gen_ai.request.stop_sequences"] = request_params["stop_sequences"] + if not self.disable_llm_payload: + if event.input_messages: + serialized = _serialize_for_attribute(event.input_messages) + attrs["openarmature.llm.input.messages"] = _truncate_for_attribute( + serialized, self.payload_max_bytes + ) + if event.request_extras: + serialized_extras = _serialize_for_attribute(event.request_extras) + attrs["openarmature.llm.request.extras"] = _truncate_for_attribute( + serialized_extras, self.payload_max_bytes + ) + span = self._tracer.start_span( + name="openarmature.llm.complete", + context=cast("Any", parent_ctx), + kind=SpanKind.CLIENT, + attributes=attrs, + start_time=start_time_ns, + ) + usage = event.usage + if event.finish_reason is not None: + span.set_attribute("openarmature.llm.finish_reason", event.finish_reason) + if usage is not None: + if usage.prompt_tokens is not None: + span.set_attribute("openarmature.llm.usage.prompt_tokens", usage.prompt_tokens) + if usage.completion_tokens is not None: + span.set_attribute("openarmature.llm.usage.completion_tokens", usage.completion_tokens) + if usage.total_tokens is not None: + span.set_attribute("openarmature.llm.usage.total_tokens", usage.total_tokens) + # Proposal 0047 §5.5.3.1 cache attributes. Absent (None) + # means the provider didn't report; 0 is "reported miss" + # and distinct from absent. + if usage.cached_tokens is not None: + span.set_attribute("openarmature.llm.cache_read.input_tokens", usage.cached_tokens) + if usage.cache_creation_tokens is not None: span.set_attribute( "openarmature.llm.cache_creation.input_tokens", - payload.cache_creation_tokens, + usage.cache_creation_tokens, ) - # §5.5.3 GenAI semconv response attributes (gated by - # ``disable_genai_semconv``). Tokens mirror the baseline - # OA-prefixed usage attributes; finish_reasons wraps the - # scalar in a single-element array per semconv; - # response.{id,model} emit only when the provider - # returned non-null values. - if not self.disable_genai_semconv: - if payload.prompt_tokens is not None: - span.set_attribute("gen_ai.usage.input_tokens", payload.prompt_tokens) - if payload.completion_tokens is not None: - span.set_attribute("gen_ai.usage.output_tokens", payload.completion_tokens) - if payload.finish_reason is not None: - span.set_attribute("gen_ai.response.finish_reasons", [payload.finish_reason]) - if payload.response_id is not None: - span.set_attribute("gen_ai.response.id", payload.response_id) - if payload.response_model is not None: - span.set_attribute("gen_ai.response.model", payload.response_model) - # §5.5.1 output payload. Assistant messages with empty - # content (tool-call-only responses) MUST NOT emit this - # attribute per spec — ``output_content`` on the payload - # is already None in that case (see provider.py). - if not self.disable_llm_payload and payload.output_content: - attrs_out = _truncate_for_attribute(payload.output_content, self.payload_max_bytes) - span.set_attribute("openarmature.llm.output.content", attrs_out) - if payload.error_type is not None: - span.set_status( - Status( - StatusCode.ERROR, - description=payload.error_category or payload.error_type, - ) + if not self.disable_genai_semconv: + if usage is not None: + if usage.prompt_tokens is not None: + span.set_attribute("gen_ai.usage.input_tokens", usage.prompt_tokens) + if usage.completion_tokens is not None: + span.set_attribute("gen_ai.usage.output_tokens", usage.completion_tokens) + if event.finish_reason is not None: + span.set_attribute("gen_ai.response.finish_reasons", [event.finish_reason]) + if event.response_id is not None: + span.set_attribute("gen_ai.response.id", event.response_id) + if event.response_model is not None: + span.set_attribute("gen_ai.response.model", event.response_model) + # §5.5.1 output payload. Assistant messages with empty content + # (tool-call-only responses) MUST NOT emit this attribute per + # spec — ``output_content`` is already None in that case (see + # provider.py). + if not self.disable_llm_payload and event.output_content: + attrs_out = _truncate_for_attribute(event.output_content, self.payload_max_bytes) + span.set_attribute("openarmature.llm.output.content", attrs_out) + span.set_status(Status(StatusCode.OK)) + self._run_enrichers(span, event) + span.end(end_time=end_time_ns) + + def _handle_llm_error_event(self, event: NodeEvent) -> None: + """Emit the error-path ``openarmature.llm.complete`` span from + the sentinel NodeEvent. Success-path completed events and the + started event are no-ops here — the typed LlmCompletionEvent + handler owns the success-path span.""" + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) + + if event.phase != "completed": + return + if not isinstance(event.pre_state, LlmEventPayload): + return + payload = event.pre_state + if payload.error_type is None: + # Success-path completed: typed-event handler owns the span. + return + invocation_id = current_invocation_id() + if invocation_id is None: + return + inv_state = self._inv_state_for(invocation_id) + parent_ctx = self._resolve_llm_parent( + inv_state, + invocation_id, + calling_namespace_prefix=payload.calling_namespace_prefix, + calling_attempt_index=payload.calling_attempt_index, + calling_fan_out_index=payload.calling_fan_out_index, + calling_branch_name=payload.calling_branch_name, + ) + attrs: dict[str, Any] = {"openarmature.llm.model": payload.model} + cid = current_correlation_id() + if cid is not None: + attrs["openarmature.correlation_id"] = cid + _apply_caller_metadata(attrs, payload.caller_invocation_metadata) + active_prompt = payload.active_prompt + if active_prompt is not None: + attrs["openarmature.prompt.name"] = active_prompt.name + attrs["openarmature.prompt.version"] = active_prompt.version + attrs["openarmature.prompt.label"] = active_prompt.label + attrs["openarmature.prompt.template_hash"] = active_prompt.template_hash + attrs["openarmature.prompt.rendered_hash"] = active_prompt.rendered_hash + active_group = payload.active_prompt_group + if active_group is not None: + attrs["openarmature.prompt.group_name"] = active_group.group_name + if not self.disable_genai_semconv: + attrs["gen_ai.system"] = payload.genai_system + attrs["gen_ai.request.model"] = payload.model + request_params = payload.request_params or {} + if "temperature" in request_params: + attrs["gen_ai.request.temperature"] = request_params["temperature"] + if "max_tokens" in request_params: + attrs["gen_ai.request.max_tokens"] = request_params["max_tokens"] + if "top_p" in request_params: + attrs["gen_ai.request.top_p"] = request_params["top_p"] + if "seed" in request_params: + attrs["gen_ai.request.seed"] = request_params["seed"] + if "frequency_penalty" in request_params: + attrs["gen_ai.request.frequency_penalty"] = request_params["frequency_penalty"] + if "presence_penalty" in request_params: + attrs["gen_ai.request.presence_penalty"] = request_params["presence_penalty"] + if "stop_sequences" in request_params: + attrs["gen_ai.request.stop_sequences"] = request_params["stop_sequences"] + if not self.disable_llm_payload: + if payload.input_messages: + serialized = _serialize_for_attribute(payload.input_messages) + attrs["openarmature.llm.input.messages"] = _truncate_for_attribute( + serialized, self.payload_max_bytes ) - if payload.error_category is not None: - span.set_attribute("openarmature.error.category", payload.error_category) - else: - span.set_status(Status(StatusCode.OK)) - self._run_enrichers(span, event) - span.end() + if payload.request_extras: + serialized_extras = _serialize_for_attribute(payload.request_extras) + attrs["openarmature.llm.request.extras"] = _truncate_for_attribute( + serialized_extras, self.payload_max_bytes + ) + span = self._tracer.start_span( + name="openarmature.llm.complete", + context=cast("Any", parent_ctx), + kind=SpanKind.CLIENT, + attributes=attrs, + ) + span.set_status( + Status( + StatusCode.ERROR, + description=payload.error_category or payload.error_type, + ) + ) + if payload.error_category is not None: + span.set_attribute("openarmature.error.category", payload.error_category) + self._run_enrichers(span, event) + span.end() def _resolve_llm_parent( self, inv_state: _InvState, invocation_id: str, - payload: Any, + *, + calling_namespace_prefix: tuple[str, ...], + calling_attempt_index: int, + calling_fan_out_index: int | None, + calling_branch_name: str | None, ) -> object: """Look up the calling node's span using the calling-node - identity carried on the LLM event payload, fall back through - subgraph dispatch / invocation span.""" + identity, fall back through subgraph dispatch / invocation + span.""" # 1. Direct match on the calling node's ``_StackKey``. calling_key: _StackKey = ( - payload.calling_namespace_prefix, - payload.calling_attempt_index, - payload.calling_fan_out_index, - payload.calling_branch_name, + calling_namespace_prefix, + calling_attempt_index, + calling_fan_out_index, + calling_branch_name, ) calling = inv_state.open_spans.get(calling_key) if calling is not None: @@ -1273,9 +1356,8 @@ def _resolve_llm_parent( # 2. Walk up the calling namespace prefix for a synthetic # subgraph dispatch span at any ancestor — covers LLM # calls from inside subgraph wrapper middleware. - prefix = payload.calling_namespace_prefix - for plen in range(len(prefix), 0, -1): - ancestor = prefix[:plen] + for plen in range(len(calling_namespace_prefix), 0, -1): + ancestor = calling_namespace_prefix[:plen] sg = inv_state.subgraph_spans.get(ancestor) if sg is not None: return set_span_in_context(sg.span) @@ -1985,15 +2067,13 @@ def close_invocation(self, invocation_id: str) -> None: def _drain_inv_state(self, inv_state: _InvState) -> None: """Close any still-open spans in a per-invocation state - container in child→parent order. LLM spans (deepest leaves) - → leaf node spans (sorted deepest-first by namespace) → - non-detached fan-out per-instance dispatch spans → detached - roots → subgraph dispatch spans. Matches the ordering used in - ``shutdown``.""" - for call_id in list(inv_state.open_llm_spans.keys()): - open_span = inv_state.open_llm_spans.pop(call_id, None) - if open_span is not None: - self._drain_open_span(open_span) + container in child→parent order. Leaf node spans (sorted + deepest-first by namespace) → non-detached fan-out per-instance + dispatch spans → detached roots → subgraph dispatch spans. + Matches the ordering used in ``shutdown``. LLM spans don't + appear here — both the success and error paths open + close + the span in one shot at handler-time, so there are no in-flight + LLM spans to drain.""" # Inner-node spans (depth >= 2) drain first — these include # the inner-node bodies inside fan-out instances, which are # children of the per-instance dispatch spans. diff --git a/tests/_helpers/__init__.py b/tests/_helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/_helpers/typed_event.py b/tests/_helpers/typed_event.py new file mode 100644 index 0000000..ad420cf --- /dev/null +++ b/tests/_helpers/typed_event.py @@ -0,0 +1,44 @@ +"""Shared test helper for constructing ``LlmCompletionEvent`` instances. + +Replaces 20+ kwargs of boilerplate at each call site with a one-liner +plus the overrides relevant to the test. Used by unit tests against +the OTel and Langfuse observers; conformance-harness unit tests have +their own variant with conformance-specific defaults. +""" + +from __future__ import annotations + +from typing import Any + +from openarmature.graph.events import LlmCompletionEvent + + +def make_typed_event(**overrides: Any) -> LlmCompletionEvent: + """Build a ``LlmCompletionEvent`` with neutral defaults; ``overrides`` + swap individual fields for the test case.""" + base: dict[str, Any] = { + "invocation_id": "inv-1", + "correlation_id": None, + "node_name": "ask", + "namespace": ("ask",), + "attempt_index": 0, + "fan_out_index": None, + "branch_name": None, + "provider": "openai", + "model": "test-m", + "response_id": None, + "response_model": None, + "usage": None, + "latency_ms": 10.0, + "finish_reason": "stop", + "input_messages": [], + "output_content": None, + "request_params": {}, + "request_extras": {}, + "active_prompt": None, + "active_prompt_group": None, + "call_id": "cc-1", + "caller_invocation_metadata": None, + } + base.update(overrides) + return LlmCompletionEvent(**base) diff --git a/tests/unit/test_llm_provider.py b/tests/unit/test_llm_provider.py index bd6f2ed..a604442 100644 --- a/tests/unit/test_llm_provider.py +++ b/tests/unit/test_llm_provider.py @@ -1622,27 +1622,31 @@ async def test_llm_completion_event_input_messages_redacts_inline_image_bytes() assert "byte_count" in serialized -async def test_caller_invocation_metadata_off_by_default() -> None: - # Per proposal 0049's OPT-IN contract: default absent / None. +async def test_caller_invocation_metadata_populated_by_default() -> None: + # Python default flips proposal 0049's spec-recommended off-by-default + # so the bundled OTel/Langfuse observers can emit caller-metadata + # span attributes (§5.6) without callers having to opt in. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} ) provider = OpenAIProvider(base_url="http://test", model="m", api_key="k", transport=transport) try: + set_invocation_metadata(user_id="u-123") await provider.complete([UserMessage(content="hi")]) finally: await provider.aclose() _release_dispatch(token) typed = next(e for e in events if isinstance(e, LlmCompletionEvent)) - assert typed.caller_invocation_metadata is None + assert typed.caller_invocation_metadata is not None + assert typed.caller_invocation_metadata.get("user_id") == "u-123" -async def test_caller_invocation_metadata_populated_when_opted_in() -> None: - # Per proposal 0049 Q2 ack: opt-in via provider constructor knob. - # When True, the typed event carries a snapshot of the metadata - # mapping at LLM-call time. +async def test_caller_invocation_metadata_omitted_when_opted_out() -> None: + # The spec's OPTIONAL contract on caller_invocation_metadata is + # still honored: pass ``populate_caller_metadata=False`` to suppress + # the snapshot. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} @@ -1652,7 +1656,7 @@ async def test_caller_invocation_metadata_populated_when_opted_in() -> None: model="m", api_key="k", transport=transport, - populate_caller_metadata=True, + populate_caller_metadata=False, ) try: set_invocation_metadata(user_id="u-123") @@ -1662,8 +1666,7 @@ async def test_caller_invocation_metadata_populated_when_opted_in() -> None: _release_dispatch(token) typed = next(e for e in events if isinstance(e, LlmCompletionEvent)) - assert typed.caller_invocation_metadata is not None - assert typed.caller_invocation_metadata.get("user_id") == "u-123" + assert typed.caller_invocation_metadata is None async def test_llm_completion_event_request_id_none_when_response_omits_id() -> None: diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index bb4c2d4..cc4c556 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -499,9 +499,7 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: ``with_active_prompt_group`` adds ``openarmature.prompt.group_name``.""" from datetime import UTC, datetime - from openarmature.graph.events import NodeEvent from openarmature.llm.messages import UserMessage - from openarmature.llm.providers.openai import LlmEventPayload from openarmature.observability.correlation import ( _reset_invocation_id, _set_invocation_id, @@ -511,6 +509,7 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: PromptResult, TextPrompt, ) + from tests._helpers.typed_event import make_typed_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) @@ -541,46 +540,11 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: try: # Proposal 0024 / friction-roundup #3: the provider captures # ``current_prompt_result()`` and ``current_prompt_group()`` - # at dispatch time and puts them on the LLM event payload. - # The observer reads from the payload, NOT from the live + # at dispatch time and puts them on the LlmCompletionEvent. + # The observer reads from the typed event, NOT from the live # ContextVar — that ContextVar is unreachable from the - # dispatch worker's task-local Context. This test verifies - # the observer correctly surfaces prompt attributes when the - # payload carries them; the cross-task regression case is - # covered separately by an end-to-end test. - started = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="started", - pre_state=LlmEventPayload( - call_id="test-call-prompt", - model="test-m", - active_prompt=result, - active_prompt_group=group, - ), - post_state=None, - error=None, - parent_states=(), - ) - completed = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="completed", - pre_state=LlmEventPayload( - call_id="test-call-prompt", - model="test-m", - finish_reason="stop", - active_prompt=result, - active_prompt_group=group, - ), - post_state=None, - error=None, - parent_states=(), - ) - await observer(started) - await observer(completed) + # dispatch worker's task-local Context. + await observer(make_typed_event(active_prompt=result, active_prompt_group=group)) finally: _reset_invocation_id(token) @@ -599,40 +563,18 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None: """Without ``with_active_prompt``, the LLM-call span MUST NOT carry ``openarmature.prompt.*`` attributes.""" - from openarmature.graph.events import NodeEvent from openarmature.observability.correlation import ( _reset_invocation_id, _set_invocation_id, ) - from openarmature.observability.llm_event import LlmEventPayload + from tests._helpers.typed_event import make_typed_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-2") try: - started = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="started", - pre_state=LlmEventPayload(call_id="test-call-noprompt", model="test-m"), - post_state=None, - error=None, - parent_states=(), - ) - completed = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="completed", - pre_state=LlmEventPayload(call_id="test-call-noprompt", model="test-m", finish_reason="stop"), - post_state=None, - error=None, - parent_states=(), - ) - await observer(started) - await observer(completed) + await observer(make_typed_event()) finally: _reset_invocation_id(token) observer.shutdown() @@ -648,52 +590,32 @@ async def _drive_llm_span_with_cached_tokens( cached_tokens: int | None, cache_creation_tokens: int | None = None, ) -> dict[str, Any]: - """Drive the OTel observer through a sentinel started/completed - NodeEvent pair with the supplied cache-stat fields on the - completed-phase payload. Returns the LLM-span's attribute map. + """Drive the OTel observer through a typed LlmCompletionEvent + carrying the supplied cache-stat fields on the event's Usage + record. Returns the LLM-span's attribute map. """ - from openarmature.graph.events import NodeEvent + from openarmature.llm.response import Usage from openarmature.observability.correlation import ( _reset_invocation_id, _set_invocation_id, ) - from openarmature.observability.llm_event import LlmEventPayload + from tests._helpers.typed_event import make_typed_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-cache") try: - started = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="started", - pre_state=LlmEventPayload(call_id="cc-cache", model="test-m"), - post_state=None, - error=None, - parent_states=(), - ) - completed = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="completed", - pre_state=LlmEventPayload( - call_id="cc-cache", - model="test-m", - finish_reason="stop", - prompt_tokens=100, - completion_tokens=5, - total_tokens=105, - cached_tokens=cached_tokens, - cache_creation_tokens=cache_creation_tokens, - ), - post_state=None, - error=None, - parent_states=(), + await observer( + make_typed_event( + usage=Usage( + prompt_tokens=100, + completion_tokens=5, + total_tokens=105, + cached_tokens=cached_tokens, + cache_creation_tokens=cache_creation_tokens, + ), + ) ) - await observer(started) - await observer(completed) finally: _reset_invocation_id(token) observer.shutdown() @@ -783,6 +705,162 @@ async def test_disable_llm_spans_skips_llm_provider_span() -> None: assert llm_spans == [] +async def test_llm_span_duration_matches_typed_event_latency() -> None: + # Proposal 0049 + PR 3b: the success-path span's duration is + # back-dated using LlmCompletionEvent.latency_ms, so observers see + # the adapter-boundary measurement instead of dispatcher queue + # delay. Verify the span's end-minus-start lands within tolerance + # of the typed event's latency_ms. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + latency_ms = 123.456 + token = _set_invocation_id("inv-duration") + try: + await observer(make_typed_event(latency_ms=latency_ms)) + finally: + _reset_invocation_id(token) + observer.shutdown() + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert len(llm_spans) == 1 + span = llm_spans[0] + assert span.start_time is not None and span.end_time is not None + duration_ms = (span.end_time - span.start_time) / 1_000_000 + # Tolerance covers integer-nanosecond truncation and float->int + # rounding; the back-date is exact apart from those. + assert abs(duration_ms - latency_ms) < 1.0 + + +async def test_llm_span_zero_duration_when_latency_missing() -> None: + # When the typed event omits latency_ms (None), the handler falls + # back to a zero-duration span at end_time rather than guessing + # the start. Pin the fallback so a future "let's just use now() for + # both endpoints" tweak doesn't accidentally swap to a small + # positive duration. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + token = _set_invocation_id("inv-no-latency") + try: + await observer(make_typed_event(latency_ms=None)) + finally: + _reset_invocation_id(token) + observer.shutdown() + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert len(llm_spans) == 1 + span = llm_spans[0] + assert span.start_time is not None and span.end_time is not None + assert span.start_time == span.end_time + + +async def test_typed_llm_event_drops_silently_outside_invocation() -> None: + # No invocation in scope (no _set_invocation_id) → the handler + # MUST early-return without emitting a span. Symmetric with the + # error path's no-invocation drop. + from tests._helpers.typed_event import make_typed_event + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + await observer(make_typed_event()) + observer.shutdown() + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert llm_spans == [] + + +async def test_disable_llm_spans_skips_typed_event_path() -> None: + # disable_llm_spans MUST gate the typed-event handler too — not + # just the sentinel-pair branch. Companion to + # ``test_disable_llm_spans_skips_llm_provider_span`` which covers + # the sentinel side. + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_typed_event + + exporter = InMemorySpanExporter() + observer = OTelObserver( + span_processor=SimpleSpanProcessor(exporter), + disable_llm_spans=True, + ) + token = _set_invocation_id("inv-disabled") + try: + await observer(make_typed_event()) + finally: + _reset_invocation_id(token) + observer.shutdown() + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert llm_spans == [] + + +async def test_llm_error_path_emits_error_span_from_sentinel_completed() -> None: + # Failure-path LLM calls don't emit the typed LlmCompletionEvent + # (per proposal 0049 §3 alternative 3). The OTel observer keeps + # the sentinel-pair error path alive so error-status spans still + # fire. Sentinel started + non-error completed are no-ops; only + # completed-with-error_type produces a span. + from opentelemetry.trace import StatusCode + + from openarmature.graph.events import NodeEvent + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from openarmature.observability.llm_event import LlmEventPayload + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + token = _set_invocation_id("inv-err") + try: + started = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="started", + pre_state=LlmEventPayload(call_id="cc-err", model="test-m"), + post_state=None, + error=None, + parent_states=(), + ) + completed = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="completed", + pre_state=LlmEventPayload( + call_id="cc-err", + model="test-m", + error_type="ProviderRateLimit", + error_category="provider_rate_limited", + error_message="429 from upstream", + ), + post_state=None, + error=None, + parent_states=(), + ) + await observer(started) + await observer(completed) + finally: + _reset_invocation_id(token) + observer.shutdown() + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert len(llm_spans) == 1 + span = llm_spans[0] + assert span.status.status_code == StatusCode.ERROR + attrs = dict(span.attributes or {}) + assert attrs.get("openarmature.error.category") == "provider_rate_limited" + + # --------------------------------------------------------------------------- # §7 log bridge: correlation_id injection # ---------------------------------------------------------------------------