diff --git a/CHANGELOG.md b/CHANGELOG.md index c4ec98a..dfac0ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to `openarmature-python` are documented in this file. The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The package follows [Semantic Versioning](https://semver.org/); pre-1.0 minor bumps may carry behavioral changes per [spec governance](https://github.com/LunarCommand/openarmature-spec/blob/main/GOVERNANCE.md). +## [0.15.0] — 2026-06-18 + +### Added + +- **Detached-trace invocation span** (proposal 0061, observability §4.4, spec v0.61.0). The OTel observer now synthesizes an `openarmature.invocation` span at the root of each detached trace (a detached subgraph and each detached fan-out instance), carrying the parent's shared `invocation_id` (detached mode is observer-side trace rendering, not a new run) and the detached unit's own `entry_node`; the detached subgraph / instance span nests under it. A raising detached subgraph surfaces ERROR plus the error category and an OTel exception event on both the parent dispatch span and the detached invocation span. This is observer-side only, with no graph-engine change; the Langfuse observer is unchanged (its Trace entity already plays the invocation-level-container role). Conformance fixtures 008 (rewritten) and 058 (newly wired) run in `test_observability`. +- **Per-attempt LLM spans under call-level retry** (proposal 0050, observability §5.5 / llm-provider §7.1). Completes proposal 0050, which shipped `partial` in v0.14.0 (failure-isolation middleware and the `complete(retry=...)` loop landed then; the per-attempt span surface was deferred). Under call-level retry the OTel observer now emits one `openarmature.llm.complete` span per attempt, each carrying `openarmature.llm.attempt_index` (0-based, 0..N-1, and 0 for a no-retry call). An intermediate failed attempt's span carries ERROR status plus its error category and the request-side attributes; the final attempt's span carries the terminal outcome and, on success, the full response surface. A python-internal `LlmRetryAttemptEvent`, dispatched once per attempt, is the sole source of the OTel span; the terminal `LlmCompletionEvent` / `LlmFailedEvent` stay one per call (payload, latency, Langfuse Generation) and no longer drive the OTel span. Langfuse renders one terminal Generation per call, with the per-attempt detail on the OTel span surface only (a spec-side §8 clarification to pin this is tracked, non-blocking). `conformance.toml` flips proposal 0050 to `implemented`; the call-level fixtures 056-058 are driven through the provider plus OTel observer and the single-attempt observability fixture 057 is wired. + +### Changed + +- **Pinned spec advances v0.60.0 → v0.61.0** (proposal 0061, the detached-trace invocation span above). A single step this cycle; `conformance.toml` records proposal 0061 as `implemented`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above. + ## [0.14.0] — 2026-06-17 ### Added diff --git a/conformance.toml b/conformance.toml index b3e4063..7ead03c 100644 --- a/conformance.toml +++ b/conformance.toml @@ -372,30 +372,34 @@ status = "implemented" since = "0.13.0" # Spec v0.42.0 (proposal 0050). Retry & degradation primitives — -# failure-isolation middleware (§6.3) + call-level retry (§7). Both -# primitives implemented across the v0.14.0 cycle: -# FailureIsolationMiddleware (distinct FailureIsolatedEvent + -# CaughtException) and the call-level ``retry`` parameter on -# ``Provider.complete()`` — an in-call loop over transient §7 errors -# reusing the §6.1 RetryConfig record. ``partial`` because §7.1's -# per-attempt span surface — N ``openarmature.llm.complete`` spans + -# the ``openarmature.llm.attempt_index`` attribute — is DEFERRED: the -# python LLM span is rendered from the typed event, which is -# terminal-only per the graph-engine §6 mutual-exclusion contract, so -# per-attempt spans require a dedicated within-call sub-event -# (LlmRetryAttemptEvent) scoped to a future cycle. Call-level retry -# ships terminal-only: exactly one LlmCompletionEvent / LlmFailedEvent -# per ``complete()`` call. Failure-isolation conformance fixtures -# (058-063) are all wired + passing: the FailureIsolatedEvent's -# attempt_index reports the final / exhausting attempt per §6.3's -# lineage-correlation rule (spec ruled this in the attempt-index coord -# thread; RetryMiddleware now records the final attempt in a -# terminal-attempt scope the outer isolation reads, rather than the -# post-reset baseline). ``partial`` is now solely about the -# call-level-retry per-attempt span surface above. +# failure-isolation middleware (§6.3) + call-level retry (§7), +# including §7.1's per-attempt span surface. Implemented across the +# v0.14.0 + v0.15.0 cycles: FailureIsolationMiddleware (distinct +# FailureIsolatedEvent + CaughtException) and the call-level ``retry`` +# parameter on ``Provider.complete()`` — an in-call loop over transient +# §7 errors reusing the §6.1 RetryConfig record. §7.1's per-attempt +# span surface now ships: a call-level ``retry`` emits N +# ``openarmature.llm.complete`` spans — one per attempt — each carrying +# ``openarmature.llm.attempt_index`` (0-based, call-level, independent +# of the node-level attempt_index). A python-internal +# LlmRetryAttemptEvent dispatched once per attempt is the SOLE source of +# the OTel LLM span (including single no-retry calls, at index 0); the +# terminal LlmCompletionEvent / LlmFailedEvent stay one-per-call +# (payload, latency, Langfuse Generation, fixture-072 mutual exclusion) +# and no longer drive the OTel span. Langfuse renders one terminal +# Generation per call. llm-provider fixtures 056-058 (per-attempt +# spans) are validated in tests/unit/test_observability_otel.py through +# the provider + OTel observer; observability fixture 057 +# (single-attempt attempt_index) is wired in test_observability. +# Failure-isolation fixtures (058-063) are all wired + passing: the +# FailureIsolatedEvent's attempt_index reports the final / exhausting +# attempt per §6.3's lineage-correlation rule (spec ruled this in the +# attempt-index coord thread; RetryMiddleware now records the final +# attempt in a terminal-attempt scope the outer isolation reads, rather +# than the post-reset baseline). [proposals."0050"] -status = "partial" -since = "0.14.0" +status = "implemented" +since = "0.15.0" # Spec v0.43.0 (proposal 0051). Langfuse trace.input/trace.output # implementation-surface caveat. Purely textual: documents that the diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 26f23dd..7aa2a01 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -659,14 +659,17 @@ as nested spans. When an `OpenAIProvider` (or any [custom Provider](../model-providers/authoring.md) that wires the dispatch hook) is used inside a graph with `OTelObserver` -attached, each `provider.complete()` call emits a dedicated span named -`openarmature.llm.complete`, parented under the calling node's span. -The span carries two attribute families. +attached, each `provider.complete()` attempt emits a dedicated span +named `openarmature.llm.complete`, parented under the calling node's +span. A call without retry emits one span; a call-level `retry=` that +retries emits [one span per attempt](#per-attempt-spans-under-call-level-retry). +Each span carries two attribute families. **`openarmature.llm.*` (always on).** The framework's canonical namespace: model identifier, finish reason, token counts, prompt -identity from `with_active_prompt(...)`, error category on failure. -Set unconditionally whenever the LLM span itself emits. +identity from `with_active_prompt(...)`, error category on failure, and +`openarmature.llm.attempt_index` (the 0-based call-level attempt +counter). Set unconditionally whenever the LLM span itself emits. **`gen_ai.*` (OpenTelemetry GenAI semantic conventions, default on).** Cross-vendor attribute names every LLM-aware backend reads @@ -702,6 +705,28 @@ when an external auto-instrumentation library (OpenInference, `opentelemetry-instrumentation-openai`) is already the canonical source on your stack. +#### Per-attempt spans under call-level retry + +[Call-level retry](llms.md#retrying-transient-failures) +(`provider.complete(retry=...)`) retries transient provider errors +inside a single call. Each attempt emits its own +`openarmature.llm.complete` span tagged with +`openarmature.llm.attempt_index` (0-based). A call that succeeds on the +first try emits one span at `attempt_index` 0; a call that fails twice +transiently before succeeding emits three spans (indices 0, 1, 2). Each +failed attempt's span carries `ERROR` status plus +`openarmature.error.category`; the final attempt's span carries the +terminal outcome (`OK` on success, `ERROR` on an exhausted or +non-transient failure). + +`openarmature.llm.attempt_index` is the **call-level** attempt counter, +[independent of the node-level `attempt_index`](llms.md#call-level-vs-node-level-retry): +the former counts attempts inside one `complete()` call, the latter +counts node re-executions driven by retry *middleware*. A node retried +once by middleware, each execution calling a provider that itself +retries once, produces node `attempt_index` 0/1 and, within each, +call-level `attempt_index` 0/1. + ### LLM payload attributes By default, LLM spans do **not** carry the messages sent or the @@ -834,6 +859,14 @@ correctly; doing it from a `SpanProcessor.on_end` callback does not, because the framework has already called `span.end()` and the OTel SDK silently drops `set_attribute` on ended spans. +For the `openarmature.llm.complete` span the close event is an +`LlmRetryAttemptEvent` (one per attempt) rather than a `NodeEvent`; +that is the per-attempt event the observer renders the LLM span from. +An enricher scoped to that span (`span.name == +"openarmature.llm.complete"`) can read the attempt's outcome straight +off it: `event.llm_attempt_index`, `event.error_category`, +`event.usage`, `event.finish_reason`, and so on. + Exceptions raised by an enricher are caught and warned, never propagated. @@ -880,6 +913,16 @@ via `current_dispatch()`. See [Authoring providers](../model-providers/authoring.md) for the full pattern. +Under [call-level retry](#per-attempt-spans-under-call-level-retry) the +bundled `OpenAIProvider` additionally dispatches a python-internal +`LlmRetryAttemptEvent` once per attempt; that is the event the OTel +observer renders each per-attempt span from (including the lone attempt +of a no-retry call, at index 0). The terminal `LlmCompletionEvent` / +`LlmFailedEvent` above are unchanged: still one per call, still the +stable surface for per-call consumption (token accounting, failure +tracking). An observer that only cares about per-call outcomes can +ignore `LlmRetryAttemptEvent`. + #### Legacy sentinel-namespace pattern (compatibility surface) `openarmature.observability.LLM_NAMESPACE` and diff --git a/src/openarmature/graph/__init__.py b/src/openarmature/graph/__init__.py index 6d071f8..8938300 100644 --- a/src/openarmature/graph/__init__.py +++ b/src/openarmature/graph/__init__.py @@ -44,6 +44,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -101,6 +102,7 @@ "InvocationStartedEvent", "LlmCompletionEvent", "LlmFailedEvent", + "LlmRetryAttemptEvent", "MappingReferencesUndeclaredField", "MetadataAugmentationEvent", "Middleware", diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index 09b3b50..2171ca1 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -656,6 +656,72 @@ class LlmFailedEvent: caller_invocation_metadata: Mapping[str, AttributeValue] | None = None +# Python-internal per-attempt LLM event. NOT a spec-normative event type +# (unlike LlmCompletionEvent / LlmFailedEvent): it is the observer-side +# vehicle for the observability §5.5 per-attempt span surface under +# llm-provider §7.1 call-level retry. One is dispatched per in-call +# attempt (including the single attempt of a no-retry call); the OTel +# observer renders one openarmature.llm.complete span from each, while +# the terminal LlmCompletionEvent / LlmFailedEvent stay one-per-call +# (payload/latency, Langfuse mapping, the fixture-072 mutual exclusion). +@dataclass(frozen=True) +class LlmRetryAttemptEvent: + """One LLM-call attempt delivered to observers for per-attempt span + rendering. + + Carries the full request-side surface plus that attempt's outcome. + ``error_category`` discriminates the outcome: ``None`` for a + successful attempt (the response-side fields are populated), a + category string for a failed attempt (the response-side fields are + ``None`` — no response was received). + + Field set: + + - ``llm_attempt_index``: the call-level retry-attempt index, ``0`` + for the first attempt and ``0..N-1`` across the N attempts of a + call-level retry. Distinct from ``attempt_index`` (the node-level + retry index used for calling-span resolution); the two are + independent. + - identity / scoping (``invocation_id`` ... ``call_id``) and the + request side (``input_messages`` / ``request_params`` / + ``request_extras`` / ``active_prompt`` / ``active_prompt_group``) + mirror :class:`LlmCompletionEvent`, carried on every attempt. + - response side (``response_id`` / ``response_model`` / ``usage`` / + ``finish_reason`` / ``output_content``): populated on a successful + attempt; ``None`` on a failed attempt. + - failure side (``error_category`` / ``error_message`` / + ``error_type``): populated on a failed attempt; ``None`` on a + successful one. + """ + + invocation_id: str + correlation_id: str | None + node_name: str + namespace: tuple[str, ...] + attempt_index: int + fan_out_index: int | None + branch_name: str | None + provider: str + model: str + call_id: str + llm_attempt_index: int + latency_ms: float | None + input_messages: list[dict[str, Any]] + request_params: Mapping[str, Any] + request_extras: Mapping[str, Any] + active_prompt: Any + active_prompt_group: Any + response_id: str | None = None + response_model: str | None = None + usage: "Usage | None" = None + finish_reason: str | None = None + output_content: str | None = None + error_category: str | None = None + error_message: str | None = None + error_type: str | None = None + caller_invocation_metadata: Mapping[str, AttributeValue] | None = None + + # Spec: pipeline-utilities §6.3 cause chain (proposal 0068). A ``carrier`` # link is a graph-engine §4 ``node_exception`` wrapper the engine applies at a # non-node placement (§9.7 instance / §11.7 branch / §9.6 / §11.6 parent-node @@ -758,6 +824,7 @@ class FailureIsolatedEvent: "InvocationStartedEvent", "LlmCompletionEvent", "LlmFailedEvent", + "LlmRetryAttemptEvent", "MetadataAugmentationEvent", "NodeEvent", "ParallelBranchesEventConfig", diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index ce5da36..da6b3e7 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -40,6 +40,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -55,6 +56,9 @@ # typed LLM provider call event, dispatched on every successful LLM # completion), LlmFailedEvent (proposal 0058 typed LLM failure event, # dispatched alongside the §7 exception when provider.complete raises), +# LlmRetryAttemptEvent (proposal 0050 per-attempt LLM span event, +# python-internal, dispatched once per in-call attempt under call-level +# retry to drive the per-attempt OTel span surface), # and FailureIsolatedEvent (proposal 0050 §6.3 framework-emitted event, # dispatched by FailureIsolationMiddleware when it catches an exception # escaping the inner chain and substitutes a degraded partial update). @@ -65,6 +69,7 @@ | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ) diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 8c86cb4..32eb00d 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -56,7 +56,7 @@ import re import time import uuid -from collections.abc import Mapping, Sequence +from collections.abc import Callable, Mapping, Sequence from typing import TYPE_CHECKING, Any, Literal, cast from urllib.parse import urlparse @@ -64,7 +64,11 @@ import jsonschema from pydantic import BaseModel, ValidationError -from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent +from openarmature.graph.events import ( + LlmCompletionEvent, + LlmFailedEvent, + LlmRetryAttemptEvent, +) from openarmature.observability.correlation import ( current_attempt_index, current_branch_name, @@ -381,9 +385,12 @@ async def complete( backoff (defaulting to the canonical transient categories with exponential jittered backoff). The request is built and validated once; pre-send validation errors are never retried. - Exactly one observability event fires for the call's terminal - outcome regardless of attempt count, and its ``latency_ms`` - covers the whole call, retries and backoff included. The + Exactly one terminal observability event (LlmCompletionEvent or + LlmFailedEvent) fires for the call's terminal outcome regardless + of attempt count, and its ``latency_ms`` covers the whole call, + retries and backoff included. A per-attempt LlmRetryAttemptEvent + also fires for each attempt (driving the per-attempt LLM span + surface), each carrying just that attempt's latency. The ``on_retry`` hook is not exception-isolated (mirroring ``RetryMiddleware``); an exception raised by it propagates out of the call. @@ -481,7 +488,38 @@ async def complete( include_response_format=(schema_dict is None or not self._force_prompt_augmentation_fallback), tool_choice=tool_choice, ) - response = await self._do_complete_with_retry(body, schema_dict, schema_class, retry) + + # Per-attempt LLM span surface (observability §5.5 under + # llm-provider §7.1 call-level retry): dispatch one + # LlmRetryAttemptEvent per attempt so the observer can render + # one openarmature.llm.complete span per attempt. No-op + # outside an invocation (``dispatch`` is None). + def emit_attempt( + idx: int, + attempt_latency_ms: float, + attempt_response: Response | None, + attempt_exc: LlmProviderError | None, + ) -> None: + if dispatch is None: + return + dispatch( + self._build_llm_retry_attempt_event( + llm_attempt_index=idx, + latency_ms=attempt_latency_ms, + call_id=call_id, + input_messages=serialized_messages, + request_params=request_params, + request_extras=request_extras, + active_prompt=active_prompt, + active_prompt_group=active_prompt_group, + response=attempt_response, + exc=attempt_exc, + ), + ) + + response = await self._do_complete_with_retry( + body, schema_dict, schema_class, retry, emit_attempt + ) except LlmProviderError as exc: # Failure path: dispatch a typed LlmFailedEvent per # proposal 0058. Only §7 category exceptions @@ -533,18 +571,30 @@ async def _do_complete_with_retry( schema_dict: dict[str, Any] | None, schema_class: type[BaseModel] | None, retry: RetryConfig | None, + emit_attempt: Callable[[int, float, Response | None, LlmProviderError | None], None] | None = None, ) -> Response: """Run the wire call with optional call-level retry. Loops the underlying wire call on transient provider errors per - the retry config. Intermediate transient attempts are caught - here and emit no observability event; only the terminal outcome - (success, retry exhaustion, or a non-transient error) reaches - ``complete()``'s typed-event dispatch, so exactly one event - fires per ``complete()`` call. + the retry config. ``emit_attempt`` (when supplied) fires once + per attempt with that attempt's index, latency, and outcome + (response on success, exception on failure) so the caller can + dispatch the per-attempt LLM span event. The terminal outcome + (success, retry exhaustion, or a non-transient error) is what + reaches ``complete()`` for the single terminal typed event, so + exactly one terminal event still fires per ``complete()`` call. """ if retry is None: - return await self._do_complete(body, schema_dict, schema_class) + attempt_start = time.perf_counter() + try: + response = await self._do_complete(body, schema_dict, schema_class) + except LlmProviderError as exc: + if emit_attempt is not None: + emit_attempt(0, (time.perf_counter() - attempt_start) * 1000.0, None, exc) + raise + if emit_attempt is not None: + emit_attempt(0, (time.perf_counter() - attempt_start) * 1000.0, response, None) + return response # Lazy import avoids a module-load cycle: graph.middleware.retry # imports llm.errors. Resolve None config fields to the canonical # defaults, mirroring RetryMiddleware. @@ -557,9 +607,15 @@ async def _do_complete_with_retry( backoff = retry.backoff or exponential_jitter_backoff attempt = 0 while True: + attempt_start = time.perf_counter() try: - return await self._do_complete(body, schema_dict, schema_class) + response = await self._do_complete(body, schema_dict, schema_class) except LlmProviderError as exc: + # Per-attempt event fires for every attempt, including + # this failed one (the terminal failed attempt's span + # carries the final category). + if emit_attempt is not None: + emit_attempt(attempt, (time.perf_counter() - attempt_start) * 1000.0, None, exc) # No graph state at the call boundary; pass None (the # default classifier ignores it). Re-raise on exhaustion # or a non-transient category so complete() emits the @@ -572,6 +628,10 @@ async def _do_complete_with_retry( await retry.on_retry(exc, attempt) await asyncio.sleep(backoff(attempt)) attempt += 1 + continue + if emit_attempt is not None: + emit_attempt(attempt, (time.perf_counter() - attempt_start) * 1000.0, response, None) + return response def _build_llm_completion_event( self, @@ -711,6 +771,75 @@ def _build_llm_failed_event( caller_invocation_metadata=caller_metadata, ) + def _build_llm_retry_attempt_event( + self, + *, + llm_attempt_index: int, + latency_ms: float, + call_id: str, + input_messages: list[dict[str, Any]], + request_params: dict[str, Any], + request_extras: dict[str, Any], + active_prompt: Any, + active_prompt_group: Any, + response: Response | None = None, + exc: LlmProviderError | None = None, + ) -> LlmRetryAttemptEvent: + """Construct an LlmRetryAttemptEvent for one in-call attempt. + + Identity / scoping + request-side fields mirror the terminal + events; the outcome comes from ``response`` (a successful + attempt) or ``exc`` (a failed attempt). Exactly one of the two + is supplied. ``llm_attempt_index`` is the call-level retry + index, distinct from the node-level ``attempt_index`` sourced + from the ContextVar. + """ + namespace = current_namespace_prefix() + node_name = namespace[-1] if namespace else "" + invocation_id = current_invocation_id() or "" + caller_metadata: Mapping[str, AttributeValue] | None = None + if self._populate_caller_metadata: + caller_metadata = dict(current_invocation_metadata()) + # Identity / scoping + request-side fields are shared; only the + # outcome fields differ between the success and failure branches. + base: dict[str, Any] = { + "invocation_id": invocation_id, + "correlation_id": current_correlation_id(), + "node_name": node_name, + "namespace": namespace, + "attempt_index": current_attempt_index(), + "fan_out_index": current_fan_out_index(), + "branch_name": current_branch_name(), + "provider": self._genai_system, + "model": self.model, + "call_id": call_id, + "llm_attempt_index": llm_attempt_index, + "latency_ms": latency_ms, + "input_messages": input_messages, + "request_params": request_params, + "request_extras": request_extras, + "active_prompt": active_prompt, + "active_prompt_group": active_prompt_group, + "caller_invocation_metadata": caller_metadata, + } + if response is not None: + return LlmRetryAttemptEvent( + **base, + response_id=response.response_id, + response_model=response.response_model, + usage=response.usage, + finish_reason=response.finish_reason, + output_content=response.message.content or None, + ) + if exc is None: + raise ValueError("_build_llm_retry_attempt_event requires response or exc") + return LlmRetryAttemptEvent( + **base, + error_category=exc.category, + error_type=type(exc).__name__, + error_message=str(exc), + ) + async def _do_complete( self, body: dict[str, Any], diff --git a/src/openarmature/observability/correlation.py b/src/openarmature/observability/correlation.py index 7b6660f..ba83e00 100644 --- a/src/openarmature/observability/correlation.py +++ b/src/openarmature/observability/correlation.py @@ -42,6 +42,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -227,6 +228,7 @@ def _reset_active_observers(token: Token[tuple[SubscribedObserver, ...]]) -> Non | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, @@ -244,6 +246,7 @@ def current_dispatch() -> ( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, @@ -272,6 +275,7 @@ def _set_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, @@ -285,6 +289,7 @@ def _set_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, @@ -306,6 +311,7 @@ def _reset_active_dispatch( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ], None, diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 587641f..08b4a7e 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -35,6 +35,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -368,6 +369,7 @@ async def __call__( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ), ) -> None: @@ -377,6 +379,12 @@ async def __call__( if isinstance(event, InvocationCompletedEvent): self._handle_invocation_completed(event) return + # Proposal 0050 per-attempt LLM events are OTel-span-only: the + # Langfuse mapping renders one Generation per call from the + # terminal LlmCompletionEvent / LlmFailedEvent, so the + # per-attempt event is ignored here. + if isinstance(event, LlmRetryAttemptEvent): + return # Proposal 0049 typed LlmCompletionEvent (success path). Drives # the §5.5 Generation observation lifecycle for successful # provider calls. diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 35a2929..cb45ec9 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -103,6 +103,7 @@ InvocationStartedEvent, LlmCompletionEvent, LlmFailedEvent, + LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -475,7 +476,7 @@ class OTelObserver: # close sites (subgraph dispatch, detached root, fan-out instance, # invocation span, shutdown drain). attribute_enrichers: Sequence[ - Callable[[Span, NodeEvent | LlmCompletionEvent | LlmFailedEvent | None], None] + Callable[[Span, NodeEvent | LlmCompletionEvent | LlmFailedEvent | LlmRetryAttemptEvent | None], None] ] = () # Read from the package's ``__spec_version__`` (one of the three # places the spec version is pinned per CLAUDE.md). Bumping the @@ -548,7 +549,7 @@ def __post_init__(self) -> None: # ``event`` is None on synthetic close sites (subgraph dispatch, # detached root, fan-out instance, invocation span, orphan drain). def _run_enrichers( - self, span: Span, event: NodeEvent | LlmCompletionEvent | LlmFailedEvent | None + self, span: Span, event: NodeEvent | LlmCompletionEvent | LlmFailedEvent | LlmRetryAttemptEvent | None ) -> None: """Invoke configured enrichers against ``span`` before ``span.end()`` is called.""" @@ -594,6 +595,7 @@ async def __call__( | InvocationCompletedEvent | LlmCompletionEvent | LlmFailedEvent + | LlmRetryAttemptEvent | FailureIsolatedEvent ), ) -> None: @@ -604,20 +606,19 @@ async def __call__( # before any node-specific logic runs. if isinstance(event, InvocationStartedEvent | InvocationCompletedEvent): return - # Proposal 0049 typed LlmCompletionEvent (success path). - # Drives the openarmature.llm.complete span lifecycle for - # successful provider calls. - if isinstance(event, LlmCompletionEvent): + # Proposal 0050 per-attempt LLM span surface: the + # openarmature.llm.complete span(s) render from the per-attempt + # LlmRetryAttemptEvent — one span per attempt under call-level + # retry (attempt_index 0..N-1), one for a no-retry call. + if isinstance(event, LlmRetryAttemptEvent): if not self.disable_llm_spans: - self._handle_typed_llm_completion(event) + self._handle_typed_llm_retry_attempt(event) return - # Proposal 0058 typed LlmFailedEvent (failure path). Drives - # the same openarmature.llm.complete span lifecycle for failed - # provider calls, with ERROR status + openarmature.error.category - # attribute. - if isinstance(event, LlmFailedEvent): - if not self.disable_llm_spans: - self._handle_typed_llm_failed(event) + # The terminal LlmCompletionEvent / LlmFailedEvent no longer + # drive the OTel span (the per-attempt event does); they stay on + # the queue for the Langfuse mapping and payload/latency + # consumers, so the OTel observer ignores them here. + if isinstance(event, LlmCompletionEvent | LlmFailedEvent): return # Proposal 0050 §6.3 framework-emitted failure-isolation event. if isinstance(event, FailureIsolatedEvent): @@ -1180,9 +1181,17 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: # active_prompt_group snapshots taken at dispatch time — NOT the # ContextVar. The dispatch worker's task-local Context doesn't see # node-body ContextVar writes. - def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: - """Open + close the ``openarmature.llm.complete`` span from the - typed LlmCompletionEvent (success path).""" + def _handle_typed_llm_retry_attempt(self, event: LlmRetryAttemptEvent) -> None: + """Open + close one ``openarmature.llm.complete`` span from a + per-attempt LlmRetryAttemptEvent. + + N call-level retry attempts produce N spans, all parented under + the calling node span, each carrying + ``openarmature.llm.attempt_index`` 0..N-1. A successful attempt + (``error_category is None``) carries the full response surface + with OK status; a failed attempt carries ERROR status + the + error category and no response attributes. + """ # Mid-call metadata augmentation can't reach this span: the # typed event arrives only after complete() returns, and the # span is back-dated past any augmentation event that fired @@ -1216,7 +1225,10 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: calling_fan_out_index=event.fan_out_index, calling_branch_name=event.branch_name, ) - attrs: dict[str, Any] = {"openarmature.llm.model": event.model} + attrs: dict[str, Any] = { + "openarmature.llm.model": event.model, + "openarmature.llm.attempt_index": event.llm_attempt_index, + } cid = current_correlation_id() if cid is not None: attrs["openarmature.correlation_id"] = cid @@ -1268,6 +1280,14 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: attributes=attrs, start_time=start_time_ns, ) + if event.error_category is not None: + # Failed attempt: ERROR + the §4 category, no response + # attributes (no response was received). + span.set_status(Status(StatusCode.ERROR, description=event.error_category)) + span.set_attribute("openarmature.error.category", event.error_category) + self._run_enrichers(span, event) + span.end(end_time=end_time_ns) + return usage = event.usage if event.finish_reason is not None: span.set_attribute("openarmature.llm.finish_reason", event.finish_reason) @@ -1311,100 +1331,6 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: self._run_enrichers(span, event) span.end(end_time=end_time_ns) - def _handle_typed_llm_failed(self, event: LlmFailedEvent) -> None: - """Open + close the ``openarmature.llm.complete`` span from the - typed LlmFailedEvent (failure path). Same span shape as the - success path with ERROR status + - ``openarmature.error.category`` attribute attached.""" - from openarmature.observability.correlation import ( - current_correlation_id, - current_invocation_id, - ) - - invocation_id = current_invocation_id() - if invocation_id is None: - return - inv_state = self._inv_state_for(invocation_id) - # Back-date start_time using latency_ms (mirrors the success- - # path handler). For failures, latency reflects time spent - # until the §7 exception was raised — useful for diagnosing - # whether failures are fast-failing (pre-send validation) or - # slow-failing (provider timeout). - end_time_ns = time.time_ns() - if event.latency_ms is not None: - start_time_ns = end_time_ns - int(event.latency_ms * 1_000_000) - else: - start_time_ns = end_time_ns - parent_ctx = self._resolve_llm_parent( - inv_state, - invocation_id, - calling_namespace_prefix=event.namespace, - calling_attempt_index=event.attempt_index, - calling_fan_out_index=event.fan_out_index, - calling_branch_name=event.branch_name, - ) - attrs: dict[str, Any] = {"openarmature.llm.model": event.model} - cid = current_correlation_id() - if cid is not None: - attrs["openarmature.correlation_id"] = cid - if event.caller_invocation_metadata is not None: - _apply_caller_metadata(attrs, event.caller_invocation_metadata) - active_prompt = event.active_prompt - if active_prompt is not None: - attrs["openarmature.prompt.name"] = active_prompt.name - attrs["openarmature.prompt.version"] = active_prompt.version - attrs["openarmature.prompt.label"] = active_prompt.label - attrs["openarmature.prompt.template_hash"] = active_prompt.template_hash - attrs["openarmature.prompt.rendered_hash"] = active_prompt.rendered_hash - active_group = event.active_prompt_group - if active_group is not None: - attrs["openarmature.prompt.group_name"] = active_group.group_name - if not self.disable_genai_semconv: - attrs["gen_ai.system"] = event.provider - attrs["gen_ai.request.model"] = event.model - request_params = event.request_params or {} - if "temperature" in request_params: - attrs["gen_ai.request.temperature"] = request_params["temperature"] - if "max_tokens" in request_params: - attrs["gen_ai.request.max_tokens"] = request_params["max_tokens"] - if "top_p" in request_params: - attrs["gen_ai.request.top_p"] = request_params["top_p"] - if "seed" in request_params: - attrs["gen_ai.request.seed"] = request_params["seed"] - if "frequency_penalty" in request_params: - attrs["gen_ai.request.frequency_penalty"] = request_params["frequency_penalty"] - if "presence_penalty" in request_params: - attrs["gen_ai.request.presence_penalty"] = request_params["presence_penalty"] - if "stop_sequences" in request_params: - attrs["gen_ai.request.stop_sequences"] = request_params["stop_sequences"] - if not self.disable_provider_payload: - if event.input_messages: - serialized = _serialize_for_attribute(event.input_messages) - attrs["openarmature.llm.input.messages"] = _truncate_for_attribute( - serialized, self.payload_max_bytes - ) - if event.request_extras: - serialized_extras = _serialize_for_attribute(event.request_extras) - attrs["openarmature.llm.request.extras"] = _truncate_for_attribute( - serialized_extras, self.payload_max_bytes - ) - span = self._tracer.start_span( - name="openarmature.llm.complete", - context=cast("Any", parent_ctx), - kind=SpanKind.CLIENT, - attributes=attrs, - start_time=start_time_ns, - ) - span.set_status( - Status( - StatusCode.ERROR, - description=event.error_category, - ) - ) - span.set_attribute("openarmature.error.category", event.error_category) - self._run_enrichers(span, event) - span.end(end_time=end_time_ns) - def _handle_failure_isolated(self, event: FailureIsolatedEvent) -> None: """Emit a zero-duration ``openarmature.failure_isolated`` span for a FailureIsolationMiddleware catch. diff --git a/tests/_helpers/typed_event.py b/tests/_helpers/typed_event.py index 3b34f9d..02a55a0 100644 --- a/tests/_helpers/typed_event.py +++ b/tests/_helpers/typed_event.py @@ -10,7 +10,7 @@ from typing import Any -from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent, LlmRetryAttemptEvent def make_typed_event(**overrides: Any) -> LlmCompletionEvent: @@ -74,3 +74,42 @@ def make_failed_event(**overrides: Any) -> LlmFailedEvent: } base.update(overrides) return LlmFailedEvent(**base) + + +def make_retry_attempt_event(**overrides: Any) -> LlmRetryAttemptEvent: + """Build an ``LlmRetryAttemptEvent`` with neutral success-attempt + defaults; ``overrides`` swap fields for the test case. Pass + ``error_category=...`` (and ``finish_reason=None``) for a failed + attempt. This is the event that drives the OTel per-attempt LLM + span; the Langfuse-side tests keep using ``make_typed_event`` (the + terminal event the Generation mapping reads).""" + base: dict[str, Any] = { + "invocation_id": "inv-1", + "correlation_id": None, + "node_name": "ask", + "namespace": ("ask",), + "attempt_index": 0, + "fan_out_index": None, + "branch_name": None, + "provider": "openai", + "model": "test-m", + "call_id": "cc-1", + "llm_attempt_index": 0, + "latency_ms": 10.0, + "input_messages": [], + "request_params": {}, + "request_extras": {}, + "active_prompt": None, + "active_prompt_group": None, + "response_id": None, + "response_model": None, + "usage": None, + "finish_reason": "stop", + "output_content": None, + "error_category": None, + "error_message": None, + "error_type": None, + "caller_invocation_metadata": None, + } + base.update(overrides) + return LlmRetryAttemptEvent(**base) diff --git a/tests/conformance/test_llm_provider.py b/tests/conformance/test_llm_provider.py index 0059053..c794807 100644 --- a/tests/conformance/test_llm_provider.py +++ b/tests/conformance/test_llm_provider.py @@ -103,12 +103,15 @@ "055-anthropic-wire-byte-stability": ( "Proposal 0047 wire-byte stability; queued for v0.13.0 (also Anthropic-pending)" ), - # Proposal 0050 (call-level retry, v0.42.0) — three fixtures - # exercise the new ``retry`` kwarg on ``complete()``. Queued for - # v0.14.0 retry & reliability primitives batch. - "056-call-level-retry-transient": ("Proposal 0050 call-level retry; queued for v0.14.0"), - "057-call-level-retry-exhaustion": ("Proposal 0050 call-level retry; queued for v0.14.0"), - "058-call-level-retry-non-transient-no-retry": ("Proposal 0050 call-level retry; queued for v0.14.0"), + # Proposal 0050 (call-level retry, v0.42.0) — three fixtures exercise + # the ``retry`` kwarg on ``complete()`` AND assert per-attempt LLM + # spans (observability §5.5). They stay deferred in this provider + # harness (which has no observer) and are activated + asserted in + # test_observability_otel.py::test_call_level_retry_fixture_per_attempt_spans, + # which drives them through the provider + an OTel observer. + "056-call-level-retry-transient": ("per-attempt LLM spans; see test_observability_otel.py"), + "057-call-level-retry-exhaustion": ("per-attempt LLM spans; see test_observability_otel.py"), + "058-call-level-retry-non-transient-no-retry": ("per-attempt LLM spans; see test_observability_otel.py"), } diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index ab2c25a..06943e9 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -89,6 +89,9 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: # §5.7 attribute surface end-to-end against a two-branch # parallel-branches graph with calls_llm in each branch. "038-otel-parallel-branches-dispatch-span", + # v0.42.0 — proposal 0050 call-level-retry per-attempt LLM span + # surface. Single-attempt default: one span, attempt_index 0. + "057-llm-attempt-index-single-attempt-default", "001-otel-basic-trace", "002-otel-subgraph-hierarchy", "003-otel-error-status", @@ -256,6 +259,7 @@ async def test_observability_fixture(fixture_path: Path) -> None: "021-otel-llm-disable-genai-semconv", "025-otel-llm-request-params-extended", "026-otel-caller-supplied-metadata", + "057-llm-attempt-index-single-attempt-default", }: await _run_llm_payload_fixture(spec) else: @@ -2948,6 +2952,14 @@ def __init__(self, *, filter_event_type: str | None = None) -> None: self.events: list[Any] = [] async def __call__(self, event: Any) -> None: + # LlmRetryAttemptEvent is python-internal (it drives the OTel + # per-attempt span surface), not a spec-normative observer + # event, so the conformance collector excludes it from the + # captured stream that spec fixtures assert against. + from openarmature.graph import LlmRetryAttemptEvent # noqa: PLC0415 + + if isinstance(event, LlmRetryAttemptEvent): + return if self.filter_event_type is not None: if type(event).__name__ != self.filter_event_type: return diff --git a/tests/unit/test_llm_provider.py b/tests/unit/test_llm_provider.py index 13f6e3d..05dc039 100644 --- a/tests/unit/test_llm_provider.py +++ b/tests/unit/test_llm_provider.py @@ -20,7 +20,7 @@ import pytest from pydantic import ValidationError -from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent, NodeEvent +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent, LlmRetryAttemptEvent, NodeEvent from openarmature.graph.middleware import RetryConfig, deterministic_backoff from openarmature.graph.observer import ObserverEvent from openarmature.llm import ( @@ -1959,14 +1959,15 @@ def _handler(_req: httpx.Request) -> httpx.Response: assert typed.response_id is None -async def test_complete_success_emits_typed_event_as_single_emission() -> None: - # v0.13.0 dropped the success-side sentinel emission, so the - # provider's success-path emission window is now a single typed - # event — no within-emission ordering question remains. This test - # locks the single-emission shape so a regression that re-adds a - # sentinel NodeEvent on success would surface here. Spec fixture - # 056 pins the broader bracketing (typed event arrives between - # the CALLING NODE's started/completed pair) end-to-end. +async def test_complete_success_emits_per_attempt_then_terminal_typed_event() -> None: + # A successful no-retry complete() emits two typed events: a + # per-attempt LlmRetryAttemptEvent (attempt 0, driving the OTel + # per-attempt span surface) followed by the terminal + # LlmCompletionEvent. Both are typed — this locks the no-sentinel + # shape so a regression re-adding a sentinel NodeEvent on success + # would surface here as an extra NodeEvent. Spec fixture 056 pins + # the terminal event's bracketing (it arrives between the CALLING + # NODE's started/completed pair) end-to-end. events, token = _collecting_dispatch() transport = _make_openai_response_with_usage( {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2} @@ -1978,8 +1979,11 @@ async def test_complete_success_emits_typed_event_as_single_emission() -> None: await provider.aclose() _release_dispatch(token) - assert len(events) == 1 - assert isinstance(events[0], LlmCompletionEvent) + assert [type(e).__name__ for e in events] == ["LlmRetryAttemptEvent", "LlmCompletionEvent"] + first, terminal = events + assert isinstance(first, LlmRetryAttemptEvent) + assert first.llm_attempt_index == 0 + assert isinstance(terminal, LlmCompletionEvent) async def test_llm_completion_event_sources_node_identity_from_calling_context() -> None: diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index a9aa72f..52132e1 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -508,7 +508,7 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: PromptResult, TextPrompt, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) @@ -543,7 +543,7 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: # The observer reads from the typed event, NOT from the live # ContextVar — that ContextVar is unreachable from the # dispatch worker's task-local Context. - await observer(make_typed_event(active_prompt=result, active_prompt_group=group)) + await observer(make_retry_attempt_event(active_prompt=result, active_prompt_group=group)) finally: _reset_invocation_id(token) @@ -566,14 +566,14 @@ async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-2") try: - await observer(make_typed_event()) + await observer(make_retry_attempt_event()) finally: _reset_invocation_id(token) observer.shutdown() @@ -584,12 +584,42 @@ async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None assert not any(k.startswith("openarmature.prompt.") for k in attrs) +async def test_otel_observer_ignores_terminal_llm_events() -> None: + """Feeding a terminal LlmCompletionEvent or LlmFailedEvent to the + OTel observer produces no ``openarmature.llm.complete`` span; the + per-attempt event is the sole span source.""" + # Proposal 0050: the OTel span renders only from LlmRetryAttemptEvent. + # The terminal events stay on the queue for the Langfuse mapping + + # payload consumers; this guards against reintroducing the + # terminal-event span path (which would double-emit alongside the + # per-attempt span). + from openarmature.observability.correlation import ( + _reset_invocation_id, + _set_invocation_id, + ) + from tests._helpers.typed_event import make_failed_event, make_typed_event + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + + token = _set_invocation_id("inv-terminal") + try: + await observer(make_typed_event()) + await observer(make_failed_event()) + finally: + _reset_invocation_id(token) + observer.shutdown() + + llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + assert llm_spans == [] + + async def _drive_llm_span_with_cached_tokens( *, cached_tokens: int | None, cache_creation_tokens: int | None = None, ) -> dict[str, Any]: - """Drive the OTel observer through a typed LlmCompletionEvent + """Drive the OTel observer through a per-attempt LlmRetryAttemptEvent carrying the supplied cache-stat fields on the event's Usage record. Returns the LLM-span's attribute map. """ @@ -598,14 +628,14 @@ async def _drive_llm_span_with_cached_tokens( _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-cache") try: await observer( - make_typed_event( + make_retry_attempt_event( usage=Usage( prompt_tokens=100, completion_tokens=5, @@ -714,14 +744,14 @@ async def test_llm_span_duration_matches_typed_event_latency() -> None: _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) latency_ms = 123.456 token = _set_invocation_id("inv-duration") try: - await observer(make_typed_event(latency_ms=latency_ms)) + await observer(make_retry_attempt_event(latency_ms=latency_ms)) finally: _reset_invocation_id(token) observer.shutdown() @@ -745,13 +775,13 @@ async def test_llm_span_zero_duration_when_latency_missing() -> None: _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-no-latency") try: - await observer(make_typed_event(latency_ms=None)) + await observer(make_retry_attempt_event(latency_ms=None)) finally: _reset_invocation_id(token) observer.shutdown() @@ -766,11 +796,11 @@ async def test_typed_llm_event_drops_silently_outside_invocation() -> None: # No invocation in scope (no _set_invocation_id) → the handler # MUST early-return without emitting a span. Symmetric with the # error path's no-invocation drop. - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) - await observer(make_typed_event()) + await observer(make_retry_attempt_event()) observer.shutdown() llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] assert llm_spans == [] @@ -785,7 +815,7 @@ async def test_disable_llm_spans_skips_typed_event_path() -> None: _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_typed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver( @@ -794,7 +824,7 @@ async def test_disable_llm_spans_skips_typed_event_path() -> None: ) token = _set_invocation_id("inv-disabled") try: - await observer(make_typed_event()) + await observer(make_retry_attempt_event()) finally: _reset_invocation_id(token) observer.shutdown() @@ -812,19 +842,20 @@ async def test_llm_error_path_emits_error_span_from_typed_failed_event() -> None _reset_invocation_id, _set_invocation_id, ) - from tests._helpers.typed_event import make_failed_event + from tests._helpers.typed_event import make_retry_attempt_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-err") try: await observer( - make_failed_event( + make_retry_attempt_event( invocation_id="inv-err", error_category="provider_rate_limit", error_type="ProviderRateLimit", error_message="429 from upstream", call_id="cc-err", + finish_reason=None, ) ) finally: @@ -838,6 +869,117 @@ async def test_llm_error_path_emits_error_span_from_typed_failed_event() -> None assert attrs.get("openarmature.error.category") == "provider_rate_limit" +@pytest.mark.parametrize( + "fixture_id", + [ + "056-call-level-retry-transient", + "057-call-level-retry-exhaustion", + "058-call-level-retry-non-transient-no-retry", + ], +) +async def test_call_level_retry_fixture_per_attempt_spans(fixture_id: str) -> None: + # Proposal 0050 §7.1 / observability §5.5: drive each spec + # call-level-retry fixture (spec/llm-provider/conformance/) through + # the provider + an OTel observer and assert its per-attempt + # openarmature.llm.complete spans. These fixtures assert SPANS, so + # they are activated here (otel-gated, with an observer) rather than + # the generic llm-provider harness, which has no observer. The + # provider dispatches one LlmRetryAttemptEvent per attempt and the + # observer renders one span per event; in production the engine's + # serial queue carries them, here they are captured then replayed. + import json + from pathlib import Path + + import httpx + import yaml + from opentelemetry.trace import StatusCode + + from openarmature.graph.events import LlmRetryAttemptEvent + from openarmature.graph.middleware import RetryConfig, deterministic_backoff + from openarmature.llm.errors import LlmProviderError + from openarmature.llm.messages import UserMessage + from openarmature.llm.providers.openai import OpenAIProvider + from openarmature.observability.correlation import ( + _reset_active_dispatch, + _reset_invocation_id, + _set_active_dispatch, + _set_invocation_id, + ) + + fixture_dir = ( + Path(__file__).resolve().parents[2] / "openarmature-spec" / "spec" / "llm-provider" / "conformance" + ) + spec = cast("dict[str, Any]", yaml.safe_load((fixture_dir / f"{fixture_id}.yaml").read_text())) + responses = cast("list[dict[str, Any]]", spec["mock_provider"]["responses"]) + call = cast("dict[str, Any]", spec["call"]) + expected = cast("dict[str, Any]", spec["expected"]) + + response_iter = iter(responses) + + def handler(_request: httpx.Request) -> httpx.Response: + entry = next(response_iter) + body = entry.get("body") + content = json.dumps(body).encode() if body is not None else b"" + return httpx.Response(int(entry.get("status", 200)), content=content) + + retry_cfg = cast("dict[str, Any]", call["retry"]) + backoff_seconds = float(cast("dict[str, Any]", retry_cfg.get("backoff") or {}).get("seconds", 0.0)) + retry = RetryConfig( + max_attempts=int(retry_cfg["max_attempts"]), + backoff=deterministic_backoff(backoff_seconds), + ) + messages = [ + UserMessage(content=cast("str", m["content"])) for m in cast("list[dict[str, Any]]", call["messages"]) + ] + + captured: list[Any] = [] + disp_token = _set_active_dispatch(lambda e: captured.append(e)) + inv_token = _set_invocation_id("inv-clr") + provider = OpenAIProvider( + base_url="http://test", model="gpt-test", api_key="k", transport=httpx.MockTransport(handler) + ) + try: + if "raises" in expected: + with pytest.raises(LlmProviderError) as excinfo: + await provider.complete(messages, retry=retry) + assert excinfo.value.category == expected["raises"]["category"] + else: + result = await provider.complete(messages, retry=retry) + expected_content = cast("dict[str, Any]", expected["response"]["message"])["content"] + assert result.message.content == expected_content + finally: + await provider.aclose() + _reset_invocation_id(inv_token) + _reset_active_dispatch(disp_token) + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + inv_token2 = _set_invocation_id("inv-clr") + try: + for event in captured: + if isinstance(event, LlmRetryAttemptEvent): + await observer(event) + finally: + _reset_invocation_id(inv_token2) + observer.shutdown() + + spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"] + expected_spans = cast("list[dict[str, Any]]", expected["llm_spans"]) + assert len(spans) == len(expected_spans) + spans_by_index = {dict(s.attributes or {})["openarmature.llm.attempt_index"]: s for s in spans} + for exp_span in expected_spans: + idx = exp_span["attempt_index"] + span = spans_by_index[idx] + span_attrs = dict(span.attributes or {}) + for key, val in cast("dict[str, Any]", exp_span.get("attributes") or {}).items(): + assert span_attrs.get(key) == val, f"attempt {idx}: {key}={span_attrs.get(key)!r} != {val!r}" + if exp_span.get("error_category"): + assert span.status.status_code == StatusCode.ERROR + assert span_attrs.get("openarmature.error.category") == exp_span["error_category"] + else: + assert span.status.status_code == StatusCode.OK + + # --------------------------------------------------------------------------- # §7 log bridge: correlation_id injection # ---------------------------------------------------------------------------