diff --git a/CHANGELOG.md b/CHANGELOG.md index d751cdd..0aac2b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,16 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ## [Unreleased] +### Added + +- **`LlmFailedEvent` typed event variant** (proposal 0058, spec v0.53.0). Carves LLM provider failures into a spec-normatively-typed event variant alongside `LlmCompletionEvent`. 17 mirrored identity / scoping / request-side fields + 3 failure-specific fields (`error_category` always-present from the llm-provider §7 normative category enumeration; optional `error_type` for vendor-specific detail or upstream exception class name; always-present `error_message`). `OpenAIProvider.complete()` emits the typed event alongside the §7 exception on both raise paths — adapter-caught provider exceptions AND pre-send validation raises. Caller-side exception flow unchanged; the exception still raises out of `complete()`. Mutually exclusive with `LlmCompletionEvent` on the same call. Both bundled observers (OTel + Langfuse) consume `LlmFailedEvent` directly: same `openarmature.llm.complete` span / Generation shape as the success path with ERROR status / level + `openarmature.error.category` attribute (OTel) / `error_category` as statusMessage (Langfuse), `start_time` back-dated by `latency_ms` so the failure duration reflects the time-to-raise. + ### Changed -- **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. Failure paths continue to fire from the sentinel `NodeEvent` (the typed event is success-only per the proposal). The §5.5 attribute set and §8.4 Generation metadata are unchanged. -- **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. The sentinel `completed` event still fires on the failure path until the spec extends `LlmCompletionEvent` with error semantics; the sentinel `started` event is no longer emitted on either path. +- **Sentinel-namespace `NodeEvent` emission for LLM events retired entirely from `OpenAIProvider`** (proposal 0058 cleanup). The provider no longer dispatches the `("openarmature.llm.complete",)`-namespaced `NodeEvent`s on either outcome path; both success and failure flow through their respective typed variants exclusively. The `_make_llm_event` helper is removed. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` for success and `isinstance(event, LlmFailedEvent)` for failure to keep receiving LLM-call notifications. `LlmEventPayload` and `LLM_NAMESPACE` remain in `openarmature.observability.llm_event` as a documented compatibility surface for custom providers that haven't migrated; neither is referenced by the bundled provider or observers anymore. +- **Pinned spec advances from v0.51.0 to v0.53.0** (absorbs proposals 0023 + 0058). Proposal 0023 (canonical state reducers) ships in spec v0.52.0 but is not implemented this cycle — `conformance.toml` marks 0023 as `not-yet`; fixtures 034–038 stay parser-deferred. +- **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. The §5.5 attribute set and §8.4 Generation metadata are unchanged. (Failure paths land on `LlmFailedEvent` later in the same cycle — see the proposal 0058 entry above.) +- **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. (The failure-path sentinel emission is retired entirely later in the same cycle — see the proposal 0058 entry above.) - **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter handles v4 Langfuse SDK quirks transparently: `Langfuse.start_observation()` does NOT accept `start_time`, so back-dated generations are routed through the private `_otel_tracer.start_span(name=..., start_time=int_ns)` API (mirroring the SDK's own `create_event` precedent) and the resulting OTel span is wrapped in `LangfuseGeneration` directly; the non-back-dated path still uses `start_observation`. `LangfuseSpan.end()` is typed `Optional[int]` (nanoseconds), so the adapter converts the Protocol's `datetime` surface to int nanoseconds before forwarding. The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions. - **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips. diff --git a/conformance.toml b/conformance.toml index c77ab78..da4501d 100644 --- a/conformance.toml +++ b/conformance.toml @@ -32,7 +32,7 @@ [manifest] implementation = "openarmature-python" -spec_pin = "v0.51.0" +spec_pin = "v0.53.0" # Status values: # implemented — shipped behavior matches the proposal's contract @@ -217,6 +217,15 @@ status = "not-yet" [proposals."0022"] status = "not-yet" +# Spec v0.52.0 (proposal 0023). Canonical state reducers — three +# new factory-style reducers (``bounded_append``, ``dedupe_append``, +# ``merge_by_key``) extending the graph-engine §2 baseline set. +# Python has not yet shipped the new reducers; v0.13.0 leaves the +# capability not-yet-implemented. Conformance fixtures 035-038 +# stay parser-deferred until the implementation lands. +[proposals."0023"] +status = "not-yet" + [proposals."0042"] status = "implemented" since = "0.11.0" @@ -509,3 +518,20 @@ status = "not-yet" [proposals."0057"] status = "implemented" since = "0.13.0" + +# Spec v0.53.0 (proposal 0058). Typed LLM failure event — second +# spec-normatively-typed event variant on the observer event union +# alongside LlmCompletionEvent. Field set mirrors LlmCompletionEvent's +# identity / scoping / request-side surface (17 fields) plus three +# failure-specific fields (error_category from the §7 normative +# category enumeration, optional vendor-specific error_type, always- +# present error_message). Dispatched alongside the §7 exception on +# the observer queue — caller-side exception flow unchanged. +# Mutually exclusive with LlmCompletionEvent on the same call. Python +# lands the typed variant + provider emission + OTel/Langfuse +# consumer migration in v0.13.0; same PR also drops sentinel-namespace +# NodeEvent emission for LLM events entirely from the bundled +# OpenAIProvider. +[proposals."0058"] +status = "implemented" +since = "0.13.0" diff --git a/openarmature-spec b/openarmature-spec index b2045e1..bd2d782 160000 --- a/openarmature-spec +++ b/openarmature-spec @@ -1 +1 @@ -Subproject commit b2045e1beed234ef6620943e13b2c5caecb66e6e +Subproject commit bd2d7824e5db40280899cd664442d9c5ac8fe506 diff --git a/pyproject.toml b/pyproject.toml index def2187..74a61c7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec" openarmature = "openarmature.cli:main" [tool.openarmature] -spec_version = "0.51.0" +spec_version = "0.53.0" [dependency-groups] dev = [ diff --git a/scripts/build_agents_md.py b/scripts/build_agents_md.py index 1cad9cb..dd9a8e7 100644 --- a/scripts/build_agents_md.py +++ b/scripts/build_agents_md.py @@ -204,7 +204,11 @@ def _capability_summaries(spec_tag: str) -> str: ( f"_Sourced from openarmature-spec {spec_tag}. Each entry below " + "reproduces §1 (Purpose) and §2 (Concepts) of the capability's " - + "`spec.md`. For the full spec text (execution model, error semantics, " + + "`spec.md` verbatim — including additions from accepted proposals " + + "that this Python implementation may not yet ship. For per-proposal " + + "implementation status (implemented / partial / textual-only / " + + "not-yet), see the `conformance.toml` manifest at the repo root. " + + "For the full spec text (execution model, error semantics, " + "determinism, observer hooks, etc.) see the linked docs site._" ), ] diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index 50d2f2d..c1e65aa 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1,6 +1,6 @@ # OpenArmature — Agent documentation -*This is the agent guide bundled with the openarmature Python package, version 0.12.0 (spec v0.51.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* +*This is the agent guide bundled with the openarmature Python package, version 0.12.0 (spec v0.53.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* ## TL;DR @@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents: ## Capability contracts -_Sourced from openarmature-spec v0.51.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md`. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ +_Sourced from openarmature-spec v0.53.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ ### Capability: `graph-engine` @@ -46,11 +46,15 @@ engine constant, not a reserved node name, so a user node may happen to be named **Reducer.** A function that merges a node's partial update into the prior state for a given field. Each state field has exactly one reducer. The default reducer is _last-write-wins_ (the new value replaces the old). -Implementations MUST provide at least: `last_write_wins`, `append` (for list-typed fields), `merge` -(for mapping-typed fields), `concat_flatten` (for list-typed fields whose updates are lists of lists — -e.g., fan-out target fields collecting list-emitting per-instance values), and `merge_all` (for -mapping-typed fields whose updates are lists of mappings — e.g., fan-out target fields collecting -dict-emitting per-instance values). Users MAY register custom reducers per field. +Implementations MUST provide at least the following eight canonical reducers: `last_write_wins`, `append` +(for list-typed fields), `merge` (for mapping-typed fields), `concat_flatten` (for list-typed fields whose +updates are lists of lists — e.g., fan-out target fields collecting list-emitting per-instance values), +`merge_all` (for mapping-typed fields whose updates are lists of mappings — e.g., fan-out target fields +collecting dict-emitting per-instance values), `bounded_append(max_len)` (factory; `append` capped at +`max_len` entries with front-drop on overflow), `dedupe_append(key=None)` (factory; `append` skipping +items whose key already appears in the existing list), and `merge_by_key(key)` (factory; list-of-records +keyed merge — entries with a key matching an existing entry replace the existing entry in place; entries +with novel keys are appended). Users MAY register custom reducers per field. **`concat_flatten` semantics.** `concat_flatten(prior, update)` returns the concatenation of `prior` with the one-level flattening of `update`. Both `prior` and `update` MUST be lists, and every element of `update` MUST @@ -72,6 +76,57 @@ inside `update` contribute zero keys. Implementations MUST NOT auto-detect wheth mappings vs. a single mapping — `merge_all` is strictly the list-of-mappings reducer; callers needing both behaviors on the same field MUST register a custom reducer rather than rely on shape-dependent behavior. +**`bounded_append(max_len)` semantics.** A factory returning a reducer that extends a list with the update's +items and truncates from the front (oldest entries dropped first) if the post-merge length exceeds `max_len`. +`max_len` MUST be a positive integer (≥ 1); a factory call with `max_len ≤ 0` raises +`reducer_configuration_invalid` at field registration time. Behavior: concatenate prior + update, then if +the concatenated list's length exceeds `max_len`, drop entries from the front until the length equals +`max_len`. The bound applies to the post-merge length, not to the update's individual size — an update +larger than `max_len` keeps only the last `max_len` items of the update and the prior list is fully evicted. Both `prior` and `update` MUST be lists; +violations raise `ReducerError` per §4. Empty `update` is a no-op (returns `prior` unchanged) — the bound +applies to merge-time transformations, not as a prior-validation pass; `prior` is returned as-is even if +it somehow already exceeds `max_len` (matching the established `concat_flatten` / `merge_all` empty-update +pattern). Truncation MUST be from the front (oldest-first eviction) for cross-impl consistency; back-drop +is recoverable via a +custom reducer if needed. `bounded_append` is for cases where silent drop of evicted data is acceptable +(recent-events buffers, debug log windows, sliding metric caches); for cases where dropped data must be +summarized or transformed first (the canonical chat-history-with-LLM-summarization shape), use unbounded +`append` plus a separate compaction node or middleware — reducers are pure synchronous functions per the +contract above and cannot perform the IO that real compaction requires. + +**`dedupe_append(key=None)` semantics.** A factory returning a reducer that extends a list with items from +the update that are not already present (by key) in the existing list. The `key` parameter is an optional +callable mapping an item to its dedup key; if omitted, the item itself is used as the key (requires hashable +items). Behavior: initialize a seen-keys set from `prior` (preserving `prior` unchanged in the result), +iterate `update` in order, and for each item compute its key — if the key is NOT yet in seen-keys, append +the item to the result and record its key; otherwise skip. Existing items appear before update items; +within each, original order is maintained. Duplicates within the update itself are filtered alongside +matches against `prior` — first occurrence wins (preserves left-to-right precedence consistent with +`append`). The computed key (the item itself when no `key` callable is supplied, or the value returned by +the callable) MUST be hashable; a non-hashable key raises `ReducerError` per §4 at merge time. A `key` +callable that raises on any item propagates as `ReducerError`. The reducer does NOT mutate existing items +(no in-place dedup of `prior`); only the update is filtered. + +**`merge_by_key(key)` semantics.** A factory returning a reducer for list-of-records fields. Items in the +update with a key matching an existing item REPLACE the existing item in place; items with novel keys are +appended at the end of the list in the order they appear in the update. The `key` parameter is a required +callable mapping an item to its merge key — the spec does NOT default this; keyed merge without a key +function is meaningless and a factory call with `key=None` raises `reducer_configuration_invalid` at field +registration time. Behavior: build a `key_to_idx` index from `prior` (when `prior` contains duplicate keys, +the index MUST hold the LAST index for each duplicate key — implementations whose native dict construction +uses first-wins semantics MUST iterate explicitly to enforce last-wins); for each item in `update`, if its +key is in the index, replace the prior entry at that index with the update item; otherwise append the +update item to the result and register its key. Existing entry order MUST be preserved (replacements are +in-place); novel entries are appended in update order. Duplicate keys within the update collapse to +last-occurrence-wins (consistent with how dict updates work for repeated keys). Earlier duplicates in +`prior` are preserved in place — the reducer does NOT in-place dedupe existing entries (parallel to +`dedupe_append`'s "no in-place dedup of existing" rule). The value returned by the `key` callable MUST +be hashable (required by the index-build step); a non-hashable return value raises `ReducerError` per §4 +at merge time. The `key` callable raising on any item propagates as `ReducerError`. Empty `update` is a +no-op. `merge_by_key` is NOT a substitute for `merge` — `merge` +operates on dict-typed fields with shallow key-value semantics; `merge_by_key` operates on list-of-records +fields with item-key semantics. The qualifier `_by_key` distinguishes the two shapes. + **Subgraph.** A compiled graph used as a node inside another graph. A subgraph executes against its own state schema and produces a partial update that is merged into the parent's state. The merge uses the same reducer rules as ordinary nodes — parent reducers, applied to parent fields. @@ -136,6 +191,11 @@ identifiers (as an error class, error code, or tagged discriminant, per the lang - `conflicting_reducers` — a state field has more than one declared reducer. - `mapping_references_undeclared_field` — a subgraph-as-node `inputs` or `outputs` mapping names a field not declared in the relevant state schema. +- `reducer_configuration_invalid` — a reducer factory was supplied invalid construction parameters + (e.g., `bounded_append(max_len=0)`, `merge_by_key(key=None)`). Raised at field registration / graph + compilation time, before any node body runs. Distinct from `conflicting_reducers`, which is about + the reducer-declaration shape across multiple reducers on the same field; `reducer_configuration_invalid` + is about parameters supplied to a single reducer factory. ### Capability: `pipeline-utilities` diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py index c978000..a84ee57 100644 --- a/src/openarmature/__init__.py +++ b/src/openarmature/__init__.py @@ -25,7 +25,7 @@ """ __version__ = "0.12.0" -__spec_version__ = "0.51.0" +__spec_version__ = "0.53.0" # Proposal 0052 (spec observability §5.1 / §8.4.1): canonical # package-registry name for this implementation. Surfaces on every # OTel invocation span as ``openarmature.implementation.name`` and on diff --git a/src/openarmature/graph/__init__.py b/src/openarmature/graph/__init__.py index af55d4f..60a10ff 100644 --- a/src/openarmature/graph/__init__.py +++ b/src/openarmature/graph/__init__.py @@ -39,6 +39,7 @@ InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, + LlmFailedEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -86,6 +87,7 @@ "InvocationCompletedEvent", "InvocationStartedEvent", "LlmCompletionEvent", + "LlmFailedEvent", "MappingReferencesUndeclaredField", "MetadataAugmentationEvent", "Middleware", diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index fa97273..6e602ec 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -580,11 +580,91 @@ class LlmCompletionEvent: caller_invocation_metadata: Mapping[str, AttributeValue] | None = None +# Spec: realizes proposal 0058's second spec-normatively-typed event +# variant on the observer event union (graph-engine §6 + +# observability §5.5.7), accepted at spec v0.53.0. Dispatched on the +# observer delivery queue whenever a provider.complete() call raises +# a §7 category exception — covers BOTH the adapter-caught provider- +# exception path AND the pre-send validation raise path +# (provider_invalid_request / provider_unsupported_content_block +# raise before any provider contact). The event is dispatched +# ALONGSIDE the exception, not in place of it; caller-side exception +# flow is unchanged. +# +# Mutual exclusion with LlmCompletionEvent on the same +# provider.complete() call — implementations MUST NOT emit both for +# the same call. Conformance fixture 072 locks this down. +# +# Privacy posture identical to LlmCompletionEvent: input_messages / +# request_params / request_extras are populated unconditionally per +# §5.5.7; observer-side privacy gates (OTel disable_llm_payload, +# Langfuse equivalents) apply at rendering. Inline image bytes are +# redacted per observability §5.5.5 before population. Custom +# queryable observers own their own redaction posture. +@dataclass(frozen=True) +class LlmFailedEvent: + """A typed LLM provider call failure event delivered to observers. + + Carries identity, scoping, and failure-context data for an LLM + call that raised a llm-provider §7 category exception. Observer + code filters by type discrimination (``isinstance(event, + LlmFailedEvent)``) rather than by the impl-current sentinel- + namespace string match. + + Identity / scoping / request-side field set mirrors + ``LlmCompletionEvent`` 1:1 — same field semantics, same nullability + rules. Response-side fields (``response_id``, ``response_model``, + ``usage``, ``output_content``, ``finish_reason``) are ABSENT from + this variant — no response was received. + + Failure-specific fields: + + - ``error_category``: the llm-provider §7 normative error + category the call raised. One of the 9 canonical strings + (``provider_authentication``, ``provider_unavailable``, + ``provider_invalid_model``, ``provider_model_not_loaded``, + ``provider_rate_limit``, ``provider_invalid_response``, + ``provider_invalid_request``, + ``provider_unsupported_content_block``, + ``structured_output_invalid``). Always present. + - ``error_type``: OPTIONAL impl-level / vendor-specific error + type or code. Two acceptable styles per spec: + vendor error code (e.g. ``"rate_limit_exceeded"``) OR + upstream exception class name (e.g. ``"RateLimitError"``). + ``None`` when no impl-side type is available. + - ``error_message``: human-readable message from the raised + exception. Always present (empty string when the exception + carried no message). + """ + + invocation_id: str + correlation_id: str | None + node_name: str + namespace: tuple[str, ...] + attempt_index: int + fan_out_index: int | None + branch_name: str | None + provider: str + model: str + latency_ms: float | None + input_messages: list[dict[str, Any]] + request_params: Mapping[str, Any] + request_extras: Mapping[str, Any] + active_prompt: Any + active_prompt_group: Any + call_id: str + error_category: str + error_message: str + error_type: str | None = None + caller_invocation_metadata: Mapping[str, AttributeValue] | None = None + + __all__ = [ "FanOutEventConfig", "InvocationCompletedEvent", "InvocationStartedEvent", "LlmCompletionEvent", + "LlmFailedEvent", "MetadataAugmentationEvent", "NodeEvent", "ParallelBranchesEventConfig", diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index 9a82ce0..75ece1c 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -38,6 +38,7 @@ InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, + LlmFailedEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -49,15 +50,18 @@ # reach every subscribed observer — MetadataAugmentationEvent # (proposal 0040 mid-invocation metadata augmentation), # InvocationStartedEvent / InvocationCompletedEvent (proposal 0043 -# trace.input/output sourcing), and LlmCompletionEvent (proposal -# 0049 typed LLM provider call event, dispatched on every successful -# LLM completion alongside the calling node's NodeEvent pair). +# trace.input/output sourcing), LlmCompletionEvent (proposal 0049 +# typed LLM provider call event, dispatched on every successful LLM +# completion), and LlmFailedEvent (proposal 0058 typed LLM failure +# event, dispatched alongside the §7 exception when provider.complete +# raises). ObserverEvent = ( NodeEvent | MetadataAugmentationEvent | InvocationStartedEvent | InvocationCompletedEvent | LlmCompletionEvent + | LlmFailedEvent ) diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index 9c2d310..4e8b9f5 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -63,7 +63,7 @@ import jsonschema from pydantic import BaseModel, ValidationError -from openarmature.graph.events import LlmCompletionEvent, NodeEvent +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent from openarmature.observability.correlation import ( current_attempt_index, current_branch_name, @@ -73,7 +73,6 @@ current_invocation_id, current_namespace_prefix, ) -from openarmature.observability.llm_event import LlmEventPayload from openarmature.observability.metadata import AttributeValue, current_invocation_metadata # ``current_prompt_group`` / ``current_prompt_result`` are imported @@ -372,48 +371,22 @@ async def complete( supplied list. Violations raise ``provider_invalid_request`` BEFORE any HTTP request is sent. """ - validate_message_list(messages) - validate_tools(tools) - # ``validate_tool_choice`` runs after ``validate_tools`` so the - # name-membership check sees a structurally valid tools list. - validate_tool_choice(tool_choice, tools) - schema_dict, schema_class = _normalize_response_schema(response_schema) - # On the fallback path, the wire-side messages list is an - # augmented COPY of the caller's messages — original messages - # MUST NOT be mutated. _augment_messages_with_schema_directive - # builds a fresh list and does not modify the reused Message - # instances in place; the caller's sequence is untouched. - wire_messages: Sequence[Message] = messages - if schema_dict is not None and self._force_prompt_augmentation_fallback: - wire_messages = _augment_messages_with_schema_directive(messages, schema_dict) - body = self._build_request_body( - wire_messages, - tools, - config, - schema_dict, - # The fallback only governs structured-output calls; free- - # form calls (schema_dict is None) must preserve any - # caller-supplied response_format from RuntimeConfig extras. - include_response_format=(schema_dict is None or not self._force_prompt_augmentation_fallback), - tool_choice=tool_choice, - ) - # Spec observability §5.5 LLM provider span: when an # observability backend is active in the current invocation, - # emit a started/completed event pair around the wire call so - # the backend can build a span. Queue-mediated dispatch + # emit a typed LlmCompletionEvent (success) or LlmFailedEvent + # (failure) around the wire call so the backend can build a + # span / Generation observation. Queue-mediated dispatch # preserves spec §6 serial event ordering across all event # sources within an invocation. ``current_dispatch()`` returns - # ``None`` outside an openarmature invocation (direct - # provider use in scripts/tests), in which case the call - # proceeds without span emission. + # ``None`` outside an openarmature invocation (direct provider + # use in scripts/tests), in which case the call proceeds + # without typed-event emission. # - # ``call_id`` is minted once per ``complete()`` call and - # threaded through both events of the pair. Backend - # observers key their in-flight LLM-span maps by it so - # concurrent ``complete()`` calls (e.g., fan-out instances - # each calling this provider) don't collide on the - # constant ``("openarmature.llm.complete",)`` sentinel. + # ``call_id`` is minted once per ``complete()`` call. Per + # proposal 0058: a failed call gets its own ``call_id`` + # distinct from any retry-attempt sibling — the retry + # middleware re-enters ``complete()`` for each attempt, so a + # fresh mint per call automatically satisfies that contract. dispatch = current_dispatch() call_id = str(uuid.uuid4()) # Capture prompt context AT DISPATCH TIME (in the node task's @@ -431,68 +404,98 @@ async def complete( active_prompt = current_prompt_result() active_prompt_group = current_prompt_group() - # Payload data the §5.5.1 / §5.5.2 / §5.5.3 attributes are - # sourced from. Image redaction (per §5.5.5) happens inside - # ``_serialize_messages_for_payload`` — image bytes never - # leave the provider in event form. ``input_messages`` mirrors - # the messages list the caller supplied; the wire-side body - # may be augmented (schema directive on the fallback path), - # but the OBSERVED messages are the spec-§3 logical inputs. - serialized_messages = _serialize_messages_for_payload(messages) + # Resolve request-side fields up-front. These don't raise §7 + # category exceptions; safe to compute outside the try-block. + # The serialized_messages projection runs INSIDE the try-block + # below — it can raise on malformed input that would also + # trigger a pre-send §7 raise, and we want both raises to + # surface through the same LlmFailedEvent path. Default to an + # empty list so the failure event always has a populated + # ``input_messages`` field even when serialization couldn't + # complete. request_params = _request_params_from_config(config) request_extras = _request_extras_from_config(config) + serialized_messages: list[dict[str, Any]] = [] # Wall-clock latency measured at the adapter boundary per - # proposal 0049's LlmCompletionEvent.latency_ms contract. The - # boundary spans from "just before _do_complete is called" to - # "_do_complete returns with a parsed Response in hand" — - # covers HTTP setup, request emission, provider compute, - # response receive, AND response parsing into the typed - # Response. The spec text "wall-clock latency of the LLM call - # measured at the adapter boundary" is silent on whether - # parsing is included; including it matches the operator's - # mental model of "how long until I had a usable answer" - # better than just-the-HTTP-call. perf_counter is the monotonic - # high-resolution clock for elapsed-time measurements. + # proposal 0049's LlmCompletionEvent.latency_ms contract / + # proposal 0058's LlmFailedEvent.latency_ms contract. Window + # spans from "just before validation runs" through to either + # "parsed Response in hand" (success) or "the §7 exception + # was raised" (failure). perf_counter is the monotonic high- + # resolution clock for elapsed-time measurements. adapter_start = time.perf_counter() try: + # Per proposal 0058: pre-send validation raises (e.g., + # provider_invalid_request, provider_unsupported_content_ + # block) MUST dispatch LlmFailedEvent before the exception + # propagates. The whole validation + wire-call sequence + # runs inside the protected scope so any §7 category + # exception (pre-send OR adapter-caught) flows through the + # same emission path. + validate_message_list(messages) + validate_tools(tools) + # ``validate_tool_choice`` runs after ``validate_tools`` + # so the name-membership check sees a structurally valid + # tools list. + validate_tool_choice(tool_choice, tools) + schema_dict, schema_class = _normalize_response_schema(response_schema) + # Image redaction (per §5.5.5) happens inside + # ``_serialize_messages_for_payload`` — image bytes never + # leave the provider in event form. ``input_messages`` + # mirrors the caller's messages list; the wire-side body + # may be augmented (schema directive on the fallback path) + # but the OBSERVED messages are the spec-§3 logical inputs. + serialized_messages = _serialize_messages_for_payload(messages) + # On the fallback path, the wire-side messages list is an + # augmented COPY of the caller's messages — original + # messages MUST NOT be mutated. + wire_messages: Sequence[Message] = messages + if schema_dict is not None and self._force_prompt_augmentation_fallback: + wire_messages = _augment_messages_with_schema_directive(messages, schema_dict) + body = self._build_request_body( + wire_messages, + tools, + config, + schema_dict, + # The fallback only governs structured-output calls; + # free-form calls (schema_dict is None) must preserve + # any caller-supplied response_format from RuntimeConfig + # extras. + include_response_format=(schema_dict is None or not self._force_prompt_augmentation_fallback), + tool_choice=tool_choice, + ) response = await self._do_complete(body, schema_dict, schema_class) - except Exception as exc: + except LlmProviderError as exc: + # Failure path: dispatch a typed LlmFailedEvent per + # proposal 0058. Only §7 category exceptions + # (LlmProviderError subclasses) trigger a typed event — + # internal bugs / unexpected exceptions raise without + # event emission because they aren't part of the §7 + # contract. The caller-side exception flow is unchanged: + # the exception still raises out of complete(). + latency_ms_failed = (time.perf_counter() - adapter_start) * 1000.0 if dispatch is not None: - # Failure path: the sentinel NodeEvent carries the - # error fields per llm-provider §7. LlmCompletionEvent - # is success-only per proposal 0049 §3 alternative 3, - # so failures continue to surface through a sentinel - # ``completed`` event until the spec extends the typed - # event with error semantics. Only ``completed`` fires - # — no started counterpart, since both bundled - # observers' handlers ignore sentinel-started after - # the v0.13.0 migration. dispatch( - _make_llm_event( - "completed", + self._build_llm_failed_event( + exc, + latency_ms_failed, call_id=call_id, - model=self.model, - genai_system=self._genai_system, - error=exc, input_messages=serialized_messages, request_params=request_params, request_extras=request_extras, active_prompt=active_prompt, active_prompt_group=active_prompt_group, - ) + ), ) raise latency_ms = (time.perf_counter() - adapter_start) * 1000.0 if dispatch is not None: - # Success path: emit only the typed LlmCompletionEvent. - # The sentinel NodeEvent pair previously emitted on success - # for compatibility with pre-typed-event observers was - # dropped in v0.13.0; bundled observers (OTel + Langfuse) - # consume the typed event directly, and external custom - # observers should migrate to type discrimination via - # ``isinstance(event, LlmCompletionEvent)`` if they need - # LLM call notifications. + # Success path: emit the typed LlmCompletionEvent. + # External custom observers consuming LLM events MUST + # filter via ``isinstance(event, LlmCompletionEvent)``; + # the sentinel namespace pattern is retired in this + # release. dispatch( self._build_llm_completion_event( response, @@ -588,6 +591,63 @@ def _build_llm_completion_event( caller_invocation_metadata=caller_metadata, ) + def _build_llm_failed_event( + self, + exc: LlmProviderError, + latency_ms: float, + *, + call_id: str, + input_messages: list[dict[str, Any]], + request_params: dict[str, Any], + request_extras: dict[str, Any], + active_prompt: Any, + active_prompt_group: Any, + ) -> LlmFailedEvent: + """Construct the typed LlmFailedEvent for the failure path. + + Sources identity / scoping fields from the calling-node + ContextVars and failure fields from the raised §7 exception. + Field set mirrors LlmCompletionEvent (identity + request-side) + plus the three failure-specific fields per proposal 0058. + + ``error_type`` defaults to the exception class name — falls + into the "upstream exception class name" style documented in + the spec field table. Providers that have a vendor error code + available (e.g. ``rate_limit_exceeded`` for OpenAI) can + override with vendor-specific detail in a future spec + proposal; for now the class name is the safest default since + every LlmProviderError subclass carries one. + """ + + namespace = current_namespace_prefix() + node_name = namespace[-1] if namespace else "" + invocation_id = current_invocation_id() or "" + caller_metadata: Mapping[str, AttributeValue] | None = None + if self._populate_caller_metadata: + caller_metadata = dict(current_invocation_metadata()) + return LlmFailedEvent( + invocation_id=invocation_id, + correlation_id=current_correlation_id(), + node_name=node_name, + namespace=namespace, + attempt_index=current_attempt_index(), + fan_out_index=current_fan_out_index(), + branch_name=current_branch_name(), + provider=self._genai_system, + model=self.model, + latency_ms=latency_ms, + input_messages=input_messages, + request_params=request_params, + request_extras=request_extras, + active_prompt=active_prompt, + active_prompt_group=active_prompt_group, + call_id=call_id, + error_category=exc.category, + error_type=type(exc).__name__, + error_message=str(exc), + caller_invocation_metadata=caller_metadata, + ) + async def _do_complete( self, body: dict[str, Any], @@ -1343,7 +1403,8 @@ def _looks_like_model_not_loaded(message: object) -> bool: # payload, not just OA's own. URL-form images pass through unchanged. def _serialize_messages_for_payload(messages: Sequence[Message]) -> list[dict[str, Any]]: """Render a list of typed :class:`Message` instances into the - plain-dict shape carried on ``LlmEventPayload.input_messages``.""" + plain-dict shape carried on the typed LLM events' ``input_messages`` + field (LlmCompletionEvent / LlmFailedEvent).""" out: list[dict[str, Any]] = [] for msg in messages: if isinstance(msg, SystemMessage): @@ -1426,80 +1487,6 @@ def _request_extras_from_config(config: RuntimeConfig | None) -> dict[str, Any]: return dict(config.model_extra or {}) -# call_id MUST be the same string on the started/completed pair so -# the observer can match them under concurrency. The OTel observer -# (or any backend mapping) recognises the sentinel node_name + -# namespace and emits an LLM-specific span instead of a node span; -# backend-specific attribute extraction reads payload fields from -# pre_state directly. -def _make_llm_event( - phase: Literal["started", "completed"], - *, - call_id: str, - model: str, - genai_system: str, - finish_reason: FinishReason | None = None, - usage: Usage | None = None, - error: BaseException | None = None, - input_messages: list[dict[str, Any]] | None = None, - output_content: str | None = None, - request_params: dict[str, Any] | None = None, - request_extras: dict[str, Any] | None = None, - response_id: str | None = None, - response_model: str | None = None, - active_prompt: Any = None, - active_prompt_group: Any = None, -) -> NodeEvent: - """Build a ``NodeEvent``-shaped record for the engine's delivery - queue, populated as an ``openarmature.llm.complete`` event.""" - error_type: str | None = None - error_message: str | None = None - error_category: str | None = None - if error is not None: - error_type = type(error).__name__ - error_message = str(error) - category = getattr(error, "category", None) - if isinstance(category, str): - error_category = category - payload = LlmEventPayload( - call_id=call_id, - model=model, - finish_reason=finish_reason, - prompt_tokens=usage.prompt_tokens if usage is not None else None, - completion_tokens=usage.completion_tokens if usage is not None else None, - total_tokens=usage.total_tokens if usage is not None else None, - cached_tokens=usage.cached_tokens if usage is not None else None, - cache_creation_tokens=usage.cache_creation_tokens if usage is not None else None, - error_type=error_type, - error_message=error_message, - error_category=error_category, - calling_namespace_prefix=current_namespace_prefix(), - calling_attempt_index=current_attempt_index(), - calling_fan_out_index=current_fan_out_index(), - calling_branch_name=current_branch_name(), - active_prompt=active_prompt, - active_prompt_group=active_prompt_group, - input_messages=input_messages, - output_content=output_content, - request_params=request_params, - request_extras=request_extras, - response_id=response_id, - response_model=response_model, - genai_system=genai_system, - caller_invocation_metadata=dict(current_invocation_metadata()), - ) - return NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase=phase, - pre_state=payload, - post_state=None, - error=None, - parent_states=(), - ) - - __all__ = [ "OpenAIProvider", "classify_http_error", diff --git a/src/openarmature/observability/correlation.py b/src/openarmature/observability/correlation.py index 2967bc4..65394f1 100644 --- a/src/openarmature/observability/correlation.py +++ b/src/openarmature/observability/correlation.py @@ -40,6 +40,7 @@ InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, + LlmFailedEvent, MetadataAugmentationEvent, NodeEvent, ) @@ -222,6 +223,7 @@ def _reset_active_observers(token: Token[tuple[SubscribedObserver, ...]]) -> Non | InvocationStartedEvent | InvocationCompletedEvent | LlmCompletionEvent + | LlmFailedEvent ], None, ] @@ -237,6 +239,7 @@ def current_dispatch() -> ( | InvocationStartedEvent | InvocationCompletedEvent | LlmCompletionEvent + | LlmFailedEvent ], None, ] @@ -263,6 +266,7 @@ def _set_active_dispatch( | InvocationStartedEvent | InvocationCompletedEvent | LlmCompletionEvent + | LlmFailedEvent ], None, ], @@ -274,6 +278,7 @@ def _set_active_dispatch( | InvocationStartedEvent | InvocationCompletedEvent | LlmCompletionEvent + | LlmFailedEvent ], None, ] @@ -293,6 +298,7 @@ def _reset_active_dispatch( | InvocationStartedEvent | InvocationCompletedEvent | LlmCompletionEvent + | LlmFailedEvent ], None, ] diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index d239121..4525a41 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -33,11 +33,11 @@ InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, + LlmFailedEvent, MetadataAugmentationEvent, NodeEvent, ) from openarmature.observability.lineage import is_strict_prefix -from openarmature.observability.llm_event import LLM_NAMESPACE, LlmEventPayload from .client import ( LangfuseClient, @@ -358,6 +358,7 @@ async def __call__( | InvocationStartedEvent | InvocationCompletedEvent | LlmCompletionEvent + | LlmFailedEvent ), ) -> None: if isinstance(event, InvocationStartedEvent): @@ -368,23 +369,21 @@ async def __call__( return # Proposal 0049 typed LlmCompletionEvent (success path). Drives # the §5.5 Generation observation lifecycle for successful - # provider calls. Failures don't emit this variant; they flow - # through the sentinel error path below (a single sentinel - # ``completed`` event — no started counterpart in v0.13.0+). + # provider calls. if isinstance(event, LlmCompletionEvent): if not self.disable_llm_spans: self._handle_typed_llm_completion(event) return + # Proposal 0058 typed LlmFailedEvent (failure path). Drives + # the same Generation observation lifecycle with ERROR level + + # error_category as statusMessage. + if isinstance(event, LlmFailedEvent): + if not self.disable_llm_spans: + self._handle_typed_llm_failed(event) + return if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return - # LLM provider sentinel events: failure-path completed opens + - # closes an ERROR-level Generation; everything else is a no-op - # (success-path typed handler above owns the Generation). - if event.namespace == LLM_NAMESPACE: - if not self.disable_llm_spans: - self._handle_llm_error_event(event) - return if event.phase == "started": self._open_started_observation(event) elif event.phase == "completed": @@ -1321,18 +1320,14 @@ def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) -> # Generation observation lifecycle (LLM provider events) # ------------------------------------------------------------------ - # v0.13.0 (proposal 0049 + 0057): success-path Generation lifecycle - # is driven by the typed LlmCompletionEvent — opened and closed in - # one shot at typed-event arrival, with start_time back-dated by - # latency_ms so the observation's duration reflects the adapter- - # boundary measurement rather than dispatcher queue delay. Failure - # path keeps a single sentinel NodeEvent (``completed`` phase - # carrying error fields on its LlmEventPayload — LlmCompletionEvent - # is success-only per proposal 0049 §3 alternative 3). The provider - # dropped success-path sentinel emission entirely in this release, - # so on success the typed event is the only signal the Generation - # observation has to fire from; the failure path's sentinel - # ``started`` was also dropped, leaving only ``completed``. + # v0.13.0 (proposals 0049 + 0057 + 0058): both Generation + # observation lifecycles are driven by typed events — success path + # from LlmCompletionEvent, failure path from LlmFailedEvent. Both + # handlers open + close in one shot at typed-event arrival, with + # start_time back-dated by latency_ms so duration reflects the + # adapter-boundary measurement rather than dispatcher queue delay. + # The provider dropped sentinel-namespace NodeEvent emission for + # LLM events entirely in this release. def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: """Open + close the Generation observation from the typed LlmCompletionEvent (success path).""" @@ -1402,65 +1397,72 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: end_kwargs["usage"] = usage handle.end(end_time=end_time, **end_kwargs) - def _handle_llm_error_event(self, event: NodeEvent) -> None: - """Emit an ERROR-level Generation observation from the sentinel - NodeEvent on the failure path. Success-path sentinel completion - is no longer emitted by the provider in v0.13.0; this handler - only fires for failures.""" + def _handle_typed_llm_failed(self, event: LlmFailedEvent) -> None: + """Open + close an ERROR-level Generation observation from the + typed LlmFailedEvent (failure path, proposal 0058). Same shape + as the success path with ERROR level + error_category as the + Generation observation's statusMessage.""" from openarmature.observability.correlation import ( current_correlation_id, current_invocation_id, ) - if event.phase != "completed": - # Sentinel started becomes a no-op once the success-side - # emission drops. Failures only emit the completed half. - return - if not isinstance(event.pre_state, LlmEventPayload): - return - payload = event.pre_state - if payload.error_type is None: - # Defensive — success path no longer emits the sentinel - # pair; if a non-error sentinel completion slips through - # (e.g., legacy custom provider not yet migrated), the - # typed event handler owns the Generation. - return invocation_id = current_invocation_id() if invocation_id is None: return correlation_id = current_correlation_id() if invocation_id not in self._inv_states: - self._open_trace(invocation_id, correlation_id, event) + self._open_trace_for_typed_event(invocation_id, correlation_id, event) inv_state = self._inv_states[invocation_id] + # Back-date timestamps using latency_ms (mirrors the success + # path); for failures the duration reflects time until the §7 + # exception was raised. + end_time = datetime.now(UTC) + if event.latency_ms is not None: + start_time = end_time - timedelta(milliseconds=event.latency_ms) + else: + start_time = end_time parent_observation_id = self._resolve_llm_parent_observation_id( inv_state, - calling_namespace_prefix=payload.calling_namespace_prefix, - calling_attempt_index=payload.calling_attempt_index, - calling_fan_out_index=payload.calling_fan_out_index, - calling_branch_name=payload.calling_branch_name, - ) - metadata, model_parameters, input_value, _ = self._llm_metadata_and_payload( - payload, correlation_id, phase="started" - ) - target_trace_id = self._trace_id_for( - inv_state, payload.calling_namespace_prefix, payload.calling_fan_out_index + calling_namespace_prefix=event.namespace, + calling_attempt_index=event.attempt_index, + calling_fan_out_index=event.fan_out_index, + calling_branch_name=event.branch_name, ) + metadata = self._typed_event_metadata(event, correlation_id) + # Failure-specific metadata rows: surface error_type + error_ + # message as well as the category-as-statusMessage on the + # observation. error_type is null when no impl-side type was + # available; the metadata key is omitted in that case so the + # absence-is-meaningful semantic is preserved. + if event.error_type is not None: + metadata["error_type"] = event.error_type + metadata["error_message"] = event.error_message + model_parameters: dict[str, Any] = dict(event.request_params or {}) + input_value: Any = None + if not self.disable_llm_payload: + if event.input_messages: + input_value = self._maybe_truncate_for_input(event.input_messages) + if event.request_extras: + metadata["request_extras"] = self._maybe_truncate_for_extras(dict(event.request_extras)) + target_trace_id = self._trace_id_for(inv_state, event.namespace, event.fan_out_index) handle = self.client.generation( trace_id=target_trace_id, name="openarmature.llm.complete", - model=payload.model, + model=event.model, model_parameters=model_parameters, input=input_value, metadata=metadata, parent_observation_id=parent_observation_id, - prompt=self._resolve_prompt_link(payload), + prompt=self._resolve_prompt_link_from_typed_event(event), + start_time=start_time, ) # Error-category mapping: §8.4.2 + §8.4.3. - end_kwargs: dict[str, Any] = { - "level": "ERROR", - "status_message": payload.error_category or payload.error_type, - } - handle.end(**end_kwargs) + handle.end( + end_time=end_time, + level="ERROR", + status_message=event.error_category, + ) def _resolve_llm_parent_observation_id( self, @@ -1504,12 +1506,15 @@ def _resolve_llm_parent_observation_id( return sg.handle.id return None - def _typed_event_metadata(self, event: LlmCompletionEvent, correlation_id: str | None) -> dict[str, Any]: - """Build the Generation observation's metadata dict from the - typed event. Mirrors _llm_metadata_and_payload's metadata - construction but reads from LlmCompletionEvent fields, and - combines started + completed phases into a single populated - dict (the typed event carries everything at once).""" + def _typed_event_metadata( + self, event: LlmCompletionEvent | LlmFailedEvent, correlation_id: str | None + ) -> dict[str, Any]: + """Build the Generation observation's metadata dict from a + typed LLM event. Shared between the success path + (LlmCompletionEvent) and the failure path (LlmFailedEvent); + response-side metadata (finish_reason / response_model / + response_id) lands only on the success variant since those + fields don't exist on LlmFailedEvent.""" metadata: dict[str, Any] = {} if correlation_id is not None: metadata["correlation_id"] = correlation_id @@ -1526,19 +1531,20 @@ def _typed_event_metadata(self, event: LlmCompletionEvent, correlation_id: str | active_group = event.active_prompt_group if active_group is not None: metadata["prompt_group_name"] = active_group.group_name - # Asymmetric guard with _llm_metadata_and_payload below: the - # typed event types caller_invocation_metadata as Mapping | None - # while LlmEventPayload defaults to an empty mapping (never - # None). Don't "normalize" the two paths without normalizing - # the source types. if event.caller_invocation_metadata is not None: _apply_caller_metadata(metadata, event.caller_invocation_metadata) - if event.finish_reason is not None: - metadata["finish_reason"] = event.finish_reason - if event.response_model is not None: - metadata["response_model"] = event.response_model - if event.response_id is not None: - metadata["response_id"] = event.response_id + # Response-side fields are LlmCompletionEvent-only — absent + # from LlmFailedEvent. The type guard keeps the success-path + # metadata complete while the failure-path metadata stays + # focused on the request-side + the failure-specific fields + # the caller adds separately. + if isinstance(event, LlmCompletionEvent): + if event.finish_reason is not None: + metadata["finish_reason"] = event.finish_reason + if event.response_model is not None: + metadata["response_model"] = event.response_model + if event.response_id is not None: + metadata["response_id"] = event.response_id return metadata def _usage_from_typed_event(self, event: LlmCompletionEvent) -> LangfuseUsage | None: @@ -1555,10 +1561,9 @@ def _usage_from_typed_event(self, event: LlmCompletionEvent) -> LangfuseUsage | total=usage.total_tokens, ) - def _resolve_prompt_link_from_typed_event(self, event: LlmCompletionEvent) -> Any: + def _resolve_prompt_link_from_typed_event(self, event: LlmCompletionEvent | LlmFailedEvent) -> Any: """§8.4.4 case discrimination on the typed event's active_prompt - snapshot. Same logic as _resolve_prompt_link but reads from - LlmCompletionEvent instead of LlmEventPayload.""" + snapshot.""" active_prompt = event.active_prompt if active_prompt is None: return None @@ -1568,12 +1573,12 @@ def _resolve_prompt_link_from_typed_event(self, event: LlmCompletionEvent) -> An return cast("dict[str, Any]", entities).get("langfuse_prompt") def _open_trace_for_typed_event( - self, invocation_id: str, correlation_id: str | None, event: LlmCompletionEvent + self, invocation_id: str, correlation_id: str | None, event: LlmCompletionEvent | LlmFailedEvent ) -> None: - """Trace open path for a typed LlmCompletionEvent arriving - before any node-started event reached this observer. - Synthesizes the minimal trace shape from the typed event's - scoping fields.""" + """Trace open path for a typed LLM event (LlmCompletionEvent or + LlmFailedEvent) arriving before any node-started event reached + this observer. Synthesizes the minimal trace shape from the + typed event's scoping fields.""" if event.namespace: entry_node = event.namespace[0] else: @@ -1591,113 +1596,6 @@ def _open_trace_for_typed_event( self.client.trace(id=invocation_id, name=entry_node, metadata=metadata) self._inv_states[invocation_id] = _InvState(trace_id=invocation_id) - def _llm_metadata_and_payload( - self, - payload: LlmEventPayload, - correlation_id: str | None, - *, - phase: str, - ) -> tuple[dict[str, Any], dict[str, Any], Any, Any]: - # Returns (metadata, model_parameters, input, output) for the - # generation(...) / .end(...) call. Phase-specific filtering - # keeps the started call lean (input only) and the completed - # call focused on the output + usage + response metadata. - metadata: dict[str, Any] = {} - if correlation_id is not None: - metadata["correlation_id"] = correlation_id - # gen_ai.system → metadata.system per §8.4.3 - metadata["system"] = payload.genai_system - # Prompt-identity metadata (§8.4.4 always-on, independent of - # whether a Langfuse Prompt entity link is established). - active_prompt = payload.active_prompt - if active_prompt is not None: - metadata["prompt"] = { - "name": active_prompt.name, - "version": active_prompt.version, - "label": active_prompt.label, - "template_hash": active_prompt.template_hash, - "rendered_hash": active_prompt.rendered_hash, - } - active_group = payload.active_prompt_group - if active_group is not None: - metadata["prompt_group_name"] = active_group.group_name - _apply_caller_metadata(metadata, payload.caller_invocation_metadata) - - model_parameters: dict[str, Any] = {} - request_params = payload.request_params or {} - # Per §8.4.3: every gen_ai.request. attribute lifts to - # generation.modelParameters. by inclusion. The §5.5.2 - # source set keys this on (temperature, max_tokens, top_p, - # seed, frequency_penalty, presence_penalty, stop_sequences as - # of v0.24.0); new request-param attrs added in future spec - # versions flow through automatically. - for key, value in request_params.items(): - model_parameters[key] = value - - # Input/output payload gated by disable_llm_payload (§8.7). - input_value: Any = None - output_value: Any = None - if not self.disable_llm_payload: - if phase == "started" and payload.input_messages is not None: - # The payload's input_messages is already image- - # redacted at the provider per §5.5.5 (inline image - # bytes never reach the observer). Serialize and - # compare against the configured cap; under cap the - # native shape is fine, over cap §8.7 says preserve - # the raw truncated string with the marker. - input_value = self._maybe_truncate_for_input(payload.input_messages) - if phase == "completed" and payload.output_content is not None: - output_value = self._maybe_truncate_for_output(payload.output_content) - if phase == "started" and payload.request_extras: - # request_extras renders into metadata, not the input - # field, per §8.4.3 (`metadata.request_extras`). - metadata["request_extras"] = self._maybe_truncate_for_extras(dict(payload.request_extras)) - - # Response metadata fields land on the completed call (§8.4.3). - if phase == "completed": - if payload.finish_reason is not None: - metadata["finish_reason"] = payload.finish_reason - if payload.response_model is not None: - metadata["response_model"] = payload.response_model - if payload.response_id is not None: - metadata["response_id"] = payload.response_id - - return metadata, model_parameters, input_value, output_value - - def _usage_from_payload(self, payload: LlmEventPayload) -> LangfuseUsage | None: - # Map OA usage fields onto the Langfuse Usage record per - # §8.4.3. Returns None when no usage was reported (all three - # token fields None) so the Generation observation reflects - # absence rather than zeroed counts. - if ( - payload.prompt_tokens is None - and payload.completion_tokens is None - and payload.total_tokens is None - ): - return None - return LangfuseUsage( - input=payload.prompt_tokens, - output=payload.completion_tokens, - total=payload.total_tokens, - ) - - def _resolve_prompt_link(self, payload: LlmEventPayload) -> Any: - # §8.4.4 case discrimination: the trigger is whether the - # prompt's source exposes a Langfuse Prompt reference, not - # which specific backend produced it. PromptResult has - # observability_entities['langfuse_prompt'] populated when - # case 1 applies; absent otherwise. - active_prompt = payload.active_prompt - if active_prompt is None: - return None - # PromptResult is typed Any on LlmEventPayload to avoid a - # cross-package import (see llm_event.py for the rationale); - # read defensively. - entities = getattr(active_prompt, "observability_entities", None) - if not isinstance(entities, dict): - return None - return cast("dict[str, Any]", entities).get("langfuse_prompt") - def _maybe_truncate_for_input(self, value: Any) -> Any: # Returns the native value (list of message dicts) when it # fits the cap, or the truncated marker-bearing string when diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 5265876..6f8f146 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -101,11 +101,11 @@ InvocationCompletedEvent, InvocationStartedEvent, LlmCompletionEvent, + LlmFailedEvent, MetadataAugmentationEvent, NodeEvent, ) from openarmature.observability.lineage import is_strict_prefix -from openarmature.observability.llm_event import LLM_NAMESPACE, LlmEventPayload # Span-stack key shape: # ``(namespace, attempt_index, fan_out_index, branch_name)`` — these @@ -117,15 +117,6 @@ _StackKey = tuple[tuple[str, ...], int, int | None, str | None] -# Re-export the LLM-event namespace sentinel under the same name the -# observer module has historically used. The canonical home is -# ``openarmature.observability.llm_event`` (backend-agnostic — pulling -# the constant from there avoids forcing core-observability imports to -# load the OTel backend). ``_LLM_NAMESPACE`` is retained for backwards -# compatibility within this module. -_LLM_NAMESPACE = LLM_NAMESPACE - - # §5.5.5 truncation marker. The leading character is U+2026 HORIZONTAL # ELLIPSIS (3 bytes UTF-8); the marker is a fixed UTF-8 string and is # appended as a whole unit so no boundary backtracking is needed past @@ -182,8 +173,9 @@ def _apply_caller_metadata(attrs: dict[str, Any], metadata: Mapping[str, Any]) - dispatch, fan-out instance dispatch, LLM provider span, detached roots). Source values may come from ``NodeEvent.caller_invocation_metadata`` for graph events or - from ``LlmEventPayload.caller_invocation_metadata`` for LLM - events; both are dispatch-time snapshots. + from the typed LLM events' (LlmCompletionEvent / LlmFailedEvent) + caller_invocation_metadata field for LLM events; both are + dispatch-time snapshots. """ for key, value in metadata.items(): attrs[f"openarmature.user.{key}"] = value @@ -460,7 +452,9 @@ class OTelObserver: # span.end() the observer issues. NodeEvent is None on synthetic # close sites (subgraph dispatch, detached root, fan-out instance, # invocation span, shutdown drain). - attribute_enrichers: Sequence[Callable[[Span, NodeEvent | LlmCompletionEvent | None], None]] = () + attribute_enrichers: Sequence[ + Callable[[Span, NodeEvent | LlmCompletionEvent | LlmFailedEvent | None], None] + ] = () # Read from the package's ``__spec_version__`` (one of the three # places the spec version is pinned per CLAUDE.md). Bumping the # spec submodule + the two version fields automatically updates @@ -531,7 +525,9 @@ def __post_init__(self) -> None: # warned, never propagated to the dispatch worker. # ``event`` is None on synthetic close sites (subgraph dispatch, # detached root, fan-out instance, invocation span, orphan drain). - def _run_enrichers(self, span: Span, event: NodeEvent | LlmCompletionEvent | None) -> None: + def _run_enrichers( + self, span: Span, event: NodeEvent | LlmCompletionEvent | LlmFailedEvent | None + ) -> None: """Invoke configured enrichers against ``span`` before ``span.end()`` is called.""" if not self.attribute_enrichers: @@ -575,6 +571,7 @@ async def __call__( | InvocationStartedEvent | InvocationCompletedEvent | LlmCompletionEvent + | LlmFailedEvent ), ) -> None: # Proposal 0043 invocation-boundary events: OTel has no @@ -586,25 +583,22 @@ async def __call__( return # Proposal 0049 typed LlmCompletionEvent (success path). # Drives the openarmature.llm.complete span lifecycle for - # successful provider calls. Failures don't emit this variant; - # they flow through the sentinel-pair error path below. + # successful provider calls. if isinstance(event, LlmCompletionEvent): if not self.disable_llm_spans: self._handle_typed_llm_completion(event) return + # Proposal 0058 typed LlmFailedEvent (failure path). Drives + # the same openarmature.llm.complete span lifecycle for failed + # provider calls, with ERROR status + openarmature.error.category + # attribute. + if isinstance(event, LlmFailedEvent): + if not self.disable_llm_spans: + self._handle_typed_llm_failed(event) + return if isinstance(event, MetadataAugmentationEvent): self._handle_metadata_augmentation(event) return - # LLM provider sentinel events: the typed event handler above - # owns the success-path span. Failure-path ``completed`` events - # open + close an error span. v0.13.0 dropped sentinel-pair - # emission on the success path entirely; the only remaining - # sentinel emission is failure-path ``completed`` (until the - # spec extends the typed event with error semantics). - if event.namespace == _LLM_NAMESPACE: - if not self.disable_llm_spans: - self._handle_llm_error_event(event) - return if event.phase == "checkpoint_saved": self._emit_checkpoint_save_span(event) return @@ -640,7 +634,7 @@ def prepare_sync(self, event: NodeEvent) -> None: engine-side attach. Errors don't leak: ``_dispatch`` wraps this call in try/except + ``warnings.warn`` matching the async path. """ - if event.phase != "started" or event.namespace == _LLM_NAMESPACE: + if event.phase != "started": return from openarmature.observability.correlation import ( _set_active_observer_span, @@ -1146,11 +1140,6 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: cid = current_correlation_id() if cid is not None: attrs["openarmature.correlation_id"] = cid - # Asymmetric guard with _handle_llm_error_event below: the typed - # event types ``caller_invocation_metadata`` as ``Mapping | None`` - # while LlmEventPayload defaults to an empty mapping (never None). - # Don't "normalize" the two paths without also normalizing the - # source types. if event.caller_invocation_metadata is not None: _apply_caller_metadata(attrs, event.caller_invocation_metadata) active_prompt = event.active_prompt @@ -1242,55 +1231,58 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: self._run_enrichers(span, event) span.end(end_time=end_time_ns) - def _handle_llm_error_event(self, event: NodeEvent) -> None: - """Emit the error-path ``openarmature.llm.complete`` span from - the sentinel NodeEvent. Success-path completed events and the - started event are no-ops here — the typed LlmCompletionEvent - handler owns the success-path span.""" + def _handle_typed_llm_failed(self, event: LlmFailedEvent) -> None: + """Open + close the ``openarmature.llm.complete`` span from the + typed LlmFailedEvent (failure path, proposal 0058). Same span + shape as the success path with ERROR status + + ``openarmature.error.category`` attribute attached.""" from openarmature.observability.correlation import ( current_correlation_id, current_invocation_id, ) - if event.phase != "completed": - return - if not isinstance(event.pre_state, LlmEventPayload): - return - payload = event.pre_state - if payload.error_type is None: - # Success-path completed: typed-event handler owns the span. - return invocation_id = current_invocation_id() if invocation_id is None: return inv_state = self._inv_state_for(invocation_id) + # Back-date start_time using latency_ms (mirrors the success- + # path handler). For failures, latency reflects time spent + # until the §7 exception was raised — useful for diagnosing + # whether failures are fast-failing (pre-send validation) or + # slow-failing (provider timeout). + end_time_ns = time.time_ns() + if event.latency_ms is not None: + start_time_ns = end_time_ns - int(event.latency_ms * 1_000_000) + else: + start_time_ns = end_time_ns parent_ctx = self._resolve_llm_parent( inv_state, invocation_id, - calling_namespace_prefix=payload.calling_namespace_prefix, - calling_attempt_index=payload.calling_attempt_index, - calling_fan_out_index=payload.calling_fan_out_index, - calling_branch_name=payload.calling_branch_name, + calling_namespace_prefix=event.namespace, + calling_attempt_index=event.attempt_index, + calling_fan_out_index=event.fan_out_index, + calling_branch_name=event.branch_name, ) - attrs: dict[str, Any] = {"openarmature.llm.model": payload.model} + attrs: dict[str, Any] = {"openarmature.llm.model": event.model} cid = current_correlation_id() if cid is not None: attrs["openarmature.correlation_id"] = cid - _apply_caller_metadata(attrs, payload.caller_invocation_metadata) - active_prompt = payload.active_prompt + if event.caller_invocation_metadata is not None: + _apply_caller_metadata(attrs, event.caller_invocation_metadata) + active_prompt = event.active_prompt if active_prompt is not None: attrs["openarmature.prompt.name"] = active_prompt.name attrs["openarmature.prompt.version"] = active_prompt.version attrs["openarmature.prompt.label"] = active_prompt.label attrs["openarmature.prompt.template_hash"] = active_prompt.template_hash attrs["openarmature.prompt.rendered_hash"] = active_prompt.rendered_hash - active_group = payload.active_prompt_group + active_group = event.active_prompt_group if active_group is not None: attrs["openarmature.prompt.group_name"] = active_group.group_name if not self.disable_genai_semconv: - attrs["gen_ai.system"] = payload.genai_system - attrs["gen_ai.request.model"] = payload.model - request_params = payload.request_params or {} + attrs["gen_ai.system"] = event.provider + attrs["gen_ai.request.model"] = event.model + request_params = event.request_params or {} if "temperature" in request_params: attrs["gen_ai.request.temperature"] = request_params["temperature"] if "max_tokens" in request_params: @@ -1306,13 +1298,13 @@ def _handle_llm_error_event(self, event: NodeEvent) -> None: if "stop_sequences" in request_params: attrs["gen_ai.request.stop_sequences"] = request_params["stop_sequences"] if not self.disable_llm_payload: - if payload.input_messages: - serialized = _serialize_for_attribute(payload.input_messages) + if event.input_messages: + serialized = _serialize_for_attribute(event.input_messages) attrs["openarmature.llm.input.messages"] = _truncate_for_attribute( serialized, self.payload_max_bytes ) - if payload.request_extras: - serialized_extras = _serialize_for_attribute(payload.request_extras) + if event.request_extras: + serialized_extras = _serialize_for_attribute(event.request_extras) attrs["openarmature.llm.request.extras"] = _truncate_for_attribute( serialized_extras, self.payload_max_bytes ) @@ -1321,17 +1313,17 @@ def _handle_llm_error_event(self, event: NodeEvent) -> None: context=cast("Any", parent_ctx), kind=SpanKind.CLIENT, attributes=attrs, + start_time=start_time_ns, ) span.set_status( Status( StatusCode.ERROR, - description=payload.error_category or payload.error_type, + description=event.error_category, ) ) - if payload.error_category is not None: - span.set_attribute("openarmature.error.category", payload.error_category) + span.set_attribute("openarmature.error.category", event.error_category) self._run_enrichers(span, event) - span.end() + span.end(end_time=end_time_ns) def _resolve_llm_parent( self, diff --git a/tests/_helpers/typed_event.py b/tests/_helpers/typed_event.py index ad420cf..3b34f9d 100644 --- a/tests/_helpers/typed_event.py +++ b/tests/_helpers/typed_event.py @@ -1,4 +1,4 @@ -"""Shared test helper for constructing ``LlmCompletionEvent`` instances. +"""Shared test helpers for constructing typed LLM event instances. Replaces 20+ kwargs of boilerplate at each call site with a one-liner plus the overrides relevant to the test. Used by unit tests against @@ -10,7 +10,7 @@ from typing import Any -from openarmature.graph.events import LlmCompletionEvent +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent def make_typed_event(**overrides: Any) -> LlmCompletionEvent: @@ -42,3 +42,35 @@ def make_typed_event(**overrides: Any) -> LlmCompletionEvent: } base.update(overrides) return LlmCompletionEvent(**base) + + +def make_failed_event(**overrides: Any) -> LlmFailedEvent: + """Build a ``LlmFailedEvent`` with neutral defaults; ``overrides`` + swap individual fields for the test case. Mirrors ``make_typed_event`` + on the shared field set; failure-specific defaults are + ``provider_unavailable`` category, the upstream class name as + ``error_type``, and a generic message.""" + base: dict[str, Any] = { + "invocation_id": "inv-1", + "correlation_id": None, + "node_name": "ask", + "namespace": ("ask",), + "attempt_index": 0, + "fan_out_index": None, + "branch_name": None, + "provider": "openai", + "model": "test-m", + "latency_ms": 10.0, + "input_messages": [], + "request_params": {}, + "request_extras": {}, + "active_prompt": None, + "active_prompt_group": None, + "call_id": "cc-1", + "error_category": "provider_unavailable", + "error_type": "ProviderUnavailable", + "error_message": "service down", + "caller_invocation_metadata": None, + } + base.update(overrides) + return LlmFailedEvent(**base) diff --git a/tests/conformance/test_conformance.py b/tests/conformance/test_conformance.py index ac9a467..899d409 100644 --- a/tests/conformance/test_conformance.py +++ b/tests/conformance/test_conformance.py @@ -74,6 +74,17 @@ def _fixture_id(path: Path) -> str: _DEFERRED_FIXTURES: dict[str, str] = { # proposal 0011 — parallel branches; fixture 021 (``branch_name`` # field on NodeEvent) runs through this driver as of PR-5. + # Proposal 0023 (canonical state reducers, spec v0.52.0) — runtime + # execution requires the new factory reducers (``bounded_append``, + # ``dedupe_append``, ``merge_by_key``). Python ships these in a + # future PR; the manifest entry is ``not-yet``. + "034-reducer-bounded-append": "Proposal 0023 canonical state reducers; impl not yet shipped", + "035-reducer-dedupe-append": "Proposal 0023 canonical state reducers; impl not yet shipped", + "036-reducer-merge-by-key": "Proposal 0023 canonical state reducers; impl not yet shipped", + "037-reducer-configuration-invalid-max-len": ( + "Proposal 0023 canonical state reducers; impl not yet shipped" + ), + "038-reducer-error-non-list-update": ("Proposal 0023 canonical state reducers; impl not yet shipped"), } diff --git a/tests/conformance/test_fixture_parsing.py b/tests/conformance/test_fixture_parsing.py index 655bd68..643e99a 100644 --- a/tests/conformance/test_fixture_parsing.py +++ b/tests/conformance/test_fixture_parsing.py @@ -365,6 +365,48 @@ def _id(case: tuple[str, Path]) -> str: "observability/068-llm-completion-event-response-model-distinct-from-request": ( "Proposal 0057 typed event request-side fields; queued for v0.13.0" ), + # Proposal 0058 (LlmFailedEvent typed variant, v0.53.0) — fixtures + # 069-073 share the same typed_observers / typed_event_collector + # directive shape as 050-068 and inherit the same parser-deferral + # status pending the harness model's typed-event-collector schema. + # The behavior is pinned by unit tests in + # ``tests/unit/test_llm_provider.py`` (provider emission) plus + # ``tests/unit/test_observability_otel.py`` and + # ``test_observability_langfuse.py`` (observer rendering). + "observability/069-llm-failure-event-dispatch-on-provider-unavailable": ( + "Proposal 0058 typed LLM failure event; harness typed_event_collector schema pending" + ), + "observability/070-llm-failure-event-dispatch-on-provider-invalid-request": ( + "Proposal 0058 typed LLM failure event; harness typed_event_collector schema pending" + ), + "observability/071-llm-failure-event-call-id-distinct-from-completion-event": ( + "Proposal 0058 typed LLM failure event; harness typed_event_collector schema pending" + ), + "observability/072-llm-failure-event-mutual-exclusion-with-completion-event": ( + "Proposal 0058 typed LLM failure event; harness typed_event_collector schema pending" + ), + "observability/073-llm-failure-event-error-type-vendor-specific": ( + "Proposal 0058 typed LLM failure event; harness typed_event_collector schema pending" + ), + # Proposal 0023 (canonical state reducers, accepted before v0.53.0 + # but not implemented by this release) — fixtures 035-038 introduce + # the new dict-form reducer directive (``dedupe_append: {}``, + # ``merge_by_key: {key: 'id'}``); the harness's reducer field still + # expects a string. Queued for the canonical-state-reducers impl + # batch alongside its conformance manifest entry. + "graph-engine/034-reducer-bounded-append": ( + "Proposal 0023 canonical state reducers; impl not yet shipped" + ), + "graph-engine/035-reducer-dedupe-append": ( + "Proposal 0023 canonical state reducers; impl not yet shipped" + ), + "graph-engine/036-reducer-merge-by-key": ("Proposal 0023 canonical state reducers; impl not yet shipped"), + "graph-engine/037-reducer-configuration-invalid-max-len": ( + "Proposal 0023 canonical state reducers; impl not yet shipped" + ), + "graph-engine/038-reducer-error-non-list-update": ( + "Proposal 0023 canonical state reducers; impl not yet shipped" + ), # Proposal 0050 (failure-isolation middleware + call-level retry, # v0.42.0) — llm-provider fixtures 056-058 (call-level retry) and # pipeline-utilities fixtures 058-063 (failure-isolation diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 88e20f7..863ec1a 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -9,7 +9,7 @@ def test_package_versions() -> None: assert openarmature.__version__ == "0.12.0" - assert openarmature.__spec_version__ == "0.51.0" + assert openarmature.__spec_version__ == "0.53.0" def test_spec_version_matches_pyproject() -> None: diff --git a/tests/unit/test_llm_provider.py b/tests/unit/test_llm_provider.py index db985ba..4c5aa55 100644 --- a/tests/unit/test_llm_provider.py +++ b/tests/unit/test_llm_provider.py @@ -13,12 +13,13 @@ from collections.abc import Callable from contextvars import Token +from typing import cast import httpx import pytest from pydantic import ValidationError -from openarmature.graph.events import LlmCompletionEvent, NodeEvent +from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent, NodeEvent from openarmature.graph.observer import ObserverEvent from openarmature.llm import ( PROVIDER_AUTHENTICATION, @@ -39,6 +40,8 @@ ProviderModelNotLoaded, ProviderRateLimit, ProviderUnavailable, + ProviderUnsupportedContentBlock, + StructuredOutputInvalid, SystemMessage, Tool, ToolCall, @@ -1298,14 +1301,15 @@ async def test_complete_success_emits_only_typed_event() -> None: assert len(typed_events) == 1 -async def test_complete_failure_emits_only_sentinel_completed_no_typed_event() -> None: - # Per proposal 0049 §3 alternative 3: LlmCompletionEvent fires on - # successful structured-response completion only. Provider - # exceptions (provider_unavailable etc.) flow through the existing - # exception path; the sentinel NodeEvent(completed, error=...) - # carries the error fields. v0.13.0 dropped the sentinel ``started`` - # event entirely — only ``completed`` fires on failure. - from openarmature.graph.events import LlmCompletionEvent, NodeEvent +async def test_complete_failure_emits_typed_llm_failed_event_only() -> None: + # Per proposal 0058: failures emit a typed LlmFailedEvent on the + # observer queue ALONGSIDE the exception (the exception still + # raises out of complete() — caller-side flow unchanged). Per + # proposal 0049 §3 alternative 3: LlmCompletionEvent stays + # success-only — no LlmCompletionEvent fires on failure. v0.13.0 + # dropped sentinel-namespace NodeEvent emission for LLM events + # entirely; no NodeEvent fires on success OR failure. + from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent, NodeEvent def _503(_req: httpx.Request) -> httpx.Response: return httpx.Response(503, json={"error": {"message": "down"}}) @@ -1322,10 +1326,146 @@ def _503(_req: httpx.Request) -> httpx.Response: _release_dispatch(token) node_events = [e for e in events if isinstance(e, NodeEvent)] - typed_events = [e for e in events if isinstance(e, LlmCompletionEvent)] - assert len(node_events) == 1 - assert node_events[0].phase == "completed" - assert typed_events == [] + completion_events = [e for e in events if isinstance(e, LlmCompletionEvent)] + failed_events = [e for e in events if isinstance(e, LlmFailedEvent)] + assert node_events == [] + assert completion_events == [] + assert len(failed_events) == 1 + assert failed_events[0].error_category == "provider_unavailable" + assert failed_events[0].error_type == "ProviderUnavailable" + + +# --------------------------------------------------------------------------- +# Proposal 0058: per-category field-mapping + pre-send + mutual exclusion +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + ("exc_factory", "expected_cls_name", "expected_category"), + [ + (lambda: ProviderAuthentication("boom"), "ProviderAuthentication", "provider_authentication"), + (lambda: ProviderUnavailable("boom"), "ProviderUnavailable", "provider_unavailable"), + (lambda: ProviderInvalidModel("boom"), "ProviderInvalidModel", "provider_invalid_model"), + (lambda: ProviderModelNotLoaded("boom"), "ProviderModelNotLoaded", "provider_model_not_loaded"), + (lambda: ProviderRateLimit("boom"), "ProviderRateLimit", "provider_rate_limit"), + (lambda: ProviderInvalidResponse("boom"), "ProviderInvalidResponse", "provider_invalid_response"), + (lambda: ProviderInvalidRequest("boom"), "ProviderInvalidRequest", "provider_invalid_request"), + ( + lambda: ProviderUnsupportedContentBlock("boom", block_type="image"), + "ProviderUnsupportedContentBlock", + "provider_unsupported_content_block", + ), + ( + lambda: StructuredOutputInvalid( + "boom", + response_schema={}, + raw_content="", + failure_description="", + ), + "StructuredOutputInvalid", + "structured_output_invalid", + ), + ], +) +def test_build_llm_failed_event_maps_category_and_type_per_exception( + exc_factory: Callable[[], LlmProviderError], expected_cls_name: str, expected_category: str +) -> None: + # Proposal 0058 field mapping: every §7 LlmProviderError subclass + # populates the typed event's error_category from its ``category`` + # class attribute and error_type from the class name. Locks down + # the mapping for all 9 categories so future additions to the + # error hierarchy can't silently drop this. + provider = OpenAIProvider(base_url="http://test", model="m", api_key="k") + exc = exc_factory() + event = provider._build_llm_failed_event( # noqa: SLF001 + exc, + latency_ms=12.0, + call_id="cc-test", + input_messages=[], + request_params={}, + request_extras={}, + active_prompt=None, + active_prompt_group=None, + ) + assert event.error_category == expected_category + assert event.error_type == expected_cls_name + assert event.error_message == "boom" + assert event.latency_ms == 12.0 + assert event.call_id == "cc-test" + + +async def test_complete_pre_send_validation_emits_llm_failed_event_before_propagating() -> None: + # Proposal 0058: §7 category exceptions raised from the pre-send + # validation layer (before any wire contact) MUST dispatch + # LlmFailedEvent on the observer queue alongside the exception. + # ProviderInvalidRequest from _normalize_response_schema's non- + # BaseModel-class rejection is the cleanest pre-send trigger + # because it bypasses every wire concern. + class _NotABaseModel: + pass + + events, token = _collecting_dispatch() + provider = OpenAIProvider(base_url="http://test", model="m", api_key="k") + try: + with pytest.raises(ProviderInvalidRequest): + await provider.complete( + [UserMessage(content="hi")], + response_schema=cast("type", _NotABaseModel), + ) + finally: + await provider.aclose() + _release_dispatch(token) + + failed_events = [e for e in events if isinstance(e, LlmFailedEvent)] + completion_events = [e for e in events if isinstance(e, LlmCompletionEvent)] + assert completion_events == [] + assert len(failed_events) == 1 + assert failed_events[0].error_category == "provider_invalid_request" + assert failed_events[0].error_type == "ProviderInvalidRequest" + + +async def test_llm_completion_and_failed_events_are_mutually_exclusive() -> None: + # Proposal 0058 mutual-exclusion contract: implementations MUST + # NOT emit both LlmCompletionEvent and LlmFailedEvent for the same + # provider.complete() call. Verify the disjoint-count rule on + # both success and failure paths within the same test so a future + # restructure that accidentally emits both surfaces here. + success_events, success_token = _collecting_dispatch() + success_transport = _make_openai_response_with_usage( + {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2} + ) + success_provider = OpenAIProvider( + base_url="http://test", model="m", api_key="k", transport=success_transport + ) + try: + await success_provider.complete([UserMessage(content="hi")]) + finally: + await success_provider.aclose() + _release_dispatch(success_token) + + success_completion = [e for e in success_events if isinstance(e, LlmCompletionEvent)] + success_failed = [e for e in success_events if isinstance(e, LlmFailedEvent)] + assert len(success_completion) == 1 + assert success_failed == [] + + def _503(_req: httpx.Request) -> httpx.Response: + return httpx.Response(503, json={"error": {"message": "down"}}) + + failure_events, failure_token = _collecting_dispatch() + failure_provider = OpenAIProvider( + base_url="http://test", model="m", api_key="k", transport=httpx.MockTransport(_503) + ) + try: + with pytest.raises(ProviderUnavailable): + await failure_provider.complete([UserMessage(content="hi")]) + finally: + await failure_provider.aclose() + _release_dispatch(failure_token) + + failure_completion = [e for e in failure_events if isinstance(e, LlmCompletionEvent)] + failure_failed = [e for e in failure_events if isinstance(e, LlmFailedEvent)] + assert failure_completion == [] + assert len(failure_failed) == 1 async def test_llm_completion_event_carries_typed_outcome_fields() -> None: diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 9f467f3..8d66c65 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -1342,65 +1342,56 @@ async def test_disable_llm_spans_skips_typed_event_path() -> None: assert client.traces == {} -async def test_llm_error_path_emits_error_generation_from_sentinel_completed() -> None: - # Failure-path provider exceptions still flow through the sentinel - # NodeEvent(completed, error=...). Verify the observer emits an - # ERROR-level Generation with the canonical error_category as - # status_message. - from openarmature.graph.events import NodeEvent +async def test_llm_error_path_emits_error_generation_from_typed_failed_event() -> None: + # Per proposal 0058: failures emit a typed LlmFailedEvent. The + # Langfuse observer drives the Generation observation with ERROR + # level + error_category as statusMessage. from openarmature.observability.correlation import ( _reset_invocation_id, _set_invocation_id, ) - from openarmature.observability.llm_event import LlmEventPayload + from tests._helpers.typed_event import make_failed_event client = InMemoryLangfuseClient() observer = LangfuseObserver(client=client) token = _set_invocation_id("inv-err") try: - completed = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="completed", - pre_state=LlmEventPayload( - call_id="cc-err", + await observer( + make_failed_event( + invocation_id="inv-err", model="m-test", + error_category="provider_rate_limit", error_type="ProviderRateLimit", - error_category="provider_rate_limited", error_message="429 from upstream", - ), - post_state=None, - error=None, - parent_states=(), + call_id="cc-err", + ) ) - await observer(completed) finally: _reset_invocation_id(token) trace = client.traces["inv-err"] obs = next(o for o in trace.observations if o.type == "generation") assert obs.level == "ERROR" - assert obs.status_message == "provider_rate_limited" + assert obs.status_message == "provider_rate_limit" -async def test_llm_error_event_parents_under_branch_calling_node() -> None: +async def test_typed_failed_event_parents_under_branch_calling_node() -> None: # Regression cover for the _resolve_llm_parent_observation_id - # keyword-only refactor: when an LLM failure fires inside a - # parallel-branches branch, the resulting ERROR Generation MUST - # parent under THAT branch's calling node observation, not under - # a sibling branch's same-named node. Pre-populates the observer's - # internal state with two open node observations that differ only - # by branch_name, then dispatches a sentinel-completed-error with - # a matching calling_branch_name and asserts the parent_observation - # _id points at the right one. + # keyword-only signature: when a typed LlmFailedEvent fires + # inside a parallel-branches branch, the resulting ERROR + # Generation MUST parent under THAT branch's calling node + # observation, not under a sibling branch's same-named node. + # Pre-populates the observer's internal state with two open + # node observations that differ only by branch_name, then + # dispatches a typed LlmFailedEvent with the matching + # branch_name and asserts the parent_observation_id points at + # the right one. # # Note: the same _resolve_llm_parent_observation_id call also - # serves the success-path typed event handler (with - # calling_branch_name = event.branch_name); error- and success- - # paths share the resolver, so this test transitively covers the - # success-path branch_name handling. - from openarmature.graph.events import NodeEvent + # serves the success-path handler with calling_branch_name = + # event.branch_name; failure- and success-paths share the + # resolver so this test transitively covers the success-path + # branch_name handling. from openarmature.observability.correlation import ( _reset_invocation_id, _set_invocation_id, @@ -1409,7 +1400,7 @@ async def test_llm_error_event_parents_under_branch_calling_node() -> None: _InvState, _OpenObservation, ) - from openarmature.observability.llm_event import LlmEventPayload + from tests._helpers.typed_event import make_failed_event client = InMemoryLangfuseClient() observer = LangfuseObserver(client=client) @@ -1419,7 +1410,7 @@ async def test_llm_error_event_parents_under_branch_calling_node() -> None: # Bootstrap the Trace + two branch-distinguished node # observations directly. _InvState's open_observations map is # keyed by (namespace, attempt_index, fan_out_index, - # branch_name); the calling node identity on the error payload + # branch_name); the calling node identity on the typed event # is (("dispatcher", "ask"), 0, None, "fast"). client.trace(id=invocation_id, name="dispatcher") observer._inv_states[invocation_id] = _InvState(trace_id=invocation_id) # noqa: SLF001 @@ -1431,27 +1422,21 @@ async def test_llm_error_event_parents_under_branch_calling_node() -> None: slow_key = (("dispatcher", "ask"), 0, None, "slow") inv_state.open_observations[fast_key] = _OpenObservation(handle=fast_handle) inv_state.open_observations[slow_key] = _OpenObservation(handle=slow_handle) - completed = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="completed", - pre_state=LlmEventPayload( - call_id="cc-pb", + await observer( + make_failed_event( + invocation_id=invocation_id, + node_name="ask", + namespace=("dispatcher", "ask"), + attempt_index=0, + fan_out_index=None, + branch_name="fast", model="m-test", - error_type="ProviderUnavailable", error_category="provider_unavailable", + error_type="ProviderUnavailable", error_message="503 from upstream", - calling_namespace_prefix=("dispatcher", "ask"), - calling_attempt_index=0, - calling_fan_out_index=None, - calling_branch_name="fast", - ), - post_state=None, - error=None, - parent_states=(), + call_id="cc-pb", + ) ) - await observer(completed) finally: _reset_invocation_id(token) diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index cc4c556..d7fdfc0 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -803,53 +803,31 @@ async def test_disable_llm_spans_skips_typed_event_path() -> None: assert llm_spans == [] -async def test_llm_error_path_emits_error_span_from_sentinel_completed() -> None: - # Failure-path LLM calls don't emit the typed LlmCompletionEvent - # (per proposal 0049 §3 alternative 3). The OTel observer keeps - # the sentinel-pair error path alive so error-status spans still - # fire. Sentinel started + non-error completed are no-ops; only - # completed-with-error_type produces a span. +async def test_llm_error_path_emits_error_span_from_typed_failed_event() -> None: + # Per proposal 0058: failures emit a typed LlmFailedEvent. The + # OTel observer drives the same openarmature.llm.complete span + # shape with ERROR status + openarmature.error.category attribute. from opentelemetry.trace import StatusCode - from openarmature.graph.events import NodeEvent from openarmature.observability.correlation import ( _reset_invocation_id, _set_invocation_id, ) - from openarmature.observability.llm_event import LlmEventPayload + from tests._helpers.typed_event import make_failed_event exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) token = _set_invocation_id("inv-err") try: - started = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="started", - pre_state=LlmEventPayload(call_id="cc-err", model="test-m"), - post_state=None, - error=None, - parent_states=(), - ) - completed = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="completed", - pre_state=LlmEventPayload( - call_id="cc-err", - model="test-m", + await observer( + make_failed_event( + invocation_id="inv-err", + error_category="provider_rate_limit", error_type="ProviderRateLimit", - error_category="provider_rate_limited", error_message="429 from upstream", - ), - post_state=None, - error=None, - parent_states=(), + call_id="cc-err", + ) ) - await observer(started) - await observer(completed) finally: _reset_invocation_id(token) observer.shutdown() @@ -858,7 +836,7 @@ async def test_llm_error_path_emits_error_span_from_sentinel_completed() -> None span = llm_spans[0] assert span.status.status_code == StatusCode.ERROR attrs = dict(span.attributes or {}) - assert attrs.get("openarmature.error.category") == "provider_rate_limited" + assert attrs.get("openarmature.error.category") == "provider_rate_limit" # ---------------------------------------------------------------------------