From 9a7a4b079dfd7148c75203a72f6cc411b3bd8789 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 18:04:45 -0700 Subject: [PATCH 01/10] Add LlmRetryAttemptEvent for per-attempt LLM spans (0050) --- src/openarmature/graph/events.py | 67 ++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index 09b3b50..2171ca1 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -656,6 +656,72 @@ class LlmFailedEvent: caller_invocation_metadata: Mapping[str, AttributeValue] | None = None +# Python-internal per-attempt LLM event. NOT a spec-normative event type +# (unlike LlmCompletionEvent / LlmFailedEvent): it is the observer-side +# vehicle for the observability §5.5 per-attempt span surface under +# llm-provider §7.1 call-level retry. One is dispatched per in-call +# attempt (including the single attempt of a no-retry call); the OTel +# observer renders one openarmature.llm.complete span from each, while +# the terminal LlmCompletionEvent / LlmFailedEvent stay one-per-call +# (payload/latency, Langfuse mapping, the fixture-072 mutual exclusion). +@dataclass(frozen=True) +class LlmRetryAttemptEvent: + """One LLM-call attempt delivered to observers for per-attempt span + rendering. + + Carries the full request-side surface plus that attempt's outcome. + ``error_category`` discriminates the outcome: ``None`` for a + successful attempt (the response-side fields are populated), a + category string for a failed attempt (the response-side fields are + ``None`` — no response was received). + + Field set: + + - ``llm_attempt_index``: the call-level retry-attempt index, ``0`` + for the first attempt and ``0..N-1`` across the N attempts of a + call-level retry. Distinct from ``attempt_index`` (the node-level + retry index used for calling-span resolution); the two are + independent. + - identity / scoping (``invocation_id`` ... ``call_id``) and the + request side (``input_messages`` / ``request_params`` / + ``request_extras`` / ``active_prompt`` / ``active_prompt_group``) + mirror :class:`LlmCompletionEvent`, carried on every attempt. + - response side (``response_id`` / ``response_model`` / ``usage`` / + ``finish_reason`` / ``output_content``): populated on a successful + attempt; ``None`` on a failed attempt. + - failure side (``error_category`` / ``error_message`` / + ``error_type``): populated on a failed attempt; ``None`` on a + successful one. + """ + + invocation_id: str + correlation_id: str | None + node_name: str + namespace: tuple[str, ...] + attempt_index: int + fan_out_index: int | None + branch_name: str | None + provider: str + model: str + call_id: str + llm_attempt_index: int + latency_ms: float | None + input_messages: list[dict[str, Any]] + request_params: Mapping[str, Any] + request_extras: Mapping[str, Any] + active_prompt: Any + active_prompt_group: Any + response_id: str | None = None + response_model: str | None = None + usage: "Usage | None" = None + finish_reason: str | None = None + output_content: str | None = None + error_category: str | None = None + error_message: str | None = None + error_type: str | None = None + caller_invocation_metadata: Mapping[str, AttributeValue] | None = None + + # Spec: pipeline-utilities §6.3 cause chain (proposal 0068). A ``carrier`` # link is a graph-engine §4 ``node_exception`` wrapper the engine applies at a # non-node placement (§9.7 instance / §11.7 branch / §9.6 / §11.6 parent-node @@ -758,6 +824,7 @@ class FailureIsolatedEvent: "InvocationStartedEvent", "LlmCompletionEvent", "LlmFailedEvent", + "LlmRetryAttemptEvent", "MetadataAugmentationEvent", "NodeEvent", "ParallelBranchesEventConfig", From 2fb069fcba0b7b20cf01c54ccce6528bc7aeb7ef Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 18:25:11 -0700 Subject: [PATCH 02/10] Emit per-attempt LlmRetryAttemptEvent from complete() (0050) --- src/openarmature/graph/observer.py | 5 + src/openarmature/llm/providers/openai.py | 167 ++++++++++++++++-- src/openarmature/observability/correlation.py | 6 + .../observability/langfuse/observer.py | 8 + .../observability/otel/observer.py | 8 + tests/conformance/test_observability.py | 6 + tests/unit/test_llm_provider.py | 26 +-- 7 files changed, 202 insertions(+), 24 deletions(-) diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index ce5da36..da6b3e7 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -40,6 +40,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -55,6 +56,9 @@ # typed LLM provider call event, dispatched on every successful LLM # completion), LlmFailedEvent (proposal 0058 typed LLM failure event, # dispatched alongside the §7 exception when provider.complete raises), +# LlmRetryAttemptEvent (proposal 0050 per-attempt LLM span event, +# python-internal, dispatched once per in-call attempt under call-level +# retry to drive the per-attempt OTel span surface), # and FailureIsolatedEvent (proposal 0050 §6.3 framework-emitted event, # dispatched by FailureIsolationMiddleware when it catches an exception # escaping the inner chain and substitutes a degraded partial update). @@ -65,6 +69,7 @@ | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ) diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 8c86cb4..2a5d66b 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -56,7 +56,7 @@ import re import time import uuid -from collections.abc import Mapping, Sequence +from collections.abc import Callable, Mapping, Sequence from typing import TYPE_CHECKING, Any, Literal, cast from urllib.parse import urlparse @@ -64,7 +64,11 @@ import jsonschema from pydantic import BaseModel, ValidationError -from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent +from openarmature.graph.events import ( + LlmCompletionEvent, + LlmFailedEvent, + LlmRetryAttemptEvent, +) from openarmature.observability.correlation import ( current_attempt_index, current_branch_name, @@ -381,9 +385,12 @@ async def complete( backoff (defaulting to the canonical transient categories with exponential jittered backoff). The request is built and validated once; pre-send validation errors are never retried. - Exactly one observability event fires for the call's terminal - outcome regardless of attempt count, and its ``latency_ms`` - covers the whole call, retries and backoff included. The + Exactly one terminal observability event (LlmCompletionEvent or + LlmFailedEvent) fires for the call's terminal outcome regardless + of attempt count, and its ``latency_ms`` covers the whole call, + retries and backoff included. A per-attempt LlmRetryAttemptEvent + also fires for each attempt (driving the per-attempt LLM span + surface), each carrying just that attempt's latency. The ``on_retry`` hook is not exception-isolated (mirroring ``RetryMiddleware``); an exception raised by it propagates out of the call. @@ -481,7 +488,38 @@ async def complete( include_response_format=(schema_dict is None or not self._force_prompt_augmentation_fallback), tool_choice=tool_choice, ) - response = await self._do_complete_with_retry(body, schema_dict, schema_class, retry) + + # Per-attempt LLM span surface (observability §5.5 under + # llm-provider §7.1 call-level retry): dispatch one + # LlmRetryAttemptEvent per attempt so the observer can render + # one openarmature.llm.complete span per attempt. No-op + # outside an invocation (``dispatch`` is None). + def emit_attempt( + idx: int, + attempt_latency_ms: float, + attempt_response: Response | None, + attempt_exc: LlmProviderError | None, + ) -> None: + if dispatch is None: + return + dispatch( + self._build_llm_retry_attempt_event( + llm_attempt_index=idx, + latency_ms=attempt_latency_ms, + call_id=call_id, + input_messages=serialized_messages, + request_params=request_params, + request_extras=request_extras, + active_prompt=active_prompt, + active_prompt_group=active_prompt_group, + response=attempt_response, + exc=attempt_exc, + ), + ) + + response = await self._do_complete_with_retry( + body, schema_dict, schema_class, retry, emit_attempt + ) except LlmProviderError as exc: # Failure path: dispatch a typed LlmFailedEvent per # proposal 0058. Only §7 category exceptions @@ -533,18 +571,30 @@ async def _do_complete_with_retry( schema_dict: dict[str, Any] | None, schema_class: type[BaseModel] | None, retry: RetryConfig | None, + emit_attempt: Callable[[int, float, Response | None, LlmProviderError | None], None] | None = None, ) -> Response: """Run the wire call with optional call-level retry. Loops the underlying wire call on transient provider errors per - the retry config. Intermediate transient attempts are caught - here and emit no observability event; only the terminal outcome - (success, retry exhaustion, or a non-transient error) reaches - ``complete()``'s typed-event dispatch, so exactly one event - fires per ``complete()`` call. + the retry config. ``emit_attempt`` (when supplied) fires once + per attempt with that attempt's index, latency, and outcome + (response on success, exception on failure) so the caller can + dispatch the per-attempt LLM span event. The terminal outcome + (success, retry exhaustion, or a non-transient error) is what + reaches ``complete()`` for the single terminal typed event, so + exactly one terminal event still fires per ``complete()`` call. """ if retry is None: - return await self._do_complete(body, schema_dict, schema_class) + attempt_start = time.perf_counter() + try: + response = await self._do_complete(body, schema_dict, schema_class) + except LlmProviderError as exc: + if emit_attempt is not None: + emit_attempt(0, (time.perf_counter() - attempt_start) * 1000.0, None, exc) + raise + if emit_attempt is not None: + emit_attempt(0, (time.perf_counter() - attempt_start) * 1000.0, response, None) + return response # Lazy import avoids a module-load cycle: graph.middleware.retry # imports llm.errors. Resolve None config fields to the canonical # defaults, mirroring RetryMiddleware. @@ -557,9 +607,15 @@ async def _do_complete_with_retry( backoff = retry.backoff or exponential_jitter_backoff attempt = 0 while True: + attempt_start = time.perf_counter() try: - return await self._do_complete(body, schema_dict, schema_class) + response = await self._do_complete(body, schema_dict, schema_class) except LlmProviderError as exc: + # Per-attempt event fires for every attempt, including + # this failed one (the terminal failed attempt's span + # carries the final category). + if emit_attempt is not None: + emit_attempt(attempt, (time.perf_counter() - attempt_start) * 1000.0, None, exc) # No graph state at the call boundary; pass None (the # default classifier ignores it). Re-raise on exhaustion # or a non-transient category so complete() emits the @@ -572,6 +628,10 @@ async def _do_complete_with_retry( await retry.on_retry(exc, attempt) await asyncio.sleep(backoff(attempt)) attempt += 1 + continue + if emit_attempt is not None: + emit_attempt(attempt, (time.perf_counter() - attempt_start) * 1000.0, response, None) + return response def _build_llm_completion_event( self, @@ -711,6 +771,87 @@ def _build_llm_failed_event( caller_invocation_metadata=caller_metadata, ) + def _build_llm_retry_attempt_event( + self, + *, + llm_attempt_index: int, + latency_ms: float, + call_id: str, + input_messages: list[dict[str, Any]], + request_params: dict[str, Any], + request_extras: dict[str, Any], + active_prompt: Any, + active_prompt_group: Any, + response: Response | None = None, + exc: LlmProviderError | None = None, + ) -> LlmRetryAttemptEvent: + """Construct an LlmRetryAttemptEvent for one in-call attempt. + + Identity / scoping + request-side fields mirror the terminal + events; the outcome comes from ``response`` (a successful + attempt) or ``exc`` (a failed attempt). Exactly one of the two + is supplied. ``llm_attempt_index`` is the call-level retry + index, distinct from the node-level ``attempt_index`` sourced + from the ContextVar. + """ + namespace = current_namespace_prefix() + node_name = namespace[-1] if namespace else "" + invocation_id = current_invocation_id() or "" + caller_metadata: Mapping[str, AttributeValue] | None = None + if self._populate_caller_metadata: + caller_metadata = dict(current_invocation_metadata()) + if response is not None: + return LlmRetryAttemptEvent( + invocation_id=invocation_id, + correlation_id=current_correlation_id(), + node_name=node_name, + namespace=namespace, + attempt_index=current_attempt_index(), + fan_out_index=current_fan_out_index(), + branch_name=current_branch_name(), + provider=self._genai_system, + model=self.model, + call_id=call_id, + llm_attempt_index=llm_attempt_index, + latency_ms=latency_ms, + input_messages=input_messages, + request_params=request_params, + request_extras=request_extras, + active_prompt=active_prompt, + active_prompt_group=active_prompt_group, + response_id=response.response_id, + response_model=response.response_model, + usage=response.usage, + finish_reason=response.finish_reason, + output_content=response.message.content or None, + caller_invocation_metadata=caller_metadata, + ) + if exc is None: + raise ValueError("_build_llm_retry_attempt_event requires response or exc") + return LlmRetryAttemptEvent( + invocation_id=invocation_id, + correlation_id=current_correlation_id(), + node_name=node_name, + namespace=namespace, + attempt_index=current_attempt_index(), + fan_out_index=current_fan_out_index(), + branch_name=current_branch_name(), + provider=self._genai_system, + model=self.model, + call_id=call_id, + llm_attempt_index=llm_attempt_index, + latency_ms=latency_ms, + input_messages=input_messages, + request_params=request_params, + request_extras=request_extras, + active_prompt=active_prompt, + active_prompt_group=active_prompt_group, + error_category=exc.category, + error_type=type(exc).__name__, + error_message=str(exc), + caller_invocation_metadata=caller_metadata, + ) + async def _do_complete( self, body: dict[str, Any], diff --git a/src/openarmature/observability/correlation.py b/src/openarmature/observability/correlation.py index 7b6660f..ba83e00 100644 --- a/src/openarmature/observability/correlation.py +++ b/src/openarmature/observability/correlation.py @@ -42,6 +42,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -227,6 +228,7 @@ def _reset_active_observers(token: Token[tuple[SubscribedObserver, ...]]) -> Non | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, @@ -244,6 +246,7 @@ def current_dispatch() -> ( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, @@ -272,6 +275,7 @@ def _set_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, @@ -285,6 +289,7 @@ def _set_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, @@ -306,6 +311,7 @@ def _reset_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 587641f..08b4a7e 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -35,6 +35,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -368,6 +369,7 @@ async def __call__( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ), ) -> None: @@ -377,6 +379,12 @@ async def __call__( if isinstance(event, InvocationCompletedEvent): self._handle_invocation_completed(event) return + # Proposal 0050 per-attempt LLM events are OTel-span-only: the + # Langfuse mapping renders one Generation per call from the + # terminal LlmCompletionEvent / LlmFailedEvent, so the + # per-attempt event is ignored here. + if isinstance(event, LlmRetryAttemptEvent): + return # Proposal 0049 typed LlmCompletionEvent (success path). Drives # the §5.5 Generation observation lifecycle for successful # provider calls. diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 35a2929..6875296 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -103,6 +103,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -594,6 +595,7 @@ async def __call__( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ), ) -> None: @@ -604,6 +606,12 @@ async def __call__( # before any node-specific logic runs. if isinstance(event, InvocationStartedEvent | InvocationCompletedEvent): return + # Proposal 0050 per-attempt LLM span surface (LlmRetryAttemptEvent): + # the per-attempt openarmature.llm.complete span render lands in a + # follow-up step; ignored here for now so the terminal + # LlmCompletionEvent / LlmFailedEvent keep driving the span. + if isinstance(event, LlmRetryAttemptEvent): + return # Proposal 0049 typed LlmCompletionEvent (success path). # Drives the openarmature.llm.complete span lifecycle for # successful provider calls. diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index ab2c25a..c6c249e 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -2948,6 +2948,12 @@ def __init__(self, *, filter_event_type: str | None = None) -> None: self.events: list[Any] = [] async def __call__(self, event: Any) -> None: + # LlmRetryAttemptEvent is python-internal (it drives the OTel + # per-attempt span surface), not a spec-normative observer + # event, so the conformance collector excludes it from the + # captured stream that spec fixtures assert against. + if type(event).__name__ == "LlmRetryAttemptEvent": + return if self.filter_event_type is not None: if type(event).__name__ != self.filter_event_type: return diff --git a/tests/unit/test_llm_provider.py b/tests/unit/test_llm_provider.py index 13f6e3d..05dc039 100644 --- a/tests/unit/test_llm_provider.py +++ b/tests/unit/test_llm_provider.py @@ -20,7 +20,7 @@ import pytest from pydantic import ValidationError -from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent, NodeEvent +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent, LlmRetryAttemptEvent, NodeEvent from openarmature.graph.middleware import RetryConfig, deterministic_backoff from openarmature.graph.observer import ObserverEvent from openarmature.llm import ( @@ -1959,14 +1959,15 @@ def _handler(_req: httpx.Request) -> httpx.Response: assert typed.response_id is None -async def test_complete_success_emits_typed_event_as_single_emission() -> None: - # v0.13.0 dropped the success-side sentinel emission, so the - # provider's success-path emission window is now a single typed - # event — no within-emission ordering question remains. This test - # locks the single-emission shape so a regression that re-adds a - # sentinel NodeEvent on success would surface here. Spec fixture - # 056 pins the broader bracketing (typed event arrives between - # the CALLING NODE's started/completed pair) end-to-end. +async def test_complete_success_emits_per_attempt_then_terminal_typed_event() -> None: + # A successful no-retry complete() emits two typed events: a + # per-attempt LlmRetryAttemptEvent (attempt 0, driving the OTel + # per-attempt span surface) followed by the terminal + # LlmCompletionEvent. Both are typed — this locks the no-sentinel + # shape so a regression re-adding a sentinel NodeEvent on success + # would surface here as an extra NodeEvent. Spec fixture 056 pins + # the terminal event's bracketing (it arrives between the CALLING + # NODE's started/completed pair) end-to-end. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2} @@ -1978,8 +1979,11 @@ async def test_complete_success_emits_typed_event_as_single_emission() -> None: await provider.aclose() _release_dispatch(token) - assert len(events) == 1 - assert isinstance(events[0], LlmCompletionEvent) + assert [type(e).__name__ for e in events] == ["LlmRetryAttemptEvent", "LlmCompletionEvent"] + first, terminal = events + assert isinstance(first, LlmRetryAttemptEvent) + assert first.llm_attempt_index == 0 + assert isinstance(terminal, LlmCompletionEvent) async def test_llm_completion_event_sources_node_identity_from_calling_context() -> None: From e6f62851fa42da01af48eb74e8c7e4416e1abfcc Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 18:41:07 -0700 Subject: [PATCH 03/10] Render per-attempt LLM spans from LlmRetryAttemptEvent (0050) --- .../observability/otel/observer.py | 152 ++++-------------- tests/_helpers/typed_event.py | 41 ++++- tests/unit/test_observability_otel.py | 33 ++-- 3 files changed, 92 insertions(+), 134 deletions(-) diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 6875296..cb45ec9 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -476,7 +476,7 @@ class OTelObserver: # close sites (subgraph dispatch, detached root, fan-out instance, # invocation span, shutdown drain). attribute_enrichers: Sequence[ - Callable[[Span, NodeEvent | LlmCompletionEvent | LlmFailedEvent | None], None] + Callable[[Span, NodeEvent | LlmCompletionEvent | LlmFailedEvent | LlmRetryAttemptEvent | None], None] ] = () # Read from the package's ``__spec_version__`` (one of the three # places the spec version is pinned per CLAUDE.md). Bumping the @@ -549,7 +549,7 @@ def __post_init__(self) -> None: # ``event`` is None on synthetic close sites (subgraph dispatch, # detached root, fan-out instance, invocation span, orphan drain). def _run_enrichers( - self, span: Span, event: NodeEvent | LlmCompletionEvent | LlmFailedEvent | None + self, span: Span, event: NodeEvent | LlmCompletionEvent | LlmFailedEvent | LlmRetryAttemptEvent | None ) -> None: """Invoke configured enrichers against ``span`` before ``span.end()`` is called.""" @@ -606,26 +606,19 @@ async def __call__( # before any node-specific logic runs. if isinstance(event, InvocationStartedEvent | InvocationCompletedEvent): return - # Proposal 0050 per-attempt LLM span surface (LlmRetryAttemptEvent): - # the per-attempt openarmature.llm.complete span render lands in a - # follow-up step; ignored here for now so the terminal - # LlmCompletionEvent / LlmFailedEvent keep driving the span. + # Proposal 0050 per-attempt LLM span surface: the + # openarmature.llm.complete span(s) render from the per-attempt + # LlmRetryAttemptEvent — one span per attempt under call-level + # retry (attempt_index 0..N-1), one for a no-retry call. if isinstance(event, LlmRetryAttemptEvent): - return - # Proposal 0049 typed LlmCompletionEvent (success path). - # Drives the openarmature.llm.complete span lifecycle for - # successful provider calls. - if isinstance(event, LlmCompletionEvent): if not self.disable_llm_spans: - self._handle_typed_llm_completion(event) + self._handle_typed_llm_retry_attempt(event) return - # Proposal 0058 typed LlmFailedEvent (failure path). Drives - # the same openarmature.llm.complete span lifecycle for failed - # provider calls, with ERROR status + openarmature.error.category - # attribute. - if isinstance(event, LlmFailedEvent): - if not self.disable_llm_spans: - self._handle_typed_llm_failed(event) + # The terminal LlmCompletionEvent / LlmFailedEvent no longer + # drive the OTel span (the per-attempt event does); they stay on + # the queue for the Langfuse mapping and payload/latency + # consumers, so the OTel observer ignores them here. + if isinstance(event, LlmCompletionEvent | LlmFailedEvent): return # Proposal 0050 §6.3 framework-emitted failure-isolation event. if isinstance(event, FailureIsolatedEvent): @@ -1188,9 +1181,17 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: # active_prompt_group snapshots taken at dispatch time — NOT the # ContextVar. The dispatch worker's task-local Context doesn't see # node-body ContextVar writes. - def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: - """Open + close the ``openarmature.llm.complete`` span from the - typed LlmCompletionEvent (success path).""" + def _handle_typed_llm_retry_attempt(self, event: LlmRetryAttemptEvent) -> None: + """Open + close one ``openarmature.llm.complete`` span from a + per-attempt LlmRetryAttemptEvent. + + N call-level retry attempts produce N spans, all parented under + the calling node span, each carrying + ``openarmature.llm.attempt_index`` 0..N-1. A successful attempt + (``error_category is None``) carries the full response surface + with OK status; a failed attempt carries ERROR status + the + error category and no response attributes. + """ # Mid-call metadata augmentation can't reach this span: the # typed event arrives only after complete() returns, and the # span is back-dated past any augmentation event that fired @@ -1224,7 +1225,10 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: calling_fan_out_index=event.fan_out_index, calling_branch_name=event.branch_name, ) - attrs: dict[str, Any] = {"openarmature.llm.model": event.model} + attrs: dict[str, Any] = { + "openarmature.llm.model": event.model, + "openarmature.llm.attempt_index": event.llm_attempt_index, + } cid = current_correlation_id() if cid is not None: attrs["openarmature.correlation_id"] = cid @@ -1276,6 +1280,14 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: attributes=attrs, start_time=start_time_ns, ) + if event.error_category is not None: + # Failed attempt: ERROR + the §4 category, no response + # attributes (no response was received). + span.set_status(Status(StatusCode.ERROR, description=event.error_category)) + span.set_attribute("openarmature.error.category", event.error_category) + self._run_enrichers(span, event) + span.end(end_time=end_time_ns) + return usage = event.usage if event.finish_reason is not None: span.set_attribute("openarmature.llm.finish_reason", event.finish_reason) @@ -1319,100 +1331,6 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: self._run_enrichers(span, event) span.end(end_time=end_time_ns) - def _handle_typed_llm_failed(self, event: LlmFailedEvent) -> None: - """Open + close the ``openarmature.llm.complete`` span from the - typed LlmFailedEvent (failure path). Same span shape as the - success path with ERROR status + - ``openarmature.error.category`` attribute attached.""" - from openarmature.observability.correlation import ( - current_correlation_id, - current_invocation_id, - ) - - invocation_id = current_invocation_id() - if invocation_id is None: - return - inv_state = self._inv_state_for(invocation_id) - # Back-date start_time using latency_ms (mirrors the success- - # path handler). For failures, latency reflects time spent - # until the §7 exception was raised — useful for diagnosing - # whether failures are fast-failing (pre-send validation) or - # slow-failing (provider timeout). - end_time_ns = time.time_ns() - if event.latency_ms is not None: - start_time_ns = end_time_ns - int(event.latency_ms * 1_000_000) - else: - start_time_ns = end_time_ns - parent_ctx = self._resolve_llm_parent( - inv_state, - invocation_id, - calling_namespace_prefix=event.namespace, - calling_attempt_index=event.attempt_index, - calling_fan_out_index=event.fan_out_index, - calling_branch_name=event.branch_name, - ) - attrs: dict[str, Any] = {"openarmature.llm.model": event.model} - cid = current_correlation_id() - if cid is not None: - attrs["openarmature.correlation_id"] = cid - if event.caller_invocation_metadata is not None: - _apply_caller_metadata(attrs, event.caller_invocation_metadata) - active_prompt = event.active_prompt - if active_prompt is not None: - attrs["openarmature.prompt.name"] = active_prompt.name - attrs["openarmature.prompt.version"] = active_prompt.version - attrs["openarmature.prompt.label"] = active_prompt.label - attrs["openarmature.prompt.template_hash"] = active_prompt.template_hash - attrs["openarmature.prompt.rendered_hash"] = active_prompt.rendered_hash - active_group = event.active_prompt_group - if active_group is not None: - attrs["openarmature.prompt.group_name"] = active_group.group_name - if not self.disable_genai_semconv: - attrs["gen_ai.system"] = event.provider - attrs["gen_ai.request.model"] = event.model - request_params = event.request_params or {} - if "temperature" in request_params: - attrs["gen_ai.request.temperature"] = request_params["temperature"] - if "max_tokens" in request_params: - attrs["gen_ai.request.max_tokens"] = request_params["max_tokens"] - if "top_p" in request_params: - attrs["gen_ai.request.top_p"] = request_params["top_p"] - if "seed" in request_params: - attrs["gen_ai.request.seed"] = request_params["seed"] - if "frequency_penalty" in request_params: - attrs["gen_ai.request.frequency_penalty"] = request_params["frequency_penalty"] - if "presence_penalty" in request_params: - attrs["gen_ai.request.presence_penalty"] = request_params["presence_penalty"] - if "stop_sequences" in request_params: - attrs["gen_ai.request.stop_sequences"] = request_params["stop_sequences"] - if not self.disable_provider_payload: - if event.input_messages: - serialized = _serialize_for_attribute(event.input_messages) - attrs["openarmature.llm.input.messages"] = _truncate_for_attribute( - serialized, self.payload_max_bytes - ) - if event.request_extras: - serialized_extras = _serialize_for_attribute(event.request_extras) - attrs["openarmature.llm.request.extras"] = _truncate_for_attribute( - serialized_extras, self.payload_max_bytes - ) - span = self._tracer.start_span( - name="openarmature.llm.complete", - context=cast("Any", parent_ctx), - kind=SpanKind.CLIENT, - attributes=attrs, - start_time=start_time_ns, - ) - span.set_status( - Status( - StatusCode.ERROR, - description=event.error_category, - ) - ) - span.set_attribute("openarmature.error.category", event.error_category) - self._run_enrichers(span, event) - span.end(end_time=end_time_ns) - def _handle_failure_isolated(self, event: FailureIsolatedEvent) -> None: """Emit a zero-duration ``openarmature.failure_isolated`` span for a FailureIsolationMiddleware catch. diff --git a/tests/_helpers/typed_event.py b/tests/_helpers/typed_event.py index 3b34f9d..02a55a0 100644 --- a/tests/_helpers/typed_event.py +++ b/tests/_helpers/typed_event.py @@ -10,7 +10,7 @@ from typing import Any -from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent, LlmRetryAttemptEvent def make_typed_event(**overrides: Any) -> LlmCompletionEvent: @@ -74,3 +74,42 @@ def make_failed_event(**overrides: Any) -> LlmFailedEvent: } base.update(overrides) return LlmFailedEvent(**base) + + +def make_retry_attempt_event(**overrides: Any) -> LlmRetryAttemptEvent: + """Build an ``LlmRetryAttemptEvent`` with neutral success-attempt + defaults; ``overrides`` swap fields for the test case. Pass + ``error_category=...`` (and ``finish_reason=None``) for a failed + attempt. This is the event that drives the OTel per-attempt LLM + span; the Langfuse-side tests keep using ``make_typed_event`` (the + terminal event the Generation mapping reads).""" + base: dict[str, Any] = { + "invocation_id": "inv-1", + "correlation_id": None, + "node_name": "ask", + "namespace": ("ask",), + "attempt_index": 0, + "fan_out_index": None, + "branch_name": None, + "provider": "openai", + "model": "test-m", + "call_id": "cc-1", + "llm_attempt_index": 0, + "latency_ms": 10.0, + "input_messages": [], + "request_params": {}, + "request_extras": {}, + "active_prompt": None, + "active_prompt_group": None, + "response_id": None, + "response_model": None, + "usage": None, + "finish_reason": "stop", + "output_content": None, + "error_category": None, + "error_message": None, + "error_type": None, + "caller_invocation_metadata": None, + } + base.update(overrides) + return LlmRetryAttemptEvent(**base) diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index a9aa72f..90bc580 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -508,7 +508,7 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: PromptResult, TextPrompt, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) @@ -543,7 +543,7 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: # The observer reads from the typed event, NOT from the live # ContextVar — that ContextVar is unreachable from the # dispatch worker's task-local Context. - await observer(make_typed_event(active_prompt=result, active_prompt_group=group)) + await observer(make_retry_attempt_event(active_prompt=result, active_prompt_group=group)) finally: _reset_invocation_id(token) @@ -566,14 +566,14 @@ async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-2") try: - await observer(make_typed_event()) + await observer(make_retry_attempt_event()) finally: _reset_invocation_id(token) observer.shutdown() @@ -598,14 +598,14 @@ async def _drive_llm_span_with_cached_tokens( _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-cache") try: await observer( - make_typed_event( + make_retry_attempt_event( usage=Usage( prompt_tokens=100, completion_tokens=5, @@ -714,14 +714,14 @@ async def test_llm_span_duration_matches_typed_event_latency() -> None: _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) latency_ms = 123.456 token = _set_invocation_id("inv-duration") try: - await observer(make_typed_event(latency_ms=latency_ms)) + await observer(make_retry_attempt_event(latency_ms=latency_ms)) finally: _reset_invocation_id(token) observer.shutdown() @@ -745,13 +745,13 @@ async def test_llm_span_zero_duration_when_latency_missing() -> None: _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-no-latency") try: - await observer(make_typed_event(latency_ms=None)) + await observer(make_retry_attempt_event(latency_ms=None)) finally: _reset_invocation_id(token) observer.shutdown() @@ -766,11 +766,11 @@ async def test_typed_llm_event_drops_silently_outside_invocation() -> None: # No invocation in scope (no _set_invocation_id) → the handler # MUST early-return without emitting a span. Symmetric with the # error path's no-invocation drop. - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) - await observer(make_typed_event()) + await observer(make_retry_attempt_event()) observer.shutdown() llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] assert llm_spans == [] @@ -785,7 +785,7 @@ async def test_disable_llm_spans_skips_typed_event_path() -> None: _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver( @@ -794,7 +794,7 @@ async def test_disable_llm_spans_skips_typed_event_path() -> None: ) token = _set_invocation_id("inv-disabled") try: - await observer(make_typed_event()) + await observer(make_retry_attempt_event()) finally: _reset_invocation_id(token) observer.shutdown() @@ -812,19 +812,20 @@ async def test_llm_error_path_emits_error_span_from_typed_failed_event() -> None _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_failed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-err") try: await observer( - make_failed_event( + make_retry_attempt_event( invocation_id="inv-err", error_category="provider_rate_limit", error_type="ProviderRateLimit", error_message="429 from upstream", call_id="cc-err", + finish_reason=None, ) ) finally: From beea3ab36708fe67a10c7b1e8a2a417113980901 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 18:44:10 -0700 Subject: [PATCH 04/10] Activate obs-057 (single-attempt llm.attempt_index) (0050) --- tests/conformance/test_observability.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index c6c249e..f55dd22 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -89,6 +89,9 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: # §5.7 attribute surface end-to-end against a two-branch # parallel-branches graph with calls_llm in each branch. "038-otel-parallel-branches-dispatch-span", + # v0.42.0 — proposal 0050 call-level-retry per-attempt LLM span + # surface. Single-attempt default: one span, attempt_index 0. + "057-llm-attempt-index-single-attempt-default", "001-otel-basic-trace", "002-otel-subgraph-hierarchy", "003-otel-error-status", @@ -256,6 +259,7 @@ async def test_observability_fixture(fixture_path: Path) -> None: "021-otel-llm-disable-genai-semconv", "025-otel-llm-request-params-extended", "026-otel-caller-supplied-metadata", + "057-llm-attempt-index-single-attempt-default", }: await _run_llm_payload_fixture(spec) else: From 372f0e6ded344d61df22bab84db60cf441d2caca Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 18:55:28 -0700 Subject: [PATCH 05/10] Add N-span call-level-retry integration test (0050) --- tests/unit/test_observability_otel.py | 81 +++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index 90bc580..217bcba 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -839,6 +839,87 @@ async def test_llm_error_path_emits_error_span_from_typed_failed_event() -> None assert attrs.get("openarmature.error.category") == "provider_rate_limit" +async def test_call_level_retry_emits_one_llm_span_per_attempt() -> None: + # Proposal 0050 §7.1 / observability §5.5: a complete(retry=...) that + # fails transiently then succeeds emits TWO openarmature.llm.complete + # spans — attempt_index 0 (ERROR, provider_unavailable) and 1 (OK). + # Validates the per-attempt span surface end to end: the provider + # dispatches one LlmRetryAttemptEvent per attempt and the observer + # renders one span per event. In production the engine's serial + # queue carries the events; here they are captured then replayed. + import httpx + from opentelemetry.trace import StatusCode + + from openarmature.graph.events import LlmRetryAttemptEvent + from openarmature.graph.middleware import RetryConfig, deterministic_backoff + from openarmature.llm.messages import UserMessage + from openarmature.llm.providers.openai import OpenAIProvider + from openarmature.observability.correlation import ( + _reset_active_dispatch, + _reset_invocation_id, + _set_active_dispatch, + _set_invocation_id, + ) + + calls = {"n": 0} + + def handler(_request: httpx.Request) -> httpx.Response: + calls["n"] += 1 + if calls["n"] == 1: + return httpx.Response(503, json={"error": {"message": "service down"}}) + return httpx.Response( + 200, + json={ + "id": "cc-056", + "object": "chat.completion", + "model": "gpt-test", + "choices": [ + {"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"} + ], + "usage": {"prompt_tokens": 4, "completion_tokens": 1, "total_tokens": 5}, + }, + ) + + captured: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: captured.append(e)) + inv_token = _set_invocation_id("inv-retry") + provider = OpenAIProvider( + base_url="http://test", model="gpt-test", api_key="k", transport=httpx.MockTransport(handler) + ) + try: + result = await provider.complete( + [UserMessage(content="go")], + retry=RetryConfig(max_attempts=2, backoff=deterministic_backoff(0.0)), + ) + finally: + await provider.aclose() + _reset_invocation_id(inv_token) + _reset_active_dispatch(disp_token) + assert result.message.content == "ok" + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + inv_token2 = _set_invocation_id("inv-retry") + try: + for event in captured: + if isinstance(event, LlmRetryAttemptEvent): + await observer(event) + finally: + _reset_invocation_id(inv_token2) + observer.shutdown() + + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert len(llm_spans) == 2 + spans_by_index = {dict(s.attributes or {})["openarmature.llm.attempt_index"]: s for s in llm_spans} + assert set(spans_by_index) == {0, 1} + s0 = spans_by_index[0] + assert s0.status.status_code == StatusCode.ERROR + assert dict(s0.attributes or {})["openarmature.error.category"] == "provider_unavailable" + s1 = spans_by_index[1] + assert s1.status.status_code == StatusCode.OK + assert dict(s1.attributes or {}).get("openarmature.llm.finish_reason") == "stop" + + # --------------------------------------------------------------------------- # §7 log bridge: correlation_id injection # --------------------------------------------------------------------------- From f20f4291301243ebf6987d45f89f42d2d8f21cd6 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 19:06:48 -0700 Subject: [PATCH 06/10] Activate call-level-retry per-attempt span fixtures (0050) --- tests/conformance/test_llm_provider.py | 15 ++-- tests/unit/test_observability_otel.py | 112 ++++++++++++++++--------- 2 files changed, 80 insertions(+), 47 deletions(-) diff --git a/tests/conformance/test_llm_provider.py b/tests/conformance/test_llm_provider.py index 0059053..c794807 100644 --- a/tests/conformance/test_llm_provider.py +++ b/tests/conformance/test_llm_provider.py @@ -103,12 +103,15 @@ "055-anthropic-wire-byte-stability": ( "Proposal 0047 wire-byte stability; queued for v0.13.0 (also Anthropic-pending)" ), - # Proposal 0050 (call-level retry, v0.42.0) — three fixtures - # exercise the new ``retry`` kwarg on ``complete()``. Queued for - # v0.14.0 retry & reliability primitives batch. - "056-call-level-retry-transient": ("Proposal 0050 call-level retry; queued for v0.14.0"), - "057-call-level-retry-exhaustion": ("Proposal 0050 call-level retry; queued for v0.14.0"), - "058-call-level-retry-non-transient-no-retry": ("Proposal 0050 call-level retry; queued for v0.14.0"), + # Proposal 0050 (call-level retry, v0.42.0) — three fixtures exercise + # the ``retry`` kwarg on ``complete()`` AND assert per-attempt LLM + # spans (observability §5.5). They stay deferred in this provider + # harness (which has no observer) and are activated + asserted in + # test_observability_otel.py::test_call_level_retry_fixture_per_attempt_spans, + # which drives them through the provider + an OTel observer. + "056-call-level-retry-transient": ("per-attempt LLM spans; see test_observability_otel.py"), + "057-call-level-retry-exhaustion": ("per-attempt LLM spans; see test_observability_otel.py"), + "058-call-level-retry-non-transient-no-retry": ("per-attempt LLM spans; see test_observability_otel.py"), } diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index 217bcba..c70a0b6 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -839,19 +839,34 @@ async def test_llm_error_path_emits_error_span_from_typed_failed_event() -> None assert attrs.get("openarmature.error.category") == "provider_rate_limit" -async def test_call_level_retry_emits_one_llm_span_per_attempt() -> None: - # Proposal 0050 §7.1 / observability §5.5: a complete(retry=...) that - # fails transiently then succeeds emits TWO openarmature.llm.complete - # spans — attempt_index 0 (ERROR, provider_unavailable) and 1 (OK). - # Validates the per-attempt span surface end to end: the provider - # dispatches one LlmRetryAttemptEvent per attempt and the observer - # renders one span per event. In production the engine's serial - # queue carries the events; here they are captured then replayed. +@pytest.mark.parametrize( + "fixture_id", + [ + "056-call-level-retry-transient", + "057-call-level-retry-exhaustion", + "058-call-level-retry-non-transient-no-retry", + ], +) +async def test_call_level_retry_fixture_per_attempt_spans(fixture_id: str) -> None: + # Proposal 0050 §7.1 / observability §5.5: drive each spec + # call-level-retry fixture (spec/llm-provider/conformance/) through + # the provider + an OTel observer and assert its per-attempt + # openarmature.llm.complete spans. These fixtures assert SPANS, so + # they are activated here (otel-gated, with an observer) rather than + # the generic llm-provider harness, which has no observer. The + # provider dispatches one LlmRetryAttemptEvent per attempt and the + # observer renders one span per event; in production the engine's + # serial queue carries them, here they are captured then replayed. + import json + from pathlib import Path + import httpx + import yaml from opentelemetry.trace import StatusCode from openarmature.graph.events import LlmRetryAttemptEvent from openarmature.graph.middleware import RetryConfig, deterministic_backoff + from openarmature.llm.errors import LlmProviderError from openarmature.llm.messages import UserMessage from openarmature.llm.providers.openai import OpenAIProvider from openarmature.observability.correlation import ( @@ -861,45 +876,55 @@ async def test_call_level_retry_emits_one_llm_span_per_attempt() -> None: _set_invocation_id, ) - calls = {"n": 0} + fixture_dir = ( + Path(__file__).resolve().parents[2] / "openarmature-spec" / "spec" / "llm-provider" / "conformance" + ) + spec = cast("dict[str, Any]", yaml.safe_load((fixture_dir / f"{fixture_id}.yaml").read_text())) + responses = cast("list[dict[str, Any]]", spec["mock_provider"]["responses"]) + call = cast("dict[str, Any]", spec["call"]) + expected = cast("dict[str, Any]", spec["expected"]) + + response_iter = iter(responses) def handler(_request: httpx.Request) -> httpx.Response: - calls["n"] += 1 - if calls["n"] == 1: - return httpx.Response(503, json={"error": {"message": "service down"}}) - return httpx.Response( - 200, - json={ - "id": "cc-056", - "object": "chat.completion", - "model": "gpt-test", - "choices": [ - {"index": 0, "message": {"role": "assistant", "content": "ok"}, "finish_reason": "stop"} - ], - "usage": {"prompt_tokens": 4, "completion_tokens": 1, "total_tokens": 5}, - }, - ) + entry = next(response_iter) + body = entry.get("body") + content = json.dumps(body).encode() if body is not None else b"" + return httpx.Response(int(entry.get("status", 200)), content=content) + + retry_cfg = cast("dict[str, Any]", call["retry"]) + backoff_seconds = float(cast("dict[str, Any]", retry_cfg.get("backoff") or {}).get("seconds", 0.0)) + retry = RetryConfig( + max_attempts=int(retry_cfg["max_attempts"]), + backoff=deterministic_backoff(backoff_seconds), + ) + messages = [ + UserMessage(content=cast("str", m["content"])) for m in cast("list[dict[str, Any]]", call["messages"]) + ] captured: list[Any] = [] disp_token = _set_active_dispatch(lambda e: captured.append(e)) - inv_token = _set_invocation_id("inv-retry") + inv_token = _set_invocation_id("inv-clr") provider = OpenAIProvider( base_url="http://test", model="gpt-test", api_key="k", transport=httpx.MockTransport(handler) ) try: - result = await provider.complete( - [UserMessage(content="go")], - retry=RetryConfig(max_attempts=2, backoff=deterministic_backoff(0.0)), - ) + if "raises" in expected: + with pytest.raises(LlmProviderError) as excinfo: + await provider.complete(messages, retry=retry) + assert excinfo.value.category == expected["raises"]["category"] + else: + result = await provider.complete(messages, retry=retry) + expected_content = cast("dict[str, Any]", expected["response"]["message"])["content"] + assert result.message.content == expected_content finally: await provider.aclose() _reset_invocation_id(inv_token) _reset_active_dispatch(disp_token) - assert result.message.content == "ok" exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) - inv_token2 = _set_invocation_id("inv-retry") + inv_token2 = _set_invocation_id("inv-clr") try: for event in captured: if isinstance(event, LlmRetryAttemptEvent): @@ -908,16 +933,21 @@ def handler(_request: httpx.Request) -> httpx.Response: _reset_invocation_id(inv_token2) observer.shutdown() - llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] - assert len(llm_spans) == 2 - spans_by_index = {dict(s.attributes or {})["openarmature.llm.attempt_index"]: s for s in llm_spans} - assert set(spans_by_index) == {0, 1} - s0 = spans_by_index[0] - assert s0.status.status_code == StatusCode.ERROR - assert dict(s0.attributes or {})["openarmature.error.category"] == "provider_unavailable" - s1 = spans_by_index[1] - assert s1.status.status_code == StatusCode.OK - assert dict(s1.attributes or {}).get("openarmature.llm.finish_reason") == "stop" + spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + expected_spans = cast("list[dict[str, Any]]", expected["llm_spans"]) + assert len(spans) == len(expected_spans) + spans_by_index = {dict(s.attributes or {})["openarmature.llm.attempt_index"]: s for s in spans} + for exp_span in expected_spans: + idx = exp_span["attempt_index"] + span = spans_by_index[idx] + span_attrs = dict(span.attributes or {}) + for key, val in cast("dict[str, Any]", exp_span.get("attributes") or {}).items(): + assert span_attrs.get(key) == val, f"attempt {idx}: {key}={span_attrs.get(key)!r} != {val!r}" + if exp_span.get("error_category"): + assert span.status.status_code == StatusCode.ERROR + assert span_attrs.get("openarmature.error.category") == exp_span["error_category"] + else: + assert span.status.status_code == StatusCode.OK # --------------------------------------------------------------------------- From 2a2f897592e2615bea29f6465e0c65a1b3ef9e45 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 21:22:03 -0700 Subject: [PATCH 07/10] Document per-attempt LLM spans and flip 0050 Flip conformance.toml [proposals."0050"] partial -> implemented (since 0.15.0): the call-level-retry per-attempt span surface now ships. Document the openarmature.llm.attempt_index attribute and the per-attempt span behavior in the observability concepts page, plus notes that span enrichers receive LlmRetryAttemptEvent on the LLM span and that the bundled provider dispatches that internal event alongside the unchanged terminal events. Add the 0.15.0 changelog section covering this work and backfilling the 0061 detached-trace invocation span (which landed without an entry), plus the v0.60.0 -> v0.61.0 spec-pin bullet. --- CHANGELOG.md | 11 +++++++ conformance.toml | 50 +++++++++++++++++--------------- docs/concepts/observability.md | 53 ++++++++++++++++++++++++++++++---- 3 files changed, 86 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4ec98a..dfac0ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to `openarmature-python` are documented in this file. The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The package follows [Semantic Versioning](https://semver.org/); pre-1.0 minor bumps may carry behavioral changes per [spec governance](https://github.com/LunarCommand/openarmature-spec/blob/main/GOVERNANCE.md). +## [0.15.0] — 2026-06-18 + +### Added + +- **Detached-trace invocation span** (proposal 0061, observability §4.4, spec v0.61.0). The OTel observer now synthesizes an `openarmature.invocation` span at the root of each detached trace (a detached subgraph and each detached fan-out instance), carrying the parent's shared `invocation_id` (detached mode is observer-side trace rendering, not a new run) and the detached unit's own `entry_node`; the detached subgraph / instance span nests under it. A raising detached subgraph surfaces ERROR plus the error category and an OTel exception event on both the parent dispatch span and the detached invocation span. This is observer-side only, with no graph-engine change; the Langfuse observer is unchanged (its Trace entity already plays the invocation-level-container role). Conformance fixtures 008 (rewritten) and 058 (newly wired) run in `test_observability`. +- **Per-attempt LLM spans under call-level retry** (proposal 0050, observability §5.5 / llm-provider §7.1). Completes proposal 0050, which shipped `partial` in v0.14.0 (failure-isolation middleware and the `complete(retry=...)` loop landed then; the per-attempt span surface was deferred). Under call-level retry the OTel observer now emits one `openarmature.llm.complete` span per attempt, each carrying `openarmature.llm.attempt_index` (0-based, 0..N-1, and 0 for a no-retry call). An intermediate failed attempt's span carries ERROR status plus its error category and the request-side attributes; the final attempt's span carries the terminal outcome and, on success, the full response surface. A python-internal `LlmRetryAttemptEvent`, dispatched once per attempt, is the sole source of the OTel span; the terminal `LlmCompletionEvent` / `LlmFailedEvent` stay one per call (payload, latency, Langfuse Generation) and no longer drive the OTel span. Langfuse renders one terminal Generation per call, with the per-attempt detail on the OTel span surface only (a spec-side §8 clarification to pin this is tracked, non-blocking). `conformance.toml` flips proposal 0050 to `implemented`; the call-level fixtures 056-058 are driven through the provider plus OTel observer and the single-attempt observability fixture 057 is wired. + +### Changed + +- **Pinned spec advances v0.60.0 → v0.61.0** (proposal 0061, the detached-trace invocation span above). A single step this cycle; `conformance.toml` records proposal 0061 as `implemented`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above. + ## [0.14.0] — 2026-06-17 ### Added diff --git a/conformance.toml b/conformance.toml index b3e4063..7ead03c 100644 --- a/conformance.toml +++ b/conformance.toml @@ -372,30 +372,34 @@ status = "implemented" since = "0.13.0" # Spec v0.42.0 (proposal 0050). Retry & degradation primitives — -# failure-isolation middleware (§6.3) + call-level retry (§7). Both -# primitives implemented across the v0.14.0 cycle: -# FailureIsolationMiddleware (distinct FailureIsolatedEvent + -# CaughtException) and the call-level ``retry`` parameter on -# ``Provider.complete()`` — an in-call loop over transient §7 errors -# reusing the §6.1 RetryConfig record. ``partial`` because §7.1's -# per-attempt span surface — N ``openarmature.llm.complete`` spans + -# the ``openarmature.llm.attempt_index`` attribute — is DEFERRED: the -# python LLM span is rendered from the typed event, which is -# terminal-only per the graph-engine §6 mutual-exclusion contract, so -# per-attempt spans require a dedicated within-call sub-event -# (LlmRetryAttemptEvent) scoped to a future cycle. Call-level retry -# ships terminal-only: exactly one LlmCompletionEvent / LlmFailedEvent -# per ``complete()`` call. Failure-isolation conformance fixtures -# (058-063) are all wired + passing: the FailureIsolatedEvent's -# attempt_index reports the final / exhausting attempt per §6.3's -# lineage-correlation rule (spec ruled this in the attempt-index coord -# thread; RetryMiddleware now records the final attempt in a -# terminal-attempt scope the outer isolation reads, rather than the -# post-reset baseline). ``partial`` is now solely about the -# call-level-retry per-attempt span surface above. +# failure-isolation middleware (§6.3) + call-level retry (§7), +# including §7.1's per-attempt span surface. Implemented across the +# v0.14.0 + v0.15.0 cycles: FailureIsolationMiddleware (distinct +# FailureIsolatedEvent + CaughtException) and the call-level ``retry`` +# parameter on ``Provider.complete()`` — an in-call loop over transient +# §7 errors reusing the §6.1 RetryConfig record. §7.1's per-attempt +# span surface now ships: a call-level ``retry`` emits N +# ``openarmature.llm.complete`` spans — one per attempt — each carrying +# ``openarmature.llm.attempt_index`` (0-based, call-level, independent +# of the node-level attempt_index). A python-internal +# LlmRetryAttemptEvent dispatched once per attempt is the SOLE source of +# the OTel LLM span (including single no-retry calls, at index 0); the +# terminal LlmCompletionEvent / LlmFailedEvent stay one-per-call +# (payload, latency, Langfuse Generation, fixture-072 mutual exclusion) +# and no longer drive the OTel span. Langfuse renders one terminal +# Generation per call. llm-provider fixtures 056-058 (per-attempt +# spans) are validated in tests/unit/test_observability_otel.py through +# the provider + OTel observer; observability fixture 057 +# (single-attempt attempt_index) is wired in test_observability. +# Failure-isolation fixtures (058-063) are all wired + passing: the +# FailureIsolatedEvent's attempt_index reports the final / exhausting +# attempt per §6.3's lineage-correlation rule (spec ruled this in the +# attempt-index coord thread; RetryMiddleware now records the final +# attempt in a terminal-attempt scope the outer isolation reads, rather +# than the post-reset baseline). [proposals."0050"] -status = "partial" -since = "0.14.0" +status = "implemented" +since = "0.15.0" # Spec v0.43.0 (proposal 0051). Langfuse trace.input/trace.output # implementation-surface caveat. Purely textual: documents that the diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 26f23dd..7aa2a01 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -659,14 +659,17 @@ as nested spans. When an `OpenAIProvider` (or any [custom Provider](../model-providers/authoring.md) that wires the dispatch hook) is used inside a graph with `OTelObserver` -attached, each `provider.complete()` call emits a dedicated span named -`openarmature.llm.complete`, parented under the calling node's span. -The span carries two attribute families. +attached, each `provider.complete()` attempt emits a dedicated span +named `openarmature.llm.complete`, parented under the calling node's +span. A call without retry emits one span; a call-level `retry=` that +retries emits [one span per attempt](#per-attempt-spans-under-call-level-retry). +Each span carries two attribute families. **`openarmature.llm.*` (always on).** The framework's canonical namespace: model identifier, finish reason, token counts, prompt -identity from `with_active_prompt(...)`, error category on failure. -Set unconditionally whenever the LLM span itself emits. +identity from `with_active_prompt(...)`, error category on failure, and +`openarmature.llm.attempt_index` (the 0-based call-level attempt +counter). Set unconditionally whenever the LLM span itself emits. **`gen_ai.*` (OpenTelemetry GenAI semantic conventions, default on).** Cross-vendor attribute names every LLM-aware backend reads @@ -702,6 +705,28 @@ when an external auto-instrumentation library (OpenInference, `opentelemetry-instrumentation-openai`) is already the canonical source on your stack. +#### Per-attempt spans under call-level retry + +[Call-level retry](llms.md#retrying-transient-failures) +(`provider.complete(retry=...)`) retries transient provider errors +inside a single call. Each attempt emits its own +`openarmature.llm.complete` span tagged with +`openarmature.llm.attempt_index` (0-based). A call that succeeds on the +first try emits one span at `attempt_index` 0; a call that fails twice +transiently before succeeding emits three spans (indices 0, 1, 2). Each +failed attempt's span carries `ERROR` status plus +`openarmature.error.category`; the final attempt's span carries the +terminal outcome (`OK` on success, `ERROR` on an exhausted or +non-transient failure). + +`openarmature.llm.attempt_index` is the **call-level** attempt counter, +[independent of the node-level `attempt_index`](llms.md#call-level-vs-node-level-retry): +the former counts attempts inside one `complete()` call, the latter +counts node re-executions driven by retry *middleware*. A node retried +once by middleware, each execution calling a provider that itself +retries once, produces node `attempt_index` 0/1 and, within each, +call-level `attempt_index` 0/1. + ### LLM payload attributes By default, LLM spans do **not** carry the messages sent or the @@ -834,6 +859,14 @@ correctly; doing it from a `SpanProcessor.on_end` callback does not, because the framework has already called `span.end()` and the OTel SDK silently drops `set_attribute` on ended spans. +For the `openarmature.llm.complete` span the close event is an +`LlmRetryAttemptEvent` (one per attempt) rather than a `NodeEvent`; +that is the per-attempt event the observer renders the LLM span from. +An enricher scoped to that span (`span.name == +"openarmature.llm.complete"`) can read the attempt's outcome straight +off it: `event.llm_attempt_index`, `event.error_category`, +`event.usage`, `event.finish_reason`, and so on. + Exceptions raised by an enricher are caught and warned, never propagated. @@ -880,6 +913,16 @@ via `current_dispatch()`. See [Authoring providers](../model-providers/authoring.md) for the full pattern. +Under [call-level retry](#per-attempt-spans-under-call-level-retry) the +bundled `OpenAIProvider` additionally dispatches a python-internal +`LlmRetryAttemptEvent` once per attempt; that is the event the OTel +observer renders each per-attempt span from (including the lone attempt +of a no-retry call, at index 0). The terminal `LlmCompletionEvent` / +`LlmFailedEvent` above are unchanged: still one per call, still the +stable surface for per-call consumption (token accounting, failure +tracking). An observer that only cares about per-call outcomes can +ignore `LlmRetryAttemptEvent`. + #### Legacy sentinel-namespace pattern (compatibility surface) `openarmature.observability.LLM_NAMESPACE` and From f848f0d9dbfd558c52f9f13fa785a077ce1bcedb Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 21:22:12 -0700 Subject: [PATCH 08/10] Deduplicate per-attempt LLM event builder _build_llm_retry_attempt_event constructed a full LlmRetryAttemptEvent twice, repeating ~18 shared identity, scoping, and request-side fields across the success and failure branches. Hoist them into one base dict and splat it, leaving each branch to add only its outcome fields. No behavior change. --- src/openarmature/llm/providers/openai.py | 60 ++++++++++-------------- 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 2a5d66b..32eb00d 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -800,56 +800,44 @@ def _build_llm_retry_attempt_event( caller_metadata: Mapping[str, AttributeValue] | None = None if self._populate_caller_metadata: caller_metadata = dict(current_invocation_metadata()) + # Identity / scoping + request-side fields are shared; only the + # outcome fields differ between the success and failure branches. + base: dict[str, Any] = { + "invocation_id": invocation_id, + "correlation_id": current_correlation_id(), + "node_name": node_name, + "namespace": namespace, + "attempt_index": current_attempt_index(), + "fan_out_index": current_fan_out_index(), + "branch_name": current_branch_name(), + "provider": self._genai_system, + "model": self.model, + "call_id": call_id, + "llm_attempt_index": llm_attempt_index, + "latency_ms": latency_ms, + "input_messages": input_messages, + "request_params": request_params, + "request_extras": request_extras, + "active_prompt": active_prompt, + "active_prompt_group": active_prompt_group, + "caller_invocation_metadata": caller_metadata, + } if response is not None: return LlmRetryAttemptEvent( - invocation_id=invocation_id, - correlation_id=current_correlation_id(), - node_name=node_name, - namespace=namespace, - attempt_index=current_attempt_index(), - fan_out_index=current_fan_out_index(), - branch_name=current_branch_name(), - provider=self._genai_system, - model=self.model, - call_id=call_id, - llm_attempt_index=llm_attempt_index, - latency_ms=latency_ms, - input_messages=input_messages, - request_params=request_params, - request_extras=request_extras, - active_prompt=active_prompt, - active_prompt_group=active_prompt_group, + **base, response_id=response.response_id, response_model=response.response_model, usage=response.usage, finish_reason=response.finish_reason, output_content=response.message.content or None, - caller_invocation_metadata=caller_metadata, ) if exc is None: raise ValueError("_build_llm_retry_attempt_event requires response or exc") return LlmRetryAttemptEvent( - invocation_id=invocation_id, - correlation_id=current_correlation_id(), - node_name=node_name, - namespace=namespace, - attempt_index=current_attempt_index(), - fan_out_index=current_fan_out_index(), - branch_name=current_branch_name(), - provider=self._genai_system, - model=self.model, - call_id=call_id, - llm_attempt_index=llm_attempt_index, - latency_ms=latency_ms, - input_messages=input_messages, - request_params=request_params, - request_extras=request_extras, - active_prompt=active_prompt, - active_prompt_group=active_prompt_group, + **base, error_category=exc.category, error_type=type(exc).__name__, error_message=str(exc), - caller_invocation_metadata=caller_metadata, ) async def _do_complete( From 114d1323d8cc90f57cf2feff1bf3a5bb4ebbc6fd Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Thu, 18 Jun 2026 21:22:33 -0700 Subject: [PATCH 09/10] Test OTel observer ignores terminal LLM events The OTel observer now renders the LLM span solely from the per-attempt LlmRetryAttemptEvent; terminal LlmCompletionEvent / LlmFailedEvent are ignored. Add a regression test feeding both terminal events and asserting zero openarmature.llm.complete spans, guarding against reintroducing the terminal-event span path. Also fix a stale docstring in _drive_llm_span_with_cached_tokens that still referenced "typed LlmCompletionEvent". --- tests/unit/test_observability_otel.py | 32 ++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index c70a0b6..52132e1 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -584,12 +584,42 @@ async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None assert not any(k.startswith("openarmature.prompt.") for k in attrs) +async def test_otel_observer_ignores_terminal_llm_events() -> None: + """Feeding a terminal LlmCompletionEvent or LlmFailedEvent to the + OTel observer produces no ``openarmature.llm.complete`` span; the + per-attempt event is the sole span source.""" + # Proposal 0050: the OTel span renders only from LlmRetryAttemptEvent. + # The terminal events stay on the queue for the Langfuse mapping + + # payload consumers; this guards against reintroducing the + # terminal-event span path (which would double-emit alongside the + # per-attempt span). + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_failed_event, make_typed_event + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + + token = _set_invocation_id("inv-terminal") + try: + await observer(make_typed_event()) + await observer(make_failed_event()) + finally: + _reset_invocation_id(token) + observer.shutdown() + + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert llm_spans == [] + + async def _drive_llm_span_with_cached_tokens( *, cached_tokens: int | None, cache_creation_tokens: int | None = None, ) -> dict[str, Any]: - """Drive the OTel observer through a typed LlmCompletionEvent + """Drive the OTel observer through a per-attempt LlmRetryAttemptEvent carrying the supplied cache-stat fields on the event's Usage record. Returns the LLM-span's attribute map. """ From 89e11335a4dde2389a96698743485c4820ef87fe Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Fri, 19 Jun 2026 09:15:20 -0700 Subject: [PATCH 10/10] Re-export LlmRetryAttemptEvent; isinstance filter PR #170 CoPilot review: - Re-export LlmRetryAttemptEvent from the openarmature.graph package (import block + __all__), matching the sibling LlmCompletionEvent / LlmFailedEvent so the documented observer import path works. - Replace the brittle type(event).__name__ name match with an isinstance check in the conformance _TypedEventCollector; the filter_event_type string comparison stays as-is. --- src/openarmature/graph/__init__.py | 2 ++ tests/conformance/test_observability.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/openarmature/graph/__init__.py b/src/openarmature/graph/__init__.py index 6d071f8..8938300 100644 --- a/src/openarmature/graph/__init__.py +++ b/src/openarmature/graph/__init__.py @@ -44,6 +44,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -101,6 +102,7 @@ "InvocationStartedEvent", "LlmCompletionEvent", "LlmFailedEvent", + "LlmRetryAttemptEvent", "MappingReferencesUndeclaredField", "MetadataAugmentationEvent", "Middleware", diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index f55dd22..06943e9 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -2956,7 +2956,9 @@ async def __call__(self, event: Any) -> None: # per-attempt span surface), not a spec-normative observer # event, so the conformance collector excludes it from the # captured stream that spec fixtures assert against. - if type(event).__name__ == "LlmRetryAttemptEvent": + from openarmature.graph import LlmRetryAttemptEvent # noqa: PLC0415 + + if isinstance(event, LlmRetryAttemptEvent): return if self.filter_event_type is not None: if type(event).__name__ != self.filter_event_type: