diff --git a/CHANGELOG.md b/CHANGELOG.md index c996e41..5e0c68e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,39 @@ All notable changes to `openarmature-python` are documented in this file. The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The package follows [Semantic Versioning](https://semver.org/); pre-1.0 minor bumps may carry behavioral changes per [spec governance](https://github.com/LunarCommand/openarmature-spec/blob/main/GOVERNANCE.md). +## [Unreleased] + +LLM-provider span payload and GenAI semconv release. Pinned spec +jumps from v0.16.1 to v0.17.0 (proposal 0024 / observability §5.5 +expansion). The trigger was a friction report from a downstream +agent integrating OA with Langfuse over OTLP: LLM spans rendered +"naked" (model + tokens only), prompt linkage silently dropped at +the dispatch-worker task boundary, and every backend needed a +per-service attribute-mapping shim. This release clears all eight +items in that report. + +### Added + +- **`openarmature.llm.input.messages` / `openarmature.llm.output.content` / `openarmature.llm.request.extras` span attributes (spec §5.5.1).** When the OTel observer is constructed with `disable_llm_payload=False`, LLM spans carry the messages sent, the assistant response content, and the `RuntimeConfig` extras bag — JSON-encoded with sorted keys, no insignificant whitespace, UTF-8. Default-off (the flag is `disable_llm_payload: bool = True`) because the payload may contain PII the user hasn't audited; opt in deliberately. Subject to the §5.5.5 truncation contract. +- **GenAI semantic-conventions attributes (spec §5.5.2 + §5.5.3).** LLM spans now carry `gen_ai.system`, `gen_ai.request.model`, `gen_ai.response.model`, `gen_ai.usage.input_tokens`, `gen_ai.usage.output_tokens`, `gen_ai.response.finish_reasons` (single-element string array), `gen_ai.response.id`, and per-set `gen_ai.request.{temperature,max_tokens,top_p,seed}` (only set fields — absence is meaningful per §5.5.2). 
The existing `openarmature.llm.*` attribute set is preserved alongside; both namespaces emit. Default-on (`disable_genai_semconv: bool = False`); opt out when an external auto-instrumentation library (OpenInference, opentelemetry-instrumentation-openai, etc.) is the canonical source of GenAI attributes for your stack. +- **`OTelObserver(resource=...)` constructor argument.** Optional `opentelemetry.sdk.resources.Resource` passed to the private `TracerProvider`. Lets callers set `service.name` / `service.version` directly rather than via `OTEL_SERVICE_NAME` / `OTEL_RESOURCE_ATTRIBUTES` environment variables (which had to be set BEFORE constructing the observer to take effect — a footgun the explicit kwarg avoids). +- **Multi-processor support on `OTelObserver`.** The `span_processor` constructor argument now accepts a `SpanProcessor | Sequence[SpanProcessor]`. Multi-destination export (e.g., HyperDX + Langfuse on one observer) becomes a one-line constructor call instead of a per-service `CompoundSpanProcessor` workaround. +- **`OTelObserver(attribute_enrichers=...)` hook.** Sequence of `Callable[[Span, NodeEvent | None], None]` invoked just before the observer ends each span. Lets users add backend-specific attributes (custom `langfuse.*` keys, vendor span kinds, etc.) without subclassing or mutating `span._attributes` post-`on_end`. The event is `None` on synthetic close sites (subgraph dispatch, detached root, fan-out instance, invocation span, shutdown drain); enrichers that need per-event context short-circuit on `None`. Exceptions are caught and warned, never propagated to the dispatch worker. +- **`OTelObserver(payload_max_bytes=...)` truncation cap.** Per-attribute byte cap for the §5.5.1 payload attributes. Default 65,536 (64 KiB) per attribute; minimum 256 bytes (rejected at construction). 
The truncation algorithm (spec §5.5.5) emits the largest UTF-8 code-point-aligned prefix that fits within `cap - len(marker)` bytes followed by the marker `…[truncated, M bytes total]`. Inline image bytes are unconditionally redacted at the provider before any cap applies (see Image redaction below). +- **`OpenAIProvider(genai_system="openai")` constructor argument.** Default `"openai"`; override for non-OpenAI endpoints that speak the OpenAI Chat Completions wire format (vLLM, LM Studio, llama.cpp, sglang). Surfaces as the `gen_ai.system` span attribute. No base-URL sniffing happens — the same host:port could be any of several servers, and a wrong inference is worse than the explicit opt-in. +- **`openarmature.observability.LLM_NAMESPACE` and `openarmature.observability.LlmEventPayload` public exports.** The `("openarmature.llm.complete",)` sentinel namespace used by the LLM-provider hook and the payload shape backend observers consume. Third-party `Provider` implementations can dispatch their own LLM events via `current_dispatch()(NodeEvent(..., namespace=LLM_NAMESPACE, pre_state=LlmEventPayload(...)))`; custom observers can recognize the same sentinel and read attributes off the payload. Previously private (`_LLM_NAMESPACE`, `_LlmEventState`); the old underscore-prefixed names are no longer exported. +- **`Response.response_id` and `Response.response_model` typed fields.** Mirror the wire response's `id` and `model` fields when the provider returns them. Surface as `gen_ai.response.id` and `gen_ai.response.model` per spec §5.5.3; also useful for downstream cross-referencing with provider-side billing or audit logs without reaching into `Response.raw`. + +### Changed + +- **Prompt-context attribute propagation now survives the dispatch-worker task boundary.** Previously the OTel observer read `current_prompt_result()` / `current_prompt_group()` from inside `_handle_llm_event`, which runs in the engine's delivery-worker task. 
`asyncio.create_task(deliver_loop(queue))` snapshots the current Context at task creation, before any node body runs — so the ContextVars set by `with_active_prompt(...)` were never visible to the worker. `openarmature.prompt.*` attributes silently went missing on the LLM span. Fixed by capturing both ContextVars at dispatch time inside the `OpenAIProvider.complete()` call (which runs in the node task, where `with_active_prompt` IS active) and threading the snapshots through the `LlmEventPayload`. The observer reads from the payload, not the ContextVar. +- **Inline image bytes are redacted at the provider, not the observer.** Image content blocks with `ImageSourceInline` are serialized with `source` replaced by `{type: "inline_redacted", byte_count: N}` per §5.5.5 *before* the payload reaches the observability dispatch queue. Defense-in-depth: bytes never leave the provider in event form, so custom observers subscribing to the LLM event (enabled by `LlmEventPayload` being public) cannot accidentally leak raw image bytes regardless of their implementation. `media_type` and `detail` are preserved at the image-block level per llm-provider §3.1.2. URL-form images pass through unchanged. +- **`OTelObserver.shutdown()` docstring documents the `BatchSpanProcessor` flush gotcha.** Under fast or unusual teardown orderings (e.g., FastAPI TestClient teardown that closes the event loop before the batch processor's export thread finishes), spans can appear dropped. Documented workarounds: call `provider.force_flush(timeout_millis=…)` explicitly before `shutdown()`, or use `SimpleSpanProcessor` in tests. + +### Notes + +- **Pinned spec version bumped to v0.17.0.** Per the additive-only governance rule (proposal 0024 adds; never renames), implementations passing v0.16.1 conformance fixtures continue to pass under v0.17.0; the new fixtures (012-021) add cases without modifying existing ones. + ## [0.7.0] — 2026-05-23 Docs-and-examples release. 
Pinned spec stays at v0.16.1; no diff --git a/README.md b/README.md index 76c884f..6700bb2 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,9 @@ The engine awaits each save before advancing. A crash immediately after a `compl **Observability that doesn't double-export.**
The OpenTelemetry mapping mandates a private `TracerProvider`. That prevents the trap where global-provider auto-instrumentation libraries (OpenInference, Langfuse v3, etc.) emit duplicate spans alongside the framework's. Your spans flow exactly where you point them; no surprise fan-out to vendor backends you didn't configure. +**LLM spans LLM-aware backends can actually read.**
+Each `provider.complete()` call emits a dedicated `openarmature.llm.complete` span carrying both the framework's `openarmature.llm.*` attributes and the cross-vendor OpenTelemetry GenAI semantic conventions (`gen_ai.system`, `gen_ai.request.*`, `gen_ai.response.*`, `gen_ai.usage.*`). Langfuse, Phoenix, Honeycomb's LLM lens — they render generations correctly out of the box, no per-service attribute-mapping shim required. Input/output payload emission is opt-in (`disable_llm_payload=False`), default-off because the payload may contain PII; image bytes are unconditionally redacted at the provider so they never enter the observability stream. + ## Hello World About a hundred lines that show the engine in action. Three reducer policies declared on one state class. Three LLM calls each returning typed structured output (Pydantic class on two, raw JSON Schema dict on the third). Conditional routing as a pure function of state, not a hidden state machine. An observer attached at compile time that sees every node boundary the engine emits. Requires Python 3.12 or later and an OpenAI-compatible endpoint (defaults to OpenAI public API; works against any local server too). diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 12027bd..02faea2 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -333,3 +333,226 @@ join semantics survive even when trace boundaries don't. The non-detached default is what you want most of the time: one trace per outermost invocation, with subgraphs and fan-out instances as nested spans. + +### LLM provider spans + +When an `OpenAIProvider` (or any [custom Provider](../model-providers/authoring.md) +that wires the dispatch hook) is used inside a graph with `OTelObserver` +attached, each `provider.complete()` call emits a dedicated span named +`openarmature.llm.complete`, parented under the calling node's span. +The span carries two attribute families. 
+ +**`openarmature.llm.*` (always on).** The framework's canonical +namespace: model identifier, finish reason, token counts, prompt +identity from `with_active_prompt(...)`, error category on failure. +Set unconditionally whenever the LLM span itself emits. + +**`gen_ai.*` (OpenTelemetry GenAI semantic conventions, default on).** +Cross-vendor attribute names every LLM-aware backend reads +(Langfuse, Phoenix, Honeycomb's LLM lens, OpenInference-aware +tools). Emitted alongside the OA namespace: + +- `gen_ai.system` — `"openai"` by default; override per provider + instance to `"vllm"` / `"lm_studio"` / `"llama_cpp"` / etc. when + the OpenAI Chat Completions wire format is hitting a non-OpenAI + endpoint: + + ```python + provider = OpenAIProvider( + base_url="http://vllm.internal:8000", + model="meta-llama/Llama-3-8B-Instruct", + genai_system="vllm", + ) + ``` + +- `gen_ai.request.model` / `gen_ai.response.model` — the bound + model and (when the provider returns one) the more-specific + identifier in the response body. +- `gen_ai.request.temperature` / `max_tokens` / `top_p` / `seed` + — only emitted for fields the caller actually set; absence on + the span means "not supplied," distinct from a zero value. +- `gen_ai.usage.input_tokens` / `output_tokens` — token counts. +- `gen_ai.response.finish_reasons` — single-element string array. +- `gen_ai.response.id` — when the provider returns one. + +Disable the GenAI semconv set with `OTelObserver(disable_genai_semconv=True)` +when an external auto-instrumentation library (OpenInference, +`opentelemetry-instrumentation-openai`) is already the canonical +source on your stack. + +### LLM payload attributes + +By default, LLM spans do **not** carry the messages sent or the +response content. 
Opt in with `disable_llm_payload=False`: + +```python +observer = OTelObserver( + span_processor=SimpleSpanProcessor(exporter), + disable_llm_payload=False, +) +``` + +This surfaces three attributes: + +- `openarmature.llm.input.messages` — JSON-encoded message array + (the spec §3 message shape: `{role, content, tool_calls?, …}`). +- `openarmature.llm.output.content` — the assistant's response + content string verbatim. Omitted for tool-call-only responses + with empty content. +- `openarmature.llm.request.extras` — JSON-encoded `RuntimeConfig` + extras bag (provider-specific pass-through fields like + `frequency_penalty`). Omitted when empty. + +**Default-off is deliberate.** The payload may contain PII the user +hasn't audited; opting in is a separate decision from opting into +observability. The flag name keeps symmetry with `disable_llm_spans`: +the default value (`True`) reads as "the observer disables payload +emission by default." + +#### Truncation + +Each payload attribute is capped at `payload_max_bytes` UTF-8 bytes +(default 64 KiB, minimum 256). When the serialized value exceeds the +cap, the observer emits the largest UTF-8-code-point-aligned prefix +that fits within `cap - len(marker)` bytes followed by the marker: + +``` +…[truncated, M bytes total] +``` + +where M is the pre-truncation byte length. The marker is appended +outside any JSON encoding — a truncated attribute is *not* parseable +JSON, which is the clean signal backend code can use to detect +truncation without a separate flag. + +#### Inline image redaction (always on) + +Image content blocks with `ImageSourceInline` are redacted at the +provider, *before* the payload reaches the observer: + +```json +{ + "type": "image", + "source": {"type": "inline_redacted", "byte_count": 4096}, + "media_type": "image/png", + "detail": "auto" +} +``` + +The `media_type` and `detail` fields are preserved at the image-block +level (per llm-provider §3.1.2); only `source` is replaced. 
URL-form +images pass through unchanged — the URL is a short string and is +informative for trace readers. + +Redaction is **not** gated by `disable_llm_payload` and is **not** +configurable. Inline image bytes never leave the provider in event +form, so custom observers consuming +[`LlmEventPayload`](#publishing-llm-events-for-custom-observers) +cannot accidentally leak raw bytes regardless of how they're +written. + +### Identifying the service: `Resource` + +Pass an `opentelemetry.sdk.resources.Resource` to set +`service.name` / `service.version` / etc. without relying on the +`OTEL_SERVICE_NAME` / `OTEL_RESOURCE_ATTRIBUTES` environment +variables (which had to be set *before* `OTelObserver()` +construction to take effect): + +```python +from opentelemetry.sdk.resources import Resource + +observer = OTelObserver( + span_processor=SimpleSpanProcessor(exporter), + resource=Resource.create({"service.name": "claims-pipeline"}), +) +``` + +### Fanning out to multiple backends + +The `span_processor` argument accepts either a single processor or +a sequence. Multi-destination export (HyperDX + Langfuse from one +observer) is a one-line construct: + +```python +observer = OTelObserver( + span_processor=[ + BatchSpanProcessor(OTLPSpanExporter(endpoint=HYPERDX_URL)), + BatchSpanProcessor(OTLPSpanExporter(endpoint=LANGFUSE_URL)), + ], +) +``` + +Every registered processor receives every span. 
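
Returning to the byte cap described in the Truncation subsection above, the rule can be sketched in a few lines of plain Python. This is an illustrative sketch, not the observer's actual implementation: the function name `truncate_payload` is hypothetical, and it assumes `len(marker)` in the contract is measured in UTF-8 bytes (the leading `…` alone is three bytes).

```python
def truncate_payload(value: str, cap: int = 65_536) -> str:
    """Hypothetical helper sketching the truncation contract."""
    # The docs state that caps below 256 bytes are rejected at construction.
    if cap < 256:
        raise ValueError("cap must be at least 256 bytes")

    encoded = value.encode("utf-8")
    total = len(encoded)
    if total <= cap:
        # Under the cap: the value is emitted unchanged.
        return value

    # M in the marker is the pre-truncation byte length.
    marker = f"…[truncated, {total} bytes total]"
    # Assumption: the marker's own size is counted in UTF-8 bytes.
    budget = cap - len(marker.encode("utf-8"))

    # Slice at the byte budget, then drop any partial trailing code point
    # so the prefix stays code-point aligned.
    prefix = encoded[:budget].decode("utf-8", errors="ignore")
    return prefix + marker


if __name__ == "__main__":
    out = truncate_payload("é" * 300, cap=256)
    print(len(out.encode("utf-8")), out[-30:])
```

Because the marker sits outside any JSON encoding, a consumer that fails to parse the attribute as JSON and sees the `bytes total]` suffix can treat the value as truncated.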
+ +### Adding backend-specific attributes: `attribute_enrichers` + +When a backend needs attributes the framework doesn't emit +(custom `langfuse.observation.*` keys, Honeycomb derived fields, +etc.), the `attribute_enrichers` hook fires just before every +`span.end()` call: + +```python +def langfuse_observation_kind(span, event): + if span.name == "openarmature.llm.complete": + span.set_attribute("langfuse.observation.type", "generation") + +observer = OTelObserver( + span_processor=processor, + attribute_enrichers=[langfuse_observation_kind], +) +``` + +Each enricher receives the live `Span` plus the `NodeEvent` that +triggered the close (or `None` on synthetic close sites — subgraph +dispatch, detached root, fan-out instance, invocation span, +shutdown drain). Setting attributes inside this hook works +correctly; doing it from a `SpanProcessor.on_end` callback does +not, because the framework has already called `span.end()` and the +OTel SDK silently drops `set_attribute` on ended spans. + +Exceptions raised by an enricher are caught and warned, never +propagated. + +### Publishing LLM events for custom observers + +`openarmature.observability.LLM_NAMESPACE` and +`openarmature.observability.LlmEventPayload` are part of the public +API. A custom observer subscribing to the dispatch stream can +recognize the LLM-event sentinel namespace and read the typed +payload directly: + +```python +from openarmature.observability import LLM_NAMESPACE, LlmEventPayload + +async def my_llm_observer(event): + if event.namespace != LLM_NAMESPACE: + return + payload = event.pre_state + if not isinstance(payload, LlmEventPayload): + return + # payload.model, payload.input_messages (already image-redacted), + # payload.output_content, payload.request_params, + # payload.response_id, payload.active_prompt, ... 
+``` + +A custom `Provider` that wants to participate in the same span +emission protocol dispatches `NodeEvent(namespace=LLM_NAMESPACE, +pre_state=LlmEventPayload(...))` via `current_dispatch()`. See +[Authoring providers](../model-providers/authoring.md) for the +full pattern. + +### Flushing under fast teardown + +`OTelObserver.shutdown()` calls `provider.shutdown()` on the private +`TracerProvider`, which per OTel SDK contract flushes every +registered span processor. Under unusual teardown orderings — for +example, FastAPI's `TestClient` teardown that closes the event loop +before a `BatchSpanProcessor`'s export thread finishes — spans can +appear dropped. Two workarounds: + +- Call `observer._provider.force_flush(timeout_millis=...)` + explicitly before `shutdown()`. +- Use `SimpleSpanProcessor` instead of `BatchSpanProcessor` in + tests; it exports synchronously and is unaffected by teardown + timing. diff --git a/docs/examples/07-multimodal-prompt.md b/docs/examples/07-multimodal-prompt.md index 4838f5a..d9c0a11 100644 --- a/docs/examples/07-multimodal-prompt.md +++ b/docs/examples/07-multimodal-prompt.md @@ -41,14 +41,22 @@ The image source can be a URL (default) or a local file. Setting content blocks (`TextBlock` + `ImageBlock`), and the two image sources (`ImageSourceURL` for hosted images, `ImageSourceInline` with base64 data for local files). +- [`OTelObserver`](../concepts/observability.md) wired with a + console exporter so the prompt-context attributes the + `with_active_prompt*` scopes propagate (and the cross-vendor + `gen_ai.*` attributes) actually surface on the + `openarmature.llm.complete` spans printed to stdout. ## How to run ```bash -uv sync --group examples +uv sync --group examples --all-extras LLM_API_KEY=sk-... uv run python examples/07-multimodal-prompt/main.py ``` +(`--all-extras` pulls in `opentelemetry-sdk` for the OTel observer +that surfaces the prompt-context attributes.) 
+ To use a local file instead of the default URL: ```bash @@ -112,13 +120,23 @@ Lunar-mission image analysis (surface + equipment) use this to correlate LLM-call spans to specific template versions. - **`group: lunar-image-analysis`** is the group identifier from - `PromptGroup`. Inside the `with_active_prompt_group` scope, OTel - observers would stamp this on every LLM-call span fired by either - node. + `PromptGroup`. Inside the `with_active_prompt_group` scope, the + attached OTel observer stamps `openarmature.prompt.group_name` on + every LLM-call span. The console exporter prints those spans + alongside the human-readable output above — search the JSON blobs + for `openarmature.prompt.group_name` to confirm. - **Per-call prompt scope**. The inner `with_active_prompt(rendered)` block adds the per-call identifiers (name, version, label, template_hash, rendered_hash) on top of the group identifier. Two - layers, both stamped on the same span. + layers, both stamped on the same span (visible as + `openarmature.prompt.*` attributes on the + `openarmature.llm.complete` spans in the console output). +- **GenAI semantic conventions**. The same LLM spans also carry + the cross-vendor `gen_ai.*` attributes (`gen_ai.system`, + `gen_ai.request.model`, `gen_ai.response.model`, + `gen_ai.usage.{input,output}_tokens`, etc.) — Langfuse, Phoenix, + or Honeycomb's LLM lens would render the generation correctly + without any per-service attribute-mapping shim. - **Fallback path** isn't visible in a clean run because the primary backend serves both prompts. To observe it, point the primary at a directory missing one of the templates and re-run; diff --git a/docs/examples/index.md b/docs/examples/index.md index 4bdd2c8..c961b73 100644 --- a/docs/examples/index.md +++ b/docs/examples/index.md @@ -66,7 +66,7 @@ server expects. # Install the examples dep group. uv sync --group examples -# Demo 03 also wants the OTel SDK for its OTelObserver. 
+# Demos 03 and 07 also want the OTel SDK for their OTelObserver. uv sync --group examples --all-extras # Run any demo. diff --git a/docs/model-providers/authoring.md b/docs/model-providers/authoring.md index 28931cb..495214e 100644 --- a/docs/model-providers/authoring.md +++ b/docs/model-providers/authoring.md @@ -218,6 +218,69 @@ of: `openarmature.llm` to share the provider-agnostic boundary checks. - **Observability spans.** Opt-in `started`/`completed` events around the wire call so the OTel observer can build LLM spans. + Dispatch via the public `openarmature.observability.LLM_NAMESPACE` + sentinel and the typed `LlmEventPayload`. The sketch below is + what lives around the wire call inside `complete()`; the + `OpenAIProvider`'s `_make_llm_event` helper is the reference + implementation: + + ```python + import uuid + from typing import Any + + from openarmature.graph.events import NodeEvent + from openarmature.observability import LLM_NAMESPACE, LlmEventPayload + from openarmature.observability.correlation import current_dispatch + + + def dispatch_llm_event( + phase: str, + *, + call_id: str, + model: str, + **extra: Any, + ) -> None: + """Emit one half of the started/completed pair. The same + ``call_id`` MUST appear on both events so observers can match + them under concurrency.""" + dispatch = current_dispatch() + if dispatch is None: + return + dispatch(NodeEvent( + node_name="openarmature.llm.complete", + namespace=LLM_NAMESPACE, + step=-1, + phase=phase, + pre_state=LlmEventPayload( + call_id=call_id, + model=model, + genai_system="my-provider", + **extra, + ), + post_state=None, + error=None, + parent_states=(), + )) + + + # Inside Provider.complete(), the call_id is minted once per call: + call_id = str(uuid.uuid4()) + dispatch_llm_event("started", call_id=call_id, model="my-model") + # ... wire call here, populating finish_reason / usage / output ... 
+ dispatch_llm_event( + "completed", + call_id=call_id, + model="my-model", + finish_reason="stop", + ) + ``` + + Inline image bytes MUST be redacted in the provider's + serialization step before reaching the payload (see + [Observability — Inline image + redaction](../concepts/observability.md#inline-image-redaction-always-on)) + so custom observers consuming `LlmEventPayload` cannot leak raw + bytes. - **Lenient response parsing** under `finish_reason="error"`. Degraded responses surface what they can; tool-call arguments that fail to parse populate `arguments=None` instead of raising. diff --git a/examples/03-observer-hooks/main.py b/examples/03-observer-hooks/main.py index 146e387..8c98a01 100644 --- a/examples/03-observer-hooks/main.py +++ b/examples/03-observer-hooks/main.py @@ -45,6 +45,7 @@ from collections.abc import Mapping from typing import Annotated, Any +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor from pydantic import Field @@ -304,8 +305,25 @@ async def main() -> None: # provider here is PRIVATE to the observer; the global # TracerProvider is untouched, so this won't pollute any OTel # setup the surrounding application already has. + # + # ``resource`` stamps ``service.name`` on every emitted span so + # downstream backends (Honeycomb, Tempo, HyperDX, Langfuse) can + # filter by service. Setting it on the observer is the explicit + # path — the OTel SDK alternative (reading the + # ``OTEL_SERVICE_NAME`` / ``OTEL_RESOURCE_ATTRIBUTES`` env vars) + # has to be set BEFORE the observer constructs, which is easy + # to get wrong. + # + # LLM-call spans (one per ``provider.complete()``) carry the + # OpenTelemetry GenAI semantic conventions automatically: + # ``gen_ai.system``, ``gen_ai.request.model``, + # ``gen_ai.response.{model,id,finish_reasons}``, + # ``gen_ai.usage.{input,output}_tokens``. 
Cross-vendor backends + # (Langfuse, Phoenix, Honeycomb's LLM lens) render them + # correctly without a per-service attribute-mapping shim. otel_observer = OTelObserver( span_processor=SimpleSpanProcessor(ConsoleSpanExporter()), + resource=Resource.create({"service.name": "openarmature-demo-answers"}), ) graph = build_graph() diff --git a/examples/07-multimodal-prompt/main.py b/examples/07-multimodal-prompt/main.py index 697e559..d023f34 100644 --- a/examples/07-multimodal-prompt/main.py +++ b/examples/07-multimodal-prompt/main.py @@ -37,13 +37,16 @@ renders, no asymmetric "first call computes the second's input" shape. - ``with_active_prompt_group(group)`` propagates the group name via - ContextVar; OTel observers stamp ``openarmature.prompt.group_name`` - onto every LLM-call span fired inside. + ContextVar; the attached ``OTelObserver`` stamps + ``openarmature.prompt.group_name`` onto every LLM-call span fired + inside the block. Confirm in the console output — the two + ``openarmature.llm.complete`` spans both carry + ``openarmature.prompt.group_name = "lunar-image-analysis"``. - ``with_active_prompt(result)`` (inside the group's scope) propagates the per-call prompt identifiers — name, version, label, - template_hash, rendered_hash. The two layers compose: spans inside - the group see both the group identifier AND the per-call prompt - identifiers. + template_hash, rendered_hash. The two layers compose: each LLM + span carries the group identifier AND the per-call prompt + identifiers. The console output makes both visible. - The rendered text becomes a ``TextBlock`` inside a multimodal ``UserMessage``; the image is a sibling ``ImageBlock``. The image source is ``ImageSourceURL(url=...)`` by default; setting @@ -62,10 +65,12 @@ Run with: - uv sync --group examples + uv sync --group examples --all-extras cd examples/07-multimodal-prompt LLM_API_KEY=sk-... uv run python main.py LLM_API_KEY=sk-... 
IMAGE_PATH=./my-photo.jpg uv run python main.py + +(``--all-extras`` pulls in ``opentelemetry-sdk`` for the OTel observer.) """ from __future__ import annotations @@ -77,6 +82,8 @@ from pathlib import Path from typing import Annotated, Any +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor from pydantic import Field from openarmature.graph import ( @@ -95,6 +102,7 @@ TextBlock, UserMessage, ) +from openarmature.observability.otel import OTelObserver from openarmature.prompts import ( FilesystemPromptBackend, PromptGroup, @@ -329,13 +337,27 @@ async def main() -> None: members=[surface_member, equipment_member], ) + # Attach an OTel observer with a console exporter so the prompt- + # context attributes the ``with_active_prompt`` / ``_group`` + # blocks below propagate become visible. Every LLM-call span + # printed to stdout will carry ``openarmature.prompt.group_name`` + # (from the group context) plus the per-call + # ``openarmature.prompt.{name, version, label, template_hash, + # rendered_hash}`` attributes. A production setup would point a + # ``BatchSpanProcessor`` at a real OTLP endpoint instead. + otel_observer = OTelObserver( + span_processor=SimpleSpanProcessor(ConsoleSpanExporter()), + resource=Resource.create({"service.name": "openarmature-demo-multimodal"}), + ) + graph = build_graph() + graph.attach_observer(otel_observer) try: # ``with_active_prompt_group`` propagates the group_name to # observers for the duration of the invoke. Inside the nodes, # ``with_active_prompt`` adds the per-call prompt identifiers # alongside it — both layers stamp attributes on the same - # LLM-call span. + # LLM-call span. The OTel observer above captures both. 
     with with_active_prompt_group(group):
         final = await graph.invoke(
             AnalysisState(
diff --git a/openarmature-spec b/openarmature-spec
index 19b3e0c..efc0bff 160000
--- a/openarmature-spec
+++ b/openarmature-spec
@@ -1 +1 @@
-Subproject commit 19b3e0c81480c1c974e8520322ddf5ba7abc8286
+Subproject commit efc0bff19d9feca3e176db5e367beecef5c6dc69
diff --git a/pyproject.toml b/pyproject.toml
index 46931e1..f726300 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,7 +48,7 @@ Repository = "https://github.com/LunarCommand/openarmature-python"
 Specification = "https://github.com/LunarCommand/openarmature-spec"
 
 [tool.openarmature]
-spec_version = "0.16.1"
+spec_version = "0.17.0"
 
 [dependency-groups]
 dev = [
diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py
index 2bb00be..a145c88 100644
--- a/src/openarmature/__init__.py
+++ b/src/openarmature/__init__.py
@@ -1,4 +1,4 @@
 """OpenArmature: workflow framework for LLM pipelines and tool-calling agents."""
 
 __version__ = "0.7.0"
-__spec_version__ = "0.16.1"
+__spec_version__ = "0.17.0"
diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py
index e2e1745..8a08b27 100644
--- a/src/openarmature/llm/providers/openai.py
+++ b/src/openarmature/llm/providers/openai.py
@@ -52,13 +52,14 @@
 from pydantic import BaseModel, ValidationError
 
 from openarmature.graph.events import NodeEvent
-from openarmature.graph.state import State
 from openarmature.observability.correlation import (
     current_attempt_index,
     current_dispatch,
     current_fan_out_index,
     current_namespace_prefix,
 )
+from openarmature.observability.llm_event import LlmEventPayload
+from openarmature.prompts.context import current_prompt_group, current_prompt_result
 
 from ..errors import (
     LlmProviderError,
@@ -113,6 +114,7 @@ def __init__(
         transport: httpx.AsyncBaseTransport | None = None,
         timeout: float = 60.0,
         force_prompt_augmentation_fallback: bool = False,
+        genai_system: str = "openai",
     ) -> None:
         self.base_url = base_url.rstrip("/")
         self.model = model
@@ -122,6 +124,15 @@ def __init__(
         # servers (some vLLM/LM Studio/llama.cpp versions) that reject
         # or silently ignore response_format.
         self._force_prompt_augmentation_fallback = force_prompt_augmentation_fallback
+        # ``genai_system`` surfaces as the ``gen_ai.system`` span attribute
+        # per observability §5.5.3. The OpenAI Chat Completions wire format
+        # is the de facto standard for vLLM, LM Studio, llama.cpp,
+        # sglang, etc. — callers using this provider against a non-OpenAI
+        # endpoint pass the appropriate identifier (e.g. ``"vllm"``).
+        # No base_url-sniffing happens: the same host:port could be any of
+        # those servers, and a wrong inference is worse than the explicit
+        # opt-in.
+        self._genai_system = genai_system
         self._headers: dict[str, str] = {"Content-Type": "application/json"}
         if api_key is not None:
             self._headers["Authorization"] = f"Bearer {api_key}"
@@ -277,14 +288,58 @@ async def complete(
         # constant ``("openarmature.llm.complete",)`` sentinel.
         dispatch = current_dispatch()
         call_id = str(uuid.uuid4())
+        # Capture prompt context AT DISPATCH TIME (in the node task's
+        # context). The delivery worker (asyncio.create_task'd at
+        # ``invoke()`` entry, before any node body runs) has a stale
+        # ContextVar snapshot — reading ``current_prompt_result()``
+        # from inside the observer in the worker task returns ``None``
+        # even when a node body opened a ``with_active_prompt`` block.
+        # Snapshot here; the observer reads from the event payload.
+        active_prompt = current_prompt_result()
+        active_prompt_group = current_prompt_group()
+        # Payload data the §5.5.1 / §5.5.2 / §5.5.3 attributes are
+        # sourced from. Image redaction (per §5.5.5) happens inside
+        # ``_serialize_messages_for_payload`` — image bytes never
+        # leave the provider in event form. ``input_messages`` mirrors
+        # the messages list the caller supplied; the wire-side body
+        # may be augmented (schema directive on the fallback path),
+        # but the OBSERVED messages are the spec-§3 logical inputs.
+        serialized_messages = _serialize_messages_for_payload(messages)
+        request_params = _request_params_from_config(config)
+        request_extras = _request_extras_from_config(config)
         if dispatch is not None:
-            dispatch(_make_llm_event("started", call_id=call_id, model=self.model))
+            dispatch(
+                _make_llm_event(
+                    "started",
+                    call_id=call_id,
+                    model=self.model,
+                    genai_system=self._genai_system,
+                    input_messages=serialized_messages,
+                    request_params=request_params,
+                    request_extras=request_extras,
+                    active_prompt=active_prompt,
+                    active_prompt_group=active_prompt_group,
+                )
+            )
 
         try:
             response = await self._do_complete(body, schema_dict, schema_class)
         except Exception as exc:
             if dispatch is not None:
-                dispatch(_make_llm_event("completed", call_id=call_id, model=self.model, error=exc))
+                dispatch(
+                    _make_llm_event(
+                        "completed",
+                        call_id=call_id,
+                        model=self.model,
+                        genai_system=self._genai_system,
+                        error=exc,
+                        input_messages=serialized_messages,
+                        request_params=request_params,
+                        request_extras=request_extras,
+                        active_prompt=active_prompt,
+                        active_prompt_group=active_prompt_group,
+                    )
+                )
             raise
 
         if dispatch is not None:
@@ -293,8 +348,17 @@ async def complete(
                     "completed",
                     call_id=call_id,
                     model=self.model,
+                    genai_system=self._genai_system,
                     finish_reason=response.finish_reason,
                     usage=response.usage,
+                    input_messages=serialized_messages,
+                    output_content=response.message.content or None,
+                    request_params=request_params,
+                    request_extras=request_extras,
+                    response_id=response.response_id,
+                    response_model=response.response_model,
+                    active_prompt=active_prompt,
+                    active_prompt_group=active_prompt_group,
                 )
             )
         return response
@@ -451,12 +515,23 @@ def _parse_response(
     if schema_dict is not None and finish_reason_typed != "tool_calls":
         parsed = _parse_and_validate(assistant_msg.content, schema_dict, schema_class)
 
+    # gen_ai.response.id / gen_ai.response.model semconv (spec
+    # §5.5.3) read these off the Response. The wire fields are
+    # optional — providers MAY omit either or both. ``None`` when
+    # absent or not a string.
+    response_id_raw = payload.get("id")
+    response_id: str | None = response_id_raw if isinstance(response_id_raw, str) else None
+    response_model_raw = payload.get("model")
+    response_model: str | None = response_model_raw if isinstance(response_model_raw, str) else None
+
     return Response(
         message=assistant_msg,
         finish_reason=finish_reason_typed,
         usage=usage,
         raw=payload,
         parsed=parsed,
+        response_id=response_id,
+        response_model=response_model,
     )
 
 
@@ -929,80 +1004,118 @@ def _looks_like_model_not_loaded(message: object) -> bool:
 # ---------------------------------------------------------------------------
 
 
-class _LlmEventState(State):
-    """Typed payload for LLM-provider span events. Subclasses
-    :class:`openarmature.graph.state.State` so the
-    ``NodeEvent.pre_state: State`` contract holds — observers
-    calling ``event.pre_state.model_dump()`` (or any other
-    Pydantic-on-State method) work without the raw-dict overload
-    that previously violated the schema.
-
-    Backend mappings (the OTel observer in this repo, future
-    Langfuse / Datadog adapters) recognize the
-    ``("openarmature.llm.complete",)`` namespace sentinel and read
-    these fields directly via attribute access.
-
-    ``call_id`` is the per-call disambiguator: a UUIDv4 minted in
-    ``OpenAIProvider.complete`` and shared between the started /
-    completed event pair. Backend observers key their in-flight
-    LLM-span maps by it so concurrent ``complete()`` calls (e.g.,
-    fan-out instances each calling the provider) don't collide on
-    a single sentinel-namespace key.
-
-    ``calling_namespace_prefix``, ``calling_attempt_index``, and
-    ``calling_fan_out_index`` carry the calling node's identity so
-    the OTel observer can resolve the §5.5 "parent under calling
-    node" contract correctly under concurrent fan-out and retry.
-    Populated from the engine's ContextVars (set in
-    ``_step_*_node`` around node-body execution); fall back to
-    sentinel defaults (empty tuple, 0, ``None``) when the LLM
-    provider is called outside any node body.
-    """
+# Inline image sources are redacted in this step per observability
+# §5.5.5: ImageSourceInline → {"type": "inline_redacted",
+# "byte_count": N} where N is the byte length of the original base64
+# string. media_type stays at the image-block level per llm-provider
+# §3.1.2; detail is preserved when present.
+#
+# Redaction lives here (provider-side) rather than observer-side so
+# inline image bytes never leave the provider in event form —
+# defense-in-depth that applies to every observer consuming the
+# payload, not just OA's own. URL-form images pass through unchanged.
+def _serialize_messages_for_payload(messages: Sequence[Message]) -> list[dict[str, Any]]:
+    """Render a list of typed :class:`Message` instances into the
+    plain-dict shape carried on ``LlmEventPayload.input_messages``."""
+    out: list[dict[str, Any]] = []
+    for msg in messages:
+        if isinstance(msg, SystemMessage):
+            out.append({"role": "system", "content": msg.content})
+        elif isinstance(msg, UserMessage):
+            if isinstance(msg.content, str):
+                out.append({"role": "user", "content": msg.content})
+            else:
+                rendered_blocks: list[dict[str, Any]] = []
+                for block in msg.content:
+                    if isinstance(block, TextBlock):
+                        rendered_blocks.append({"type": "text", "text": block.text})
+                    else:  # ImageBlock
+                        # The ImageBlock validator already guarantees
+                        # media_type when source is inline.
+                        if isinstance(block.source, ImageSourceInline):
+                            byte_count = len(block.source.base64_data)
+                            source_record: dict[str, Any] = {
+                                "type": "inline_redacted",
+                                "byte_count": byte_count,
+                            }
+                        else:
+                            source_record = {"type": "url", "url": block.source.url}
+                        image_record: dict[str, Any] = {
+                            "type": "image",
+                            "source": source_record,
+                        }
+                        if block.media_type is not None:
+                            image_record["media_type"] = block.media_type
+                        if block.detail is not None:
+                            image_record["detail"] = block.detail
+                        rendered_blocks.append(image_record)
+                out.append({"role": "user", "content": rendered_blocks})
+        elif isinstance(msg, AssistantMessage):
+            entry: dict[str, Any] = {"role": "assistant", "content": msg.content}
+            if msg.tool_calls:
+                entry["tool_calls"] = [
+                    {"id": tc.id, "name": tc.name, "arguments": tc.arguments} for tc in msg.tool_calls
+                ]
+            out.append(entry)
+        else:  # ToolMessage
+            out.append({"role": "tool", "content": msg.content, "tool_call_id": msg.tool_call_id})
+    return out
 
-    call_id: str
-    model: str
-    finish_reason: str | None = None
-    prompt_tokens: int | None = None
-    completion_tokens: int | None = None
-    total_tokens: int | None = None
-    # On error responses the provider caller doesn't have a
-    # graph-engine §4 ``RuntimeGraphError`` to put in
-    # ``NodeEvent.error``, so we surface the failure detail through
-    # these fields instead. ``error_category`` is the canonical §7
-    # llm-provider category (``provider_unavailable``, etc.) when
-    # the failed exception carries one.
-    error_type: str | None = None
-    error_message: str | None = None
-    error_category: str | None = None
-    # Calling-node identity captured at dispatch time. The OTel
-    # observer reads these to look up the calling node's span in
-    # its (now-invocation_id-scoped) ``_open_spans`` map without relying on
-    # the OTel current-span context (which under concurrent fan-out
-    # can yield a sibling instance's span).
-    calling_namespace_prefix: tuple[str, ...] = ()
-    calling_attempt_index: int = 0
-    calling_fan_out_index: int | None = None
 
+# Only set fields appear in the result. Absence is meaningful per
+# observability §5.5.2: "the field was not supplied for this call"
+# — distinct from "supplied with a zero value."
+def _request_params_from_config(config: RuntimeConfig | None) -> dict[str, Any]:
+    """Extract the cross-vendor request parameters from a
+    ``RuntimeConfig`` for emission as ``gen_ai.request.*`` attributes."""
+    if config is None:
+        return {}
+    out: dict[str, Any] = {}
+    if config.temperature is not None:
+        out["temperature"] = config.temperature
+    if config.max_tokens is not None:
+        out["max_tokens"] = config.max_tokens
+    if config.top_p is not None:
+        out["top_p"] = config.top_p
+    if config.seed is not None:
+        out["seed"] = config.seed
+    return out
+
 
+def _request_extras_from_config(config: RuntimeConfig | None) -> dict[str, Any]:
+    """Return the ``RuntimeConfig`` extras pass-through bag as a plain
+    dict; empty when no extras are set or when ``config`` is None."""
+    if config is None:
+        return {}
+    return dict(config.model_extra or {})
+
+
+# call_id MUST be the same string on the started/completed pair so
+# the observer can match them under concurrency. The OTel observer
+# (or any backend mapping) recognises the sentinel node_name +
+# namespace and emits an LLM-specific span instead of a node span;
+# backend-specific attribute extraction reads payload fields from
+# pre_state directly.
 def _make_llm_event(
     phase: Literal["started", "completed"],
     *,
     call_id: str,
     model: str,
+    genai_system: str,
     finish_reason: FinishReason | None = None,
     usage: Usage | None = None,
     error: BaseException | None = None,
+    input_messages: list[dict[str, Any]] | None = None,
+    output_content: str | None = None,
+    request_params: dict[str, Any] | None = None,
+    request_extras: dict[str, Any] | None = None,
+    response_id: str | None = None,
+    response_model: str | None = None,
+    active_prompt: Any = None,
+    active_prompt_group: Any = None,
 ) -> NodeEvent:
-    """Build a NodeEvent-shaped record for the engine's delivery
-    queue. The OTel observer (or any backend mapping) recognises the
-    sentinel ``node_name`` and ``namespace`` and emits an LLM-specific
-    span instead of a node span. Backend-specific attribute extraction
-    reads ``model``, ``finish_reason``, and ``usage`` from
-    ``pre_state`` directly via attribute access.
-
-    ``call_id`` MUST be the same string on the started/completed
-    pair so the observer can match them under concurrency.
-    """
+    """Build a ``NodeEvent``-shaped record for the engine's delivery
+    queue, populated as an ``openarmature.llm.complete`` event."""
     error_type: str | None = None
     error_message: str | None = None
     error_category: str | None = None
@@ -1012,7 +1125,7 @@ def _make_llm_event(
         category = getattr(error, "category", None)
         if isinstance(category, str):
             error_category = category
-    payload = _LlmEventState(
+    payload = LlmEventPayload(
         call_id=call_id,
         model=model,
         finish_reason=finish_reason,
@@ -1025,6 +1138,15 @@ def _make_llm_event(
         calling_namespace_prefix=current_namespace_prefix(),
         calling_attempt_index=current_attempt_index(),
         calling_fan_out_index=current_fan_out_index(),
+        active_prompt=active_prompt,
+        active_prompt_group=active_prompt_group,
+        input_messages=input_messages,
+        output_content=output_content,
+        request_params=request_params,
+        request_extras=request_extras,
+        response_id=response_id,
+        response_model=response_model,
+        genai_system=genai_system,
     )
     return NodeEvent(
         node_name="openarmature.llm.complete",
diff --git a/src/openarmature/llm/response.py b/src/openarmature/llm/response.py
index 89478b2..a256cec 100644
--- a/src/openarmature/llm/response.py
+++ b/src/openarmature/llm/response.py
@@ -93,6 +93,18 @@ class Response(BaseModel):
     usage: Usage
     raw: dict[str, Any]
     parsed: ParsedValue = None
+    # The provider's response id (e.g., OpenAI's ``chatcmpl-…``).
+    # Surface as a typed field rather than asking callers to reach into
+    # ``raw["id"]``; mirrors the gen_ai.response.id semconv attribute
+    # the observability mapping (spec §5.5.3) emits onto the LLM span.
+    # ``None`` when the provider didn't return one.
+    response_id: str | None = None
+    # The model identifier the provider returned (the ``model`` field
+    # on the response body). May be more specific than the bound
+    # request model — e.g., bound ``gpt-4o``, response carries
+    # ``gpt-4o-2024-08-06``. Mirrors gen_ai.response.model per §5.5.3.
+    # ``None`` when the provider didn't return one.
+    response_model: str | None = None
 
 
 class RuntimeConfig(BaseModel):
diff --git a/src/openarmature/observability/__init__.py b/src/openarmature/observability/__init__.py
index 03131d5..7a751a1 100644
--- a/src/openarmature/observability/__init__.py
+++ b/src/openarmature/observability/__init__.py
@@ -33,7 +33,17 @@
     current_namespace_prefix,
 )
 
+# v0.17.0 (proposal 0024 / friction-roundup #9): publish the LLM event
+# contract so third-party Provider implementations and custom observers
+# can interoperate against a stable shape. Both names live in
+# ``observability.llm_event`` — backend-agnostic — so importing the
+# core observability package never drags the OTel backend (and its
+# ``opentelemetry-sdk`` dependency) along.
+from .llm_event import LLM_NAMESPACE, LlmEventPayload
+
 __all__ = [
+    "LLM_NAMESPACE",
+    "LlmEventPayload",
     "current_active_observers",
     "current_attempt_index",
     "current_correlation_id",
diff --git a/src/openarmature/observability/llm_event.py b/src/openarmature/observability/llm_event.py
new file mode 100644
index 0000000..aef4b17
--- /dev/null
+++ b/src/openarmature/observability/llm_event.py
@@ -0,0 +1,117 @@
+# Backend mappings (the OTel observer in this repo, future Langfuse /
+# Datadog adapters) recognize the LLM_NAMESPACE sentinel and read these
+# fields directly via attribute access.
+#
+# call_id is the per-call disambiguator: a UUIDv4 minted by the
+# provider and shared across the started / completed event pair.
+# Backend observers key their in-flight LLM-span maps by it so
+# concurrent complete() calls (e.g., fan-out instances each calling
+# the provider) don't collide on the single sentinel-namespace key.
+#
+# calling_namespace_prefix / calling_attempt_index /
+# calling_fan_out_index carry the calling node's identity so the OTel
+# observer can resolve §5.5 "parent under calling node" correctly
+# under concurrent fan-out and retry. Populated from the engine's
+# ContextVars at dispatch time; sentinel defaults when the provider
+# is called outside any node body.
+#
+# active_prompt / active_prompt_group are dispatch-time snapshots of
+# the prompts-context ContextVars (per friction-roundup #3). The
+# delivery-worker task cannot read these ContextVars — its Context is
+# snapshotted at invoke()-entry, before any node body opens a
+# with_active_prompt block — so the snapshot has to travel on the
+# payload.
+#
+# input_messages / output_content / request_params / request_extras
+# source the §5.5.1 + §5.5.2 attributes. input_messages is the message
+# list serialized to §3 plain-dict shape with ImageSourceInline already
+# redacted (per §5.5.5 — inline bytes never leave the provider in
+# event form, regardless of any observer-side flag). response_id /
+# response_model source the §5.5.3 gen_ai.response.{id,model}
+# attributes. genai_system sources gen_ai.system per §5.5.3 (default
+# "openai"; overridable on the OpenAI-compatible provider).
+
+"""LLM event payload exchanged between providers and observability backends."""
+
+from __future__ import annotations
+
+from typing import Any
+
+from pydantic import BaseModel, ConfigDict
+
+# Sentinel namespace the LLM provider emits to signal "this is an LLM
+# event, not a regular node event." Backend mappings (the OTel observer
+# in this repo, future Langfuse / Datadog adapters) recognise this
+# value on ``NodeEvent.namespace`` and route to their LLM-specific
+# span path. Lives here rather than under ``otel/observer.py`` so the
+# core observability package doesn't pull the OTel backend into its
+# import chain — anyone consuming ``LlmEventPayload`` from a custom
+# provider needs the namespace value too, and shouldn't have to
+# install the ``[otel]`` extra just for the constant.
+LLM_NAMESPACE: tuple[str, ...] = ("openarmature.llm.complete",)
+
+
+# LlmEventPayload uses plain Pydantic BaseModel (not openarmature.graph.State)
+# so importing it doesn't transitively load the entire graph package.
+# That lets providers in openarmature.llm import this type cleanly even
+# though graph.middleware.retry imports from openarmature.llm.errors —
+# subclassing State would create a circular load order. NodeEvent.pre_state
+# is typed Any (per the comment in graph.events) so the State-subclass
+# constraint isn't load-bearing here.
+class LlmEventPayload(BaseModel):
+    """Typed payload carried on ``NodeEvent.pre_state`` for the
+    ``openarmature.llm.complete`` event pair an LLM provider emits
+    around each ``complete()`` call.
+
+    Observers subscribing to events with namespace
+    :data:`openarmature.observability.LLM_NAMESPACE` read attributes
+    directly off this payload. The OpenAI provider populates every
+    field; third-party providers populate the subset they support.
+    """
+
+    # Extra fields rejected at construction; instance frozen so
+    # observers can't mutate payload data after dispatch.
+    model_config = ConfigDict(frozen=True, extra="forbid")
+
+    call_id: str
+    model: str
+    finish_reason: str | None = None
+    prompt_tokens: int | None = None
+    completion_tokens: int | None = None
+    total_tokens: int | None = None
+    # error_category is the canonical llm-provider §7 category
+    # (provider_unavailable, etc.) when the failed exception carried
+    # one — the provider caller doesn't have a graph-engine §4
+    # RuntimeGraphError to attach to NodeEvent.error, so failure
+    # detail surfaces through these fields instead.
+    error_type: str | None = None
+    error_message: str | None = None
+    error_category: str | None = None
+    # Calling-node identity captured at dispatch time. The OTel
+    # observer reads these to look up the calling node's span in its
+    # invocation_id-scoped _open_spans map without relying on the
+    # OTel current-span context (which under concurrent fan-out can
+    # yield a sibling instance's span).
+    calling_namespace_prefix: tuple[str, ...] = ()
+    calling_attempt_index: int = 0
+    calling_fan_out_index: int | None = None
+    # Prompt-context snapshot captured at dispatch time. ``Any``
+    # because the prompts package imports State indirectly; the typed
+    # shapes are PromptResult / PromptGroup from openarmature.prompts.
+    # Observers cast back at the read site.
+    active_prompt: Any = None
+    active_prompt_group: Any = None
+    # Payload + request-config carrier. input_messages is already
+    # image-redacted by the provider before reaching this struct;
+    # request_params carries only the gen_ai.request.* fields;
+    # request_extras carries the RuntimeConfig extras pass-through bag.
+    input_messages: list[dict[str, Any]] | None = None
+    output_content: str | None = None
+    request_params: dict[str, Any] | None = None
+    request_extras: dict[str, Any] | None = None
+    response_id: str | None = None
+    response_model: str | None = None
+    genai_system: str = "openai"
+
+
+__all__ = ["LLM_NAMESPACE", "LlmEventPayload"]
diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py
index dd6b4ae..ee459b0 100644
--- a/src/openarmature/observability/otel/observer.py
+++ b/src/openarmature/observability/otel/observer.py
@@ -74,11 +74,14 @@
 
 from __future__ import annotations
 
+import json
+from collections.abc import Callable, Sequence
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING, Any, cast
 
 from opentelemetry import context as otel_context
 from opentelemetry import trace as otel_trace
+from opentelemetry.sdk.resources import Resource
 from opentelemetry.sdk.trace import SpanProcessor, TracerProvider
 from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
 from opentelemetry.trace import (
@@ -93,7 +96,7 @@
 )
 from opentelemetry.trace.propagation import set_span_in_context
 
-from openarmature.prompts.context import current_prompt_group, current_prompt_result
+from openarmature.observability.llm_event import LLM_NAMESPACE, LlmEventPayload
 
 if TYPE_CHECKING:
     from openarmature.graph.events import NodeEvent
@@ -105,9 +108,35 @@
 _StackKey = tuple[tuple[str, ...], int, int | None]
 
 
-# Sentinel namespace the LLM provider emits to signal "this is an LLM
-# event, not a regular node event."
-_LLM_NAMESPACE = ("openarmature.llm.complete",)
+# Re-export the LLM-event namespace sentinel under the same name the
+# observer module has historically used. The canonical home is
+# ``openarmature.observability.llm_event`` (backend-agnostic — pulling
+# the constant from there avoids forcing core-observability imports to
+# load the OTel backend). ``_LLM_NAMESPACE`` is retained for backwards
+# compatibility within this module.
+_LLM_NAMESPACE = LLM_NAMESPACE
+
+
+# §5.5.5 truncation marker. The leading character is U+2026 HORIZONTAL
+# ELLIPSIS (3 bytes UTF-8); the marker is a fixed UTF-8 string and is
+# appended as a whole unit so no boundary backtracking is needed past
+# the prefix cut.
+_TRUNCATION_MARKER_TEMPLATE = "…[truncated, {m} bytes total]"
+
+
+# §5.5.5 minimum-cap rule: any payload byte cap configuration below
+# 256 bytes is rejected at observer construction time. Rationale (spec
+# verbatim): "256 bytes leaves room for the worst-case marker (~36
+# bytes) plus a diagnostically useful payload preview; caps below this
+# would produce attributes that are almost entirely marker with little
+# or no preview value."
+_PAYLOAD_MIN_BYTES = 256
+
+
+# §5.5 default truncation cap — 64 KiB per the spec's preferred
+# default. Implementations MAY configure; ours sits on a constructor
+# field.
+_PAYLOAD_DEFAULT_BYTES = 65536
 
 
 def _read_spec_version() -> str:
@@ -135,6 +164,53 @@ class _OpenSpan:
     span: Span
 
 
+# Sorted object keys, no insignificant whitespace, UTF-8 output (per
+# observability §5.5.1 / §5.5.6). Within-impl determinism for identical
+# inputs is required; cross-impl bytewise stability is NOT required by
+# v0.17.0 — conformance fixtures use parse-shape assertions, not
+# bytewise equality.
+def _serialize_for_attribute(value: Any) -> str:
+    """JSON-encode ``value`` for emission as an OTel string attribute."""
+    return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
+
+
+# §5.5.5 truncation algorithm:
+#   1. Compute M, the pre-truncation byte length.
+#   2. Format the marker with M substituted; compute L_marker.
+#   3. Compute target prefix size N = cap_bytes - L_marker.
+#   4. Backtrack to a UTF-8 code-point boundary ≤ N (avoid splitting
+#      multi-byte sequences — CJK, emoji, combining marks).
+#   5. Emit first N' bytes + marker.
+# The resulting string is at most cap_bytes UTF-8 bytes (may be strictly
+# less due to step-4 backtracking). The marker leading char is U+2026
+# HORIZONTAL ELLIPSIS (3 bytes UTF-8); appended as a whole unit so no
+# further boundary concerns beyond step 4.
+def _truncate_for_attribute(serialized: str, cap_bytes: int) -> str:
+    """Truncate ``serialized`` to fit within ``cap_bytes`` UTF-8 bytes,
+    returning the original string unchanged if it already fits."""
+    encoded = serialized.encode("utf-8")
+    full_length = len(encoded)
+    if full_length <= cap_bytes:
+        return serialized
+    marker = _TRUNCATION_MARKER_TEMPLATE.format(m=full_length)
+    marker_bytes = marker.encode("utf-8")
+    target = cap_bytes - len(marker_bytes)
+    if target <= 0:
+        # Cap is smaller than the marker itself — the __post_init__
+        # validation guards against this (256-byte minimum allows a
+        # ~36-byte marker plus preview), but be defensive.
+        return marker
+    # UTF-8 lead-byte detection: a byte is a continuation byte when its
+    # top two bits are 10. Backtrack from ``target`` until we land on a
+    # lead byte (or hit 0). This is the cheapest correct way to find
+    # the largest code-point boundary ≤ target without round-tripping
+    # through ``str``.
+    boundary = target
+    while boundary > 0 and (encoded[boundary] & 0b1100_0000) == 0b1000_0000:
+        boundary -= 1
+    return encoded[:boundary].decode("utf-8", errors="strict") + marker
+
+
 @dataclass
 class _InvState:
     """Per-invocation span state. One instance per concurrent
@@ -181,30 +257,72 @@ class OTelObserver:
 
     Constructor knobs:
 
-    - ``detached_subgraphs``: set of subgraph wrapper node names
-      that should run in their own trace (§4.4). One detached trace
-      per such subgraph.
-    - ``detached_fan_outs``: set of fan-out node names whose
-      INSTANCES each get their own trace. One detached trace per
-      instance.
-    - ``disable_llm_spans``: when ``True`` the observer skips the
-      §5.5 LLM provider span. All other spans (node, subgraph,
-      fan-out, etc.) emit normally. Useful when an external
-      auto-instrumentation library (OpenInference, etc.) is the
-      canonical source of LLM spans.
+    - ``span_processor``: a single :class:`SpanProcessor` or a sequence
+      of them. Every processor is registered on the private
+      :class:`TracerProvider`; spans flow to each.
+    - ``resource``: optional :class:`Resource` passed to the private
+      :class:`TracerProvider`. Sets ``service.name`` / ``service.version``
+      / etc. without relying on environment variables.
+    - ``detached_subgraphs``: set of subgraph wrapper node names that
+      run in their own trace. One detached trace per such subgraph.
+    - ``detached_fan_outs``: set of fan-out node names whose instances
+      each get their own trace. One detached trace per instance.
+    - ``disable_llm_spans``: when ``True`` the observer skips the LLM
+      provider span; all other spans emit normally.
+    - ``disable_llm_payload``: default ``True``. Gates the LLM input/
+      output payload attributes (``openarmature.llm.input.messages``,
+      ``openarmature.llm.output.content``,
+      ``openarmature.llm.request.extras``).
+    - ``disable_genai_semconv``: default ``False``. Gates the
+      ``gen_ai.*`` attribute set on the LLM span.
+    - ``payload_max_bytes``: per-attribute byte cap for the LLM payload
+      attributes. Default 64 KiB; minimum 256 bytes (rejected at
+      construction time below that).
+    - ``attribute_enrichers``: optional sequence of callables run just
+      before the observer ends each span. Each receives the live
+      :class:`Span` plus the :class:`NodeEvent` that triggered the
+      close (or ``None`` on synthetic close sites). Exceptions are
+      caught and warned; never propagated.
     - ``spec_version``: string surfaced as
       ``openarmature.graph.spec_version`` on the invocation span.
 
-    Safe to share across concurrent invocations and across
-    resumes of the same correlation_id; every internal span map is
-    outer-keyed by ``invocation_id``, and parent resolution stays
-    within a single event handler's scope.
+    Safe to share across concurrent invocations and across resumes of
+    the same correlation_id; every internal span map is outer-keyed by
+    ``invocation_id``, and parent resolution stays within a single
+    event handler's scope.
     """
 
-    span_processor: SpanProcessor
+    # span_processor accepts a single processor or a sequence per
+    # observability friction-roundup #5. The dataclass field type is
+    # the union; ``__post_init__`` normalizes to a tuple internally.
+    span_processor: SpanProcessor | Sequence[SpanProcessor]
+    # Optional Resource per friction-roundup #4. Default behavior
+    # (resource=None) falls through to OTel's default Resource (reads
+    # OTEL_SERVICE_NAME / OTEL_RESOURCE_ATTRIBUTES env vars at
+    # construction time).
+    resource: Resource | None = None
     detached_subgraphs: frozenset[str] = field(default_factory=_empty_str_frozenset)
     detached_fan_outs: frozenset[str] = field(default_factory=_empty_str_frozenset)
     disable_llm_spans: bool = False
+    # disable_llm_payload defaults to True per observability §5.5.4.
+    # Default-off because the payload may contain PII the user hasn't
+    # audited — opting in is a deliberate second choice. Naming inverts
+    # the natural reading ("default-off via True") to keep symmetry
+    # with the existing disable_llm_spans parameter family.
+    disable_llm_payload: bool = True
+    # disable_genai_semconv defaults to False (emit) per §5.5.4. The
+    # value proposition of installing the OTel observer is that
+    # LLM-aware backends (Langfuse, Phoenix, Honeycomb's LLM lens)
+    # render correctly out of the box, which keys off gen_ai.*.
+    disable_genai_semconv: bool = False
+    # Per-attribute byte cap for the §5.5.1 payload attributes. Default
+    # 64 KiB; minimum 256 bytes (§5.5.5), validated in __post_init__.
+    payload_max_bytes: int = _PAYLOAD_DEFAULT_BYTES
+    # attribute_enrichers per friction-roundup #7p2 — runs before every
+    # span.end() the observer issues. NodeEvent is None on synthetic
+    # close sites (subgraph dispatch, detached root, fan-out instance,
+    # invocation span, shutdown drain).
+    attribute_enrichers: Sequence[Callable[[Span, NodeEvent | None], None]] = ()
     # Read from the package's ``__spec_version__`` (one of the three
     # places the spec version is pinned per CLAUDE.md). Bumping the
     # spec submodule + the two version fields automatically updates
@@ -226,12 +344,61 @@ class OTelObserver:
     )
 
     def __post_init__(self) -> None:
+        # §5.5.5 minimum-cap validation. Reject misconfigurations at
+        # construction time rather than emitting silently broken
+        # attributes.
+        if self.payload_max_bytes < _PAYLOAD_MIN_BYTES:
+            raise ValueError(
+                f"payload_max_bytes={self.payload_max_bytes} below the spec §5.5.5 "
+                f"minimum cap of {_PAYLOAD_MIN_BYTES} bytes"
+            )
         # Private provider per spec §6 TracerProvider isolation —
-        # MUST NOT be registered globally.
-        self._provider = TracerProvider()
-        self._provider.add_span_processor(self.span_processor)
+        # MUST NOT be registered globally. Resource set on the
+        # provider when supplied; otherwise OTel's default Resource
+        # (which reads OTEL_SERVICE_NAME / OTEL_RESOURCE_ATTRIBUTES
+        # env vars at construction time) applies.
+        if self.resource is not None:
+            self._provider = TracerProvider(resource=self.resource)
+        else:
+            self._provider = TracerProvider()
+        # Multi-processor: a sequence registers every entry; a single
+        # processor wraps in a 1-tuple. ``SpanProcessor`` is itself a
+        # class so we can't use isinstance against ``Sequence`` first
+        # (Sequence matches strings too); compare against the explicit
+        # union arms.
+        if isinstance(self.span_processor, SpanProcessor):
+            processors: Sequence[SpanProcessor] = (self.span_processor,)
+        else:
+            processors = tuple(self.span_processor)
+        for proc in processors:
+            self._provider.add_span_processor(proc)
         self._tracer = self._provider.get_tracer("openarmature")
 
+    # ------------------------------------------------------------------
+    # Enricher invocation (friction-roundup #7p2)
+    # ------------------------------------------------------------------
+
+    # Exception isolation mirrors the observer-error-isolation contract
+    # in ``openarmature.graph.observer`` — enricher raises are caught +
+    # warned, never propagated to the dispatch worker.
+    # ``event`` is None on synthetic close sites (subgraph dispatch,
+    # detached root, fan-out instance, invocation span, orphan drain).
+    def _run_enrichers(self, span: Span, event: NodeEvent | None) -> None:
+        """Invoke configured enrichers against ``span`` before
+        ``span.end()`` is called."""
+        if not self.attribute_enrichers:
+            return
+        import warnings
+
+        for enricher in self.attribute_enrichers:
+            try:
+                enricher(span, event)
+            except Exception as e:  # noqa: BLE001
+                warnings.warn(
+                    f"attribute_enricher raised {type(e).__name__}: {e}",
+                    stacklevel=2,
+                )
+
     # ------------------------------------------------------------------
     # Per-invocation state lookup
     # ------------------------------------------------------------------
@@ -423,6 +590,7 @@ def _handle_completed(self, event: NodeEvent) -> None:
                 inv_open.span.set_status(Status(StatusCode.ERROR, description=event.error.category))
         else:
             span.set_status(Status(StatusCode.OK))
+        self._run_enrichers(span, event)
         span.end()
         # If this was a detached root prefix, drop the root entry so a
         # subsequent re-entry mints a fresh trace.
@@ -485,6 +653,7 @@ def _emit_checkpoint_migrate_span(self, event: NodeEvent) -> None:
             attributes=attrs,
         )
         span.set_status(Status(StatusCode.OK))
+        self._run_enrichers(span, event)
         span.end()
 
     def _emit_checkpoint_save_span(self, event: NodeEvent) -> None:
@@ -517,22 +686,38 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None:
             attributes=attrs,
         )
         span.set_status(Status(StatusCode.OK))
+        self._run_enrichers(span, event)
         span.end()
 
+    # LLM provider span per observability §5.5 — parented to the
+    # calling node's span via the calling-node identity carried on
+    # the LlmEventPayload (namespace_prefix + attempt_index +
+    # fan_out_index). Lookup hits the per-invocation_id open_spans
+    # so concurrent fan-out instances each find their own calling
+    # node, not a sibling's.
+    #
+    # v0.17.0 attribute set (proposal 0024):
+    #   - Baseline openarmature.llm.* attributes (preserved)
+    #   - §5.5.1 payload (input.messages, output.content,
+    #     request.extras) gated by disable_llm_payload
+    #   - §5.5.2 gen_ai.request.* request params
+    #   - §5.5.3 gen_ai.* response semconv set
+    #   - §5.5.4 opt-out flags
+    #   - §5.5.5 truncation contract on payload attributes
+    #
+    # Prompt-identity attributes come from the LlmEventPayload
+    # active_prompt / active_prompt_group snapshots taken at dispatch
+    # time — NOT the ContextVar. The dispatch worker's task-local
+    # Context doesn't see node-body ContextVar writes.
     def _handle_llm_event(self, event: NodeEvent) -> None:
-        """LLM provider span per spec §5.5 — parented to the calling
-        node's span via the calling-node identity carried on the
-        ``_LlmEventState`` payload (namespace_prefix + attempt_index
-        + fan_out_index). Lookup hits the per-invocation_id
-        ``open_spans`` so concurrent fan-out instances each find
-        their own calling node, not a sibling's."""
-        from openarmature.llm.providers.openai import _LlmEventState
+        """Build and close the ``openarmature.llm.complete`` span for an
+        LLM provider event pair."""
         from openarmature.observability.correlation import (
             current_correlation_id,
             current_invocation_id,
         )
 
-        if not isinstance(event.pre_state, _LlmEventState):
+        if not isinstance(event.pre_state, LlmEventPayload):
             # Defensive — callers other than the OpenAIProvider hook
             # shouldn't dispatch through the LLM_NAMESPACE sentinel.
             return
@@ -547,19 +732,53 @@ def _handle_llm_event(self, event: NodeEvent) -> None:
             cid = current_correlation_id()
             if cid is not None:
                 attrs["openarmature.correlation_id"] = cid
-            # Per prompt-management spec §11, surface prompt identity
-            # on the LLM-call span when the call fired inside a
-            # with_active_prompt / with_active_prompt_group context.
-            active_prompt = current_prompt_result()
+            # Prompt-identity attributes: sourced from the dispatch-
+            # time snapshot on the payload. Reading the ContextVar
+            # here would return None because the dispatch worker
+            # task's Context was snapshotted at ``invoke()`` entry,
+            # before any node body opened a ``with_active_prompt``
+            # block.
+            active_prompt = payload.active_prompt
             if active_prompt is not None:
                 attrs["openarmature.prompt.name"] = active_prompt.name
                 attrs["openarmature.prompt.version"] = active_prompt.version
                 attrs["openarmature.prompt.label"] = active_prompt.label
                 attrs["openarmature.prompt.template_hash"] = active_prompt.template_hash
                 attrs["openarmature.prompt.rendered_hash"] = active_prompt.rendered_hash
-            active_group = current_prompt_group()
+            active_group = payload.active_prompt_group
             if active_group is not None:
                 attrs["openarmature.prompt.group_name"] = active_group.group_name
+            # §5.5.2 + §5.5.3 GenAI semconv attributes (gated by
+            # ``disable_genai_semconv``). Emit gen_ai.system,
+            # gen_ai.request.model (mirrors openarmature.llm.model),
+            # and per-set gen_ai.request.* params (only fields the
+            # caller supplied — absence is meaningful).
+            if not self.disable_genai_semconv:
+                attrs["gen_ai.system"] = payload.genai_system
+                attrs["gen_ai.request.model"] = payload.model
+                request_params = payload.request_params or {}
+                if "temperature" in request_params:
+                    attrs["gen_ai.request.temperature"] = request_params["temperature"]
+                if "max_tokens" in request_params:
+                    attrs["gen_ai.request.max_tokens"] = request_params["max_tokens"]
+                if "top_p" in request_params:
+                    attrs["gen_ai.request.top_p"] = request_params["top_p"]
+                if "seed" in request_params:
+                    attrs["gen_ai.request.seed"] = request_params["seed"]
+            # §5.5.1 payload attributes (gated by ``disable_llm_payload``).
+            # ``input.messages`` and ``request.extras`` go on the started
+            # span; ``output.content`` lands on the completed branch.
+ if not self.disable_llm_payload: + if payload.input_messages: + serialized = _serialize_for_attribute(payload.input_messages) + attrs["openarmature.llm.input.messages"] = _truncate_for_attribute( + serialized, self.payload_max_bytes + ) + if payload.request_extras: + serialized_extras = _serialize_for_attribute(payload.request_extras) + attrs["openarmature.llm.request.extras"] = _truncate_for_attribute( + serialized_extras, self.payload_max_bytes + ) span = self._tracer.start_span( name="openarmature.llm.complete", context=cast("Any", parent_ctx), @@ -572,6 +791,7 @@ def _handle_llm_event(self, event: NodeEvent) -> None: if open_span is None: return span = open_span.span + # Baseline §5.5 attributes (preserved from v0.7.0). if payload.finish_reason is not None: span.set_attribute("openarmature.llm.finish_reason", payload.finish_reason) if payload.prompt_tokens is not None: @@ -580,6 +800,30 @@ def _handle_llm_event(self, event: NodeEvent) -> None: span.set_attribute("openarmature.llm.usage.completion_tokens", payload.completion_tokens) if payload.total_tokens is not None: span.set_attribute("openarmature.llm.usage.total_tokens", payload.total_tokens) + # §5.5.3 GenAI semconv response attributes (gated by + # ``disable_genai_semconv``). Tokens mirror the baseline + # OA-prefixed usage attributes; finish_reasons wraps the + # scalar in a single-element array per semconv; + # response.{id,model} emit only when the provider + # returned non-null values. 
+ if not self.disable_genai_semconv: + if payload.prompt_tokens is not None: + span.set_attribute("gen_ai.usage.input_tokens", payload.prompt_tokens) + if payload.completion_tokens is not None: + span.set_attribute("gen_ai.usage.output_tokens", payload.completion_tokens) + if payload.finish_reason is not None: + span.set_attribute("gen_ai.response.finish_reasons", [payload.finish_reason]) + if payload.response_id is not None: + span.set_attribute("gen_ai.response.id", payload.response_id) + if payload.response_model is not None: + span.set_attribute("gen_ai.response.model", payload.response_model) + # §5.5.1 output payload. Assistant messages with empty + # content (tool-call-only responses) MUST NOT emit this + # attribute per spec — ``output_content`` on the payload + # is already None in that case (see provider.py). + if not self.disable_llm_payload and payload.output_content: + attrs_out = _truncate_for_attribute(payload.output_content, self.payload_max_bytes) + span.set_attribute("openarmature.llm.output.content", attrs_out) if payload.error_type is not None: span.set_status( Status( @@ -591,6 +835,7 @@ def _handle_llm_event(self, event: NodeEvent) -> None: span.set_attribute("openarmature.error.category", payload.error_category) else: span.set_status(Status(StatusCode.OK)) + self._run_enrichers(span, event) span.end() def _resolve_llm_parent( @@ -849,6 +1094,7 @@ def _close_subgraph_span(self, inv_state: _InvState, prefix: tuple[str, ...]) -> if open_span is None: return open_span.span.set_status(Status(StatusCode.OK)) + self._run_enrichers(open_span.span, None) open_span.span.end() def _open_detached_subgraph_root( @@ -1024,6 +1270,7 @@ def _close_fan_out_instance_dispatch_span(self, inv_state: _InvState, key: tuple if open_span is None: return open_span.span.set_status(Status(StatusCode.OK)) + self._run_enrichers(open_span.span, None) open_span.span.end() def _close_detached_root(self, inv_state: _InvState, prefix: tuple[str, ...]) -> None: @@ -1032,14 
+1279,16 @@ def _close_detached_root(self, inv_state: _InvState, prefix: tuple[str, ...]) -> if open_span is None: return open_span.span.set_status(Status(StatusCode.OK)) + self._run_enrichers(open_span.span, None) open_span.span.end() - @staticmethod - def _drain_open_span(open_span: _OpenSpan) -> None: + def _drain_open_span(self, open_span: _OpenSpan) -> None: """Close an open span as an orphan during shutdown: OK status, end. No paired completed event will arrive, so we - don't have an error category to record.""" + don't have an error category to record. Enrichers run with + ``event=None`` — they can no-op when event context matters.""" open_span.span.set_status(Status(StatusCode.OK)) + self._run_enrichers(open_span.span, None) open_span.span.end() def _find_fan_out_node_span(self, inv_state: _InvState, prefix: tuple[str, ...]) -> _OpenSpan | None: @@ -1182,14 +1431,29 @@ def _close_invocation_span(self, invocation_id: str) -> None: # exporters map UNSET to OK by convention, and the explicit # ERROR-set in ``_handle_completed`` handles the failure # path. + self._run_enrichers(open_span.span, None) open_span.span.end() def shutdown(self) -> None: - """Close any still-open spans across all in-flight - invocations and shut down the underlying provider. Each - per-invocation state is drained in child→parent order (LLM - spans → leaf spans → detached roots → subgraph dispatch); - invocation spans drain last. Idempotent.""" + """Close any still-open spans across all in-flight invocations + and shut down the underlying provider. Each per-invocation + state is drained in child→parent order (LLM spans → leaf spans + → detached roots → subgraph dispatch); invocation spans drain + last. Idempotent. + + **BatchSpanProcessor flush note.** ``self._provider.shutdown()`` + flushes every registered processor. 
Under fast or unusual + teardown orderings (e.g., FastAPI ``TestClient`` teardown that + closes the event loop before the BatchSpanProcessor's export + thread finishes), the flush may not complete in time and spans + can appear dropped. Workarounds: + + - Call ``observer._provider.force_flush(timeout_millis=…)`` + explicitly before this method. + - Use :class:`SimpleSpanProcessor` instead of + :class:`BatchSpanProcessor` in tests; it exports synchronously + and is unaffected by teardown timing. + """ for invocation_id in list(self._inv_states.keys()): inv_state = self._inv_states.pop(invocation_id) self._drain_inv_state(inv_state) diff --git a/tests/conformance/harness/directives.py b/tests/conformance/harness/directives.py index e7e58e6..09dfd78 100644 --- a/tests/conformance/harness/directives.py +++ b/tests/conformance/harness/directives.py @@ -262,13 +262,36 @@ class ParallelBranchesSpec(_AllowExtras): errors_field: str | None = None +class RuntimeConfigSpec(_AllowExtras): + """``calls_llm.config`` block — mirrors ``RuntimeConfig`` (llm-provider + §6). Used by observability fixtures 016-018 (request-parameter and + extras emission) and by the GenAI semconv set. + + Each field maps one-to-one to ``openarmature.llm.response.RuntimeConfig`` + on the source side. ``extras`` is the ``extra="allow"`` pass-through + bag for provider-specific parameters (frequency_penalty, etc.). + """ + + temperature: float | None = None + max_tokens: int | None = None + top_p: float | None = None + seed: int | None = None + extras: dict[str, Any] | None = None + + class CallsLlmSpec(_AllowExtras): """LLM-using node: sends ``messages`` to the harness's mock provider and stores the response (assistant content) in ``stores_response_in``. - Used by observability fixtures to verify LLM-provider span emission.""" + Used by observability fixtures to verify LLM-provider span emission. 
+ + ``config`` (proposal 0024, fixtures 016-018) carries the optional + ``RuntimeConfig`` field set for the call — temperature, max_tokens, + top_p, seed, and a provider-specific ``extras`` bag. + """ messages: list[dict[str, Any]] stores_response_in: str + config: RuntimeConfigSpec | None = None class EmitsLogSpec(_AllowExtras): @@ -555,6 +578,7 @@ class LlmCallSpec(_AllowExtras): "NodeSpec", "ObserverSpec", "RetryMiddleware", + "RuntimeConfigSpec", "ShortCircuitMiddleware", "StateFieldSpec", "StateSchema", diff --git a/tests/conformance/harness/fixtures.py b/tests/conformance/harness/fixtures.py index d76bcd5..71010c0 100644 --- a/tests/conformance/harness/fixtures.py +++ b/tests/conformance/harness/fixtures.py @@ -225,6 +225,19 @@ class GraphFixture(_ForbidExtras): detached_subgraphs: list[str] | None = None detached_fan_outs: list[str] | None = None disable_llm_spans: bool | None = None + # Proposal 0024 (v0.17.0): observer-level opt-outs for the new + # §5.5.1 payload and §5.5.2/§5.5.3 GenAI semconv attribute sets. + # ``disable_llm_payload`` defaults to True per §5.5.4 — fixtures + # that EXERCISE payload emission set it false explicitly (013-018). + # ``disable_genai_semconv`` defaults to False — fixture 021 sets + # it true to verify the opt-out. + disable_llm_payload: bool | None = None + disable_genai_semconv: bool | None = None + # Proposal 0024 (v0.17.0, fixture 020): provider-level configuration + # overrides — ``provider.genai_system`` overrides the default + # ``"openai"`` value of ``gen_ai.system`` for OpenAI-compatible + # providers serving non-OpenAI endpoints (vLLM, LM Studio, …). 
+ provider: dict[str, Any] | None = None mock_llm: list[MockResponse] | None = None caller_global_otel_active: bool | None = None invocations: int | None = None diff --git a/tests/conformance/harness/llm_attribute_assertions.py b/tests/conformance/harness/llm_attribute_assertions.py new file mode 100644 index 0000000..6e76982 --- /dev/null +++ b/tests/conformance/harness/llm_attribute_assertions.py @@ -0,0 +1,188 @@ +# Assertion helpers for the v0.17.0 LLM span attribute fixtures +# (012-021). Check the attribute shapes mandated by observability +# §5.5.1-§5.5.5 against the OTel spans the fixture run produced. +# Deliberately small + pure so fixture drivers can assemble their own +# assertions from this toolkit. +# +# Supported assertion-key shapes inside a span entry's expected block: +# attributes_absent: [name, …] +# attribute_parses_as_messages: {attr: expected_message_list} +# attribute_parses_as_object: {attr: expected_object} +# attribute_does_not_contain: {attr: {forbidden_substring_kind: …}} +# attribute_truncation: {attr: {max_bytes, marker_pattern, +# utf8_valid, +# prefix_of_full_serialization}} +"""Assertion helpers for the v0.17.0 LLM span attribute fixtures.""" + +from __future__ import annotations + +import json +import re +from typing import Any + +# Records the deterministic base64 prefixes emitted by the +# ``base64_data_synthetic`` directive across a fixture run. The +# ``attribute_does_not_contain`` assertion (with +# ``forbidden_substring_kind=synthetic_base64_prefix``) reads from here +# to know what substring to verify is absent from the emitted attribute. +# Reset per fixture run. +SYNTHESIZED_BASE64_PREFIXES: list[str] = [] + + +def reset_synthesized_base64_prefixes() -> None: + """Clear the synthesized-base64 record. 
Call at the start of each + fixture run.""" + SYNTHESIZED_BASE64_PREFIXES.clear() + + +def record_synthesized_base64_prefix(prefix: str) -> None: + """Record a base64 prefix the harness emitted via the + ``base64_data_synthetic`` directive. The + ``attribute_does_not_contain`` assertion uses these to verify + image bytes don't leak into emitted attributes.""" + SYNTHESIZED_BASE64_PREFIXES.append(prefix) + + +def assert_attributes_absent(attrs: dict[str, Any], absent: list[str]) -> None: + """Assert no name in ``absent`` appears in ``attrs``.""" + for name in absent: + assert name not in attrs, f"attribute {name!r} MUST NOT be present; found value {attrs[name]!r}" + + +# Used by fixtures 013 (payload enabled) and 015 (inline-image +# redaction) to assert the parsed message structure without depending +# on bytewise JSON output. +def assert_attribute_parses_as_messages( + attrs: dict[str, Any], + expected_by_attr: dict[str, list[dict[str, Any]]], +) -> None: + """Assert each ``attrs[name]`` is a JSON-encoded message array + whose parse equals the expected list.""" + for attr_name, expected_messages in expected_by_attr.items(): + raw = attrs.get(attr_name) + assert isinstance(raw, str), ( + f"attribute {attr_name!r} MUST be present as a string; got {type(raw).__name__}" + ) + parsed = json.loads(raw) + assert parsed == expected_messages, ( + f"attribute {attr_name!r} parsed-shape mismatch.\n" + f"expected: {expected_messages!r}\ngot: {parsed!r}" + ) + + +# Used by fixture 018 (request.extras). 
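The parse-based comparison in `assert_attribute_parses_as_messages` can be sketched standalone. This is a minimal illustration, not the harness code: `serialize_for_attribute` and `parses_as` are hypothetical names, and the encoder settings are assumed from the changelog's description of the payload encoding (sorted keys, no insignificant whitespace, UTF-8).

```python
import json
from typing import Any


def serialize_for_attribute(value: Any) -> str:
    """Hypothetical stand-in for the observer's serializer: sorted keys,
    compact separators, non-ASCII characters kept as-is."""
    return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def parses_as(raw: str, expected: Any) -> bool:
    """Compare by parsed structure, so key order and whitespace in the
    emitted string do not affect the result."""
    return json.loads(raw) == expected


messages = [{"role": "user", "content": "héllo"}]
raw = serialize_for_attribute(messages)
```

Comparing the parsed value rather than the raw string keeps an assertion stable if the encoder's byte-level output changes while the message structure stays the same.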
+def assert_attribute_parses_as_object( + attrs: dict[str, Any], + expected_by_attr: dict[str, dict[str, Any]], +) -> None: + """Assert each ``attrs[name]`` is a JSON-encoded object whose parse + equals the expected dict.""" + for attr_name, expected_obj in expected_by_attr.items(): + raw = attrs.get(attr_name) + assert isinstance(raw, str), ( + f"attribute {attr_name!r} MUST be present as a string; got {type(raw).__name__}" + ) + parsed = json.loads(raw) + assert parsed == expected_obj, ( + f"attribute {attr_name!r} parsed-shape mismatch.\nexpected: {expected_obj!r}\ngot: {parsed!r}" + ) + + +# The only ``forbidden_substring_kind`` currently defined is +# ``synthetic_base64_prefix``, used by fixture 015 to verify inline +# image bytes don't leak through redaction. The kind covers every +# base64 prefix the harness recorded while processing +# ``base64_data_synthetic`` directives during the current fixture run. +def assert_attribute_does_not_contain( + attrs: dict[str, Any], + forbidden_by_attr: dict[str, dict[str, Any]], +) -> None: + """Assert each ``attrs[name]`` does not contain the forbidden + substring identified by ``forbidden_substring_kind``.""" + for attr_name, spec in forbidden_by_attr.items(): + kind = spec.get("forbidden_substring_kind") + if kind != "synthetic_base64_prefix": + raise AssertionError(f"unknown forbidden_substring_kind: {kind!r}") + if not SYNTHESIZED_BASE64_PREFIXES: + raise AssertionError( + "synthetic_base64_prefix assertion requested but no base64 was " + "synthesized during this fixture run" + ) + raw = attrs.get(attr_name) + assert isinstance(raw, str), ( + f"attribute {attr_name!r} MUST be present as a string; got {type(raw).__name__}" + ) + for forbidden in SYNTHESIZED_BASE64_PREFIXES: + # A 64-char prefix is plenty of signal that bytes leaked; the + # full blob may have been truncated by the cap, so a substring + # check on the front of the blob is the realistic one.
+ probe = forbidden[: min(64, len(forbidden))] + assert probe not in raw, ( + f"attribute {attr_name!r} contains forbidden synthesized base64 " + f"prefix (first 64 chars present); image bytes leaked through redaction" + ) + + +# truncation_by_attr entries carry: +# max_bytes: configured cap (emitted byte length must be ≤ this) +# marker_pattern: regex the attribute ends with (captures the +# "…[truncated, M bytes total]" shape) +# utf8_valid: when True, emitted attribute decodes as valid UTF-8 +# (catches mid-multi-byte-sequence cuts) +# prefix_of_full_serialization: when True, the bytes preceding the +# marker are a literal prefix of the full pre-truncation +# serialization (supplied via full_serialization_by_attr[name]) +def assert_attribute_truncation( + attrs: dict[str, Any], + truncation_by_attr: dict[str, dict[str, Any]], + full_serialization_by_attr: dict[str, str] | None = None, +) -> None: + """Verify the truncation contract on a payload attribute.""" + for attr_name, spec in truncation_by_attr.items(): + raw = attrs.get(attr_name) + assert isinstance(raw, str), ( + f"attribute {attr_name!r} MUST be present as a string; got {type(raw).__name__}" + ) + encoded = raw.encode("utf-8") + max_bytes = int(spec["max_bytes"]) + assert len(encoded) <= max_bytes, ( + f"attribute {attr_name!r} byte length {len(encoded)} exceeds configured cap {max_bytes}" + ) + marker_pattern = str(spec["marker_pattern"]) + match = re.search(marker_pattern, raw) + assert match is not None, ( + f"attribute {attr_name!r} MUST end with marker matching /{marker_pattern}/; " + f"got suffix {raw[-80:]!r}" + ) + if bool(spec.get("utf8_valid", True)): + try: + encoded.decode("utf-8") + except UnicodeDecodeError as e: + raise AssertionError( + f"attribute {attr_name!r} contains invalid UTF-8 — " + f"truncation may have split a code point: {e}" + ) from e + if bool(spec.get("prefix_of_full_serialization", False)): + if full_serialization_by_attr is None or attr_name not in 
full_serialization_by_attr: + raise AssertionError( + f"prefix_of_full_serialization=True requires the harness to supply " + f"the full pre-truncation serialization for {attr_name!r}" + ) + full = full_serialization_by_attr[attr_name] + preceding = raw[: match.start()] + assert full.startswith(preceding), ( + f"attribute {attr_name!r} prefix {preceding[:60]!r}… is not a prefix of " + f"the full serialization {full[:60]!r}…" + ) + + +__all__ = [ + "SYNTHESIZED_BASE64_PREFIXES", + "assert_attribute_does_not_contain", + "assert_attribute_parses_as_messages", + "assert_attribute_parses_as_object", + "assert_attribute_truncation", + "assert_attributes_absent", + "record_synthesized_base64_prefix", + "reset_synthesized_base64_prefixes", +] diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 13539d1..25b7ee6 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -51,6 +51,29 @@ from .adapter import build_graph # noqa: E402 + +# OTel SDK 1.x makes ``set_tracer_provider`` one-shot: once a non-default +# provider is set, subsequent calls are no-ops (the SDK logs a warning +# and returns). The set is guarded by a ``Once`` primitive at +# ``opentelemetry.trace._TRACER_PROVIDER_SET_ONCE``, not just by the +# value of ``_TRACER_PROVIDER``. Restoring via the public API silently +# fails after a prior set, leaking the test's global provider into +# subsequent tests that also touch the OTel global. This helper resets +# BOTH the value and the Once via the SDK's private API so a sibling +# test running after this one starts from a clean global state. 
+def _reset_otel_global_tracer_provider(restore_to: object) -> None: + from opentelemetry import trace as otel_trace + + once = otel_trace._TRACER_PROVIDER_SET_ONCE # type: ignore[attr-defined] + with once._lock: # pyright: ignore[reportPrivateUsage] + if isinstance(restore_to, otel_trace.ProxyTracerProvider): + otel_trace._TRACER_PROVIDER = None # type: ignore[attr-defined] + once._done = False # pyright: ignore[reportPrivateUsage] + else: + otel_trace._TRACER_PROVIDER = restore_to # type: ignore[attr-defined] + once._done = True # pyright: ignore[reportPrivateUsage] + + CONFORMANCE_DIR = ( Path(__file__).resolve().parents[2] / "openarmature-spec" / "spec" / "observability" / "conformance" ) @@ -69,6 +92,17 @@ "009-otel-correlation-id-cross-cutting", "010-otel-log-correlation", "011-otel-determinism", + # v0.17.0 — proposal 0024 (friction-roundup #1, #2, #6). + "012-otel-llm-payload-default-off", + "013-otel-llm-payload-enabled", + "014-otel-llm-payload-truncation", + "015-otel-llm-payload-image-redaction", + "016-otel-llm-request-params", + "017-otel-llm-request-params-partial", + "018-otel-llm-request-extras", + "019-otel-llm-genai-semconv", + "020-otel-llm-genai-system-override", + "021-otel-llm-disable-genai-semconv", } ) @@ -132,6 +166,19 @@ async def test_observability_fixture(fixture_path: Path) -> None: await _run_fixture_010(spec) elif fixture_id == "011-otel-determinism": await _run_fixture_011(spec) + elif fixture_id in { + "012-otel-llm-payload-default-off", + "013-otel-llm-payload-enabled", + "014-otel-llm-payload-truncation", + "015-otel-llm-payload-image-redaction", + "016-otel-llm-request-params", + "017-otel-llm-request-params-partial", + "018-otel-llm-request-extras", + "019-otel-llm-genai-semconv", + "020-otel-llm-genai-system-override", + "021-otel-llm-disable-genai-semconv", + }: + await _run_llm_payload_fixture(spec) else: raise AssertionError(f"no driver for supported fixture {fixture_id!r}") @@ -1002,7 +1049,11 @@ async def 
_run_fixture_005_case(case: Mapping[str, Any]) -> None: ) finally: if caller_global_active and prior_global is not None: - otel_trace.set_tracer_provider(prior_global) + # OTel SDK 1.x makes set_tracer_provider one-shot: once a + # non-default provider is set, subsequent calls are no-ops. + # Restore by resetting the private Once + state directly so + # the global doesn't leak into subsequent tests. + _reset_otel_global_tracer_provider(prior_global) # Common assertions: the LLM span presence/absence + (when # present) attributes + parent-child to the calling node. @@ -1734,3 +1785,351 @@ def _build_observer_with_detached(detached_subgraphs: frozenset[str]) -> tuple[O detached_subgraphs=detached_subgraphs, ) return observer, exporter + + +# --------------------------------------------------------------------------- +# v0.17.0 LLM-payload + GenAI-semconv fixtures (012-021) +# --------------------------------------------------------------------------- + + +async def _run_llm_payload_fixture(spec: Mapping[str, Any]) -> None: + """Generic driver for the ten v0.17.0 LLM-attribute fixtures. + + Each fixture is single-case (GraphFixture shape) with a top-level + ``cases:`` list of one entry; the case carries the graph + the + ``calls_llm`` config + the optional observer/provider flags. 
+ """ + cases = cast("list[dict[str, Any]]", spec["cases"]) + for case in cases: + try: + await _run_llm_payload_case(case) + except AssertionError as e: + raise AssertionError(f"case {case.get('name')!r}: {e}") from e + + +async def _run_llm_payload_case(case: Mapping[str, Any]) -> None: + """Build + invoke the graph, then walk the expected span tree + asserting via the LLM-attribute helpers (parse-shape, truncation, + redaction-substring-absence).""" + import json + from collections.abc import Sequence + + import httpx + from opentelemetry.sdk.trace.export import SimpleSpanProcessor # noqa: PLC0415 + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # noqa: PLC0415 + InMemorySpanExporter, + ) + + from openarmature.graph import END, GraphBuilder + from openarmature.llm import OpenAIProvider + from openarmature.llm.response import RuntimeConfig + + from .adapter import build_state_cls + from .harness.llm_attribute_assertions import ( + assert_attribute_does_not_contain, + assert_attribute_parses_as_messages, + assert_attribute_parses_as_object, + assert_attribute_truncation, + assert_attributes_absent, + record_synthesized_base64_prefix, + reset_synthesized_base64_prefixes, + ) + + reset_synthesized_base64_prefixes() + + # ---- Resolve harness primitives (content_repeat, base64_data_synthetic) + nodes_spec = cast("dict[str, Any]", case["nodes"]) + entry_name = cast("str", case["entry"]) + calls_llm_spec = cast("dict[str, Any]", nodes_spec[entry_name]["calls_llm"]) + raw_messages = cast("list[dict[str, Any]]", calls_llm_spec.get("messages", [])) + materialized_messages, full_input_serialization = _materialize_messages( + raw_messages, + record_base64_prefix=record_synthesized_base64_prefix, + ) + + # ---- RuntimeConfig from the calls_llm.config block + config_spec = cast("dict[str, Any] | None", calls_llm_spec.get("config")) + runtime_config: RuntimeConfig | None = None + if config_spec: + extras = cast("dict[str, Any]", config_spec.get("extras") 
or {}) + runtime_config_kwargs: dict[str, Any] = { + k: v for k, v in config_spec.items() if k in {"temperature", "max_tokens", "top_p", "seed"} + } + runtime_config_kwargs.update(extras) + runtime_config = RuntimeConfig(**runtime_config_kwargs) + + # ---- Provider knobs (provider.genai_system override) + provider_spec = cast("dict[str, Any] | None", case.get("provider")) + genai_system = "openai" + if provider_spec and isinstance(provider_spec.get("genai_system"), str): + genai_system = cast("str", provider_spec["genai_system"]) + + # ---- Mock LLM transport + mock_responses = list(cast("list[dict[str, Any]]", case.get("mock_llm") or [])) + + def _handler(_request: httpx.Request) -> httpx.Response: + if not mock_responses: + raise AssertionError("mock_llm queue exhausted") + spec_resp = mock_responses.pop(0) + body = cast("dict[str, Any]", spec_resp.get("body") or {}) + return httpx.Response( + int(spec_resp.get("status", 200)), + content=json.dumps(body).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + + provider = OpenAIProvider( + base_url="http://mock-llm.test", + model="test-model", + api_key="test", + transport=httpx.MockTransport(_handler), + genai_system=genai_system, + ) + + # ---- State + node body + state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"]) + state_cls = build_state_cls("LlmPayloadFixtureState", state_fields) + stores_in = cast("str", calls_llm_spec.get("stores_response_in", "msg")) + + async def ask_llm_body(_s: Any) -> dict[str, str]: + response = await provider.complete( + cast("Sequence[Any]", materialized_messages), + config=runtime_config, + ) + return {stores_in: response.message.content or ""} + + builder = ( + GraphBuilder(state_cls) + .add_node(entry_name, ask_llm_body) + .add_edge(entry_name, END) + .set_entry(entry_name) + ) + graph = builder.compile() + + # ---- Observer + exporter = InMemorySpanExporter() + observer_kwargs: dict[str, Any] = {"span_processor": 
SimpleSpanProcessor(exporter)} + if "disable_llm_payload" in case: + observer_kwargs["disable_llm_payload"] = bool(case["disable_llm_payload"]) + if "disable_genai_semconv" in case: + observer_kwargs["disable_genai_semconv"] = bool(case["disable_genai_semconv"]) + if "disable_llm_spans" in case: + observer_kwargs["disable_llm_spans"] = bool(case["disable_llm_spans"]) + observer = OTelObserver(**observer_kwargs) + graph.attach_observer(observer) + + # ---- Run + collect spans + initial_state_cls = graph.state_cls + await graph.invoke(initial_state_cls()) + await graph.drain() + observer.shutdown() + spans = exporter.get_finished_spans() + + # ---- Walk expected.span_tree and check per-span assertions + expected = cast("dict[str, Any]", case["expected"]) + expected_tree = cast("list[dict[str, Any]]", expected.get("span_tree") or []) + _check_payload_span_tree( + spans, + expected_tree, + full_input_serialization=full_input_serialization, + assert_attributes_absent=assert_attributes_absent, + assert_attribute_parses_as_messages=assert_attribute_parses_as_messages, + assert_attribute_parses_as_object=assert_attribute_parses_as_object, + assert_attribute_does_not_contain=assert_attribute_does_not_contain, + assert_attribute_truncation=assert_attribute_truncation, + ) + + +def _materialize_messages( + raw_messages: list[dict[str, Any]], + *, + record_base64_prefix: Any, +) -> tuple[list[Any], str | None]: + """Resolve harness directives (``content_repeat``, + ``base64_data_synthetic``) into real ``Message`` instances. + + Returns the message list AND the canonical full-serialization + string for the materialized payload — the truncation fixture + needs the latter for its ``prefix_of_full_serialization`` check. 
+ """ + from openarmature.llm.messages import UserMessage + + out: list[Any] = [] + full_serial_target: str | None = None + for msg in raw_messages: + role = msg.get("role") + # ``content_repeat`` may live at the message level (fixture 014: + # ``{role: user, content_repeat: {char, bytes}}``) — no ``content`` + # key in that case; synthesize a string of N repeated chars. + content: Any + if "content_repeat" in msg: + repeat = cast("dict[str, Any]", msg["content_repeat"]) + content = cast("str", repeat["char"]) * int(repeat["bytes"]) + else: + content = msg.get("content") + if role == "user": + materialized = _materialize_user_content( + content, + record_base64_prefix=record_base64_prefix, + ) + out.append(UserMessage(content=materialized)) + elif role == "system": + from openarmature.llm.messages import SystemMessage + + out.append(SystemMessage(content=cast("str", content))) + else: + raise AssertionError(f"unsupported role in payload fixture: {role!r}") + + # Compute the full serialization (what the observer would emit + # before truncation). The provider's _serialize_messages_for_payload + # is the canonical encoder; mirror its shape via the same import. + from openarmature.llm.providers.openai import _serialize_messages_for_payload + + plain = _serialize_messages_for_payload(out) + import json + + full_serial_target = json.dumps(plain, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + return out, full_serial_target + + +def _materialize_user_content(content: Any, *, record_base64_prefix: Any) -> Any: + """Resolve the user message's content. Strings pass through; lists + of blocks materialize the harness directives in each block. + + ``content_repeat: {char, bytes}`` on a string-only message synthesizes + a repeated-character string of N bytes. 
``base64_data_synthetic: + {bytes}`` on an inline image source synthesizes a deterministic + base64 blob; the prefix is recorded via the supplied callable so + the ``attribute_does_not_contain`` assertion can verify absence. + """ + from openarmature.llm.messages import ( + ImageBlock, + ImageSourceInline, + ImageSourceURL, + TextBlock, + ) + + # Compact form: ``content`` is a dict with ``content_repeat`` — + # synthesize a string of N repeated chars. + if isinstance(content, dict) and "content_repeat" in content: + repeat = cast("dict[str, Any]", content["content_repeat"]) + char = cast("str", repeat["char"]) + nbytes = int(repeat["bytes"]) + return char * nbytes + if isinstance(content, str): + return content + # List of content blocks. + blocks: list[Any] = [] + for block in cast("list[dict[str, Any]]", content): + btype = block.get("type") + if btype == "text": + blocks.append(TextBlock(text=cast("str", block["text"]))) + elif btype == "image": + source_spec = cast("dict[str, Any]", block["source"]) + stype = source_spec.get("type") + if stype == "inline": + synth = cast("dict[str, Any] | None", source_spec.get("base64_data_synthetic")) + if synth is not None: + nbytes = int(synth["bytes"]) + blob = _synth_base64(nbytes) + record_base64_prefix(blob) + source = ImageSourceInline(base64_data=blob) + else: + source = ImageSourceInline(base64_data=cast("str", source_spec["base64_data"])) + elif stype == "url": + source = ImageSourceURL(url=cast("str", source_spec["url"])) + else: + raise AssertionError(f"unsupported image source type: {stype!r}") + blocks.append( + ImageBlock( + source=source, + media_type=cast("str | None", block.get("media_type")), + detail=cast("Any", block.get("detail")), + ) + ) + else: + raise AssertionError(f"unsupported content block type: {btype!r}") + # Compact form: a single ``content_repeat`` entry inside a list. 
+ return blocks + + +def _synth_base64(nbytes: int) -> str: + """Synthesize a deterministic base64 blob of exactly ``nbytes`` bytes. + + Fixture 015 uses 4096 bytes; deterministic so the synthesized prefix + can be recorded once and the ``attribute_does_not_contain`` helper + verifies the same prefix is absent from the redacted attribute. + """ + # Repeated-letter base64 — valid base64 chars, deterministic, length + # exactly nbytes. + alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" + # Use a single character so the prefix-check signal is strong; the + # bytes are not a real PNG (the redaction rule is about SHAPE). + return alphabet[0] * nbytes + + +def _check_payload_span_tree( + spans: Any, + expected_tree: list[dict[str, Any]], + *, + full_input_serialization: str | None, + assert_attributes_absent: Any, + assert_attribute_parses_as_messages: Any, + assert_attribute_parses_as_object: Any, + assert_attribute_does_not_contain: Any, + assert_attribute_truncation: Any, +) -> None: + """Walk ``expected_tree`` and verify each expected span's attribute + block matches the spans in ``spans``.""" + spans_by_name: dict[str, list[Any]] = {} + for s in spans: + spans_by_name.setdefault(s.name, []).append(s) + + def _walk(expected_entries: list[dict[str, Any]]) -> None: + for entry in expected_entries: + name = cast("str", entry["name"]) + candidates = spans_by_name.get(name, []) + assert candidates, f"expected a span named {name!r}; got {sorted(spans_by_name.keys())}" + # The fixtures we cover have unique span names in each tree. + span = candidates[0] + attrs = dict(span.attributes or {}) + # ``attributes:`` block — exact match per key. + for k, v in cast("dict[str, Any]", entry.get("attributes") or {}).items(): + actual: Any = attrs.get(k) + # OTel attribute arrays come back as tuples; normalize. 
+ if isinstance(v, list) and isinstance(actual, tuple): + actual = list(cast("tuple[Any, ...]", actual)) + assert actual == v, f"span {name!r} attribute {k!r} mismatch: expected {v!r}, got {actual!r}" + # ``attributes_absent:`` list of names that MUST NOT appear. + absent = entry.get("attributes_absent") + if absent: + assert_attributes_absent(attrs, cast("list[str]", absent)) + # ``attribute_parses_as_messages:`` shape assertion. + parses_as_messages = entry.get("attribute_parses_as_messages") + if parses_as_messages: + assert_attribute_parses_as_messages(attrs, cast("dict[str, Any]", parses_as_messages)) + # ``attribute_parses_as_object:`` shape assertion. + parses_as_object = entry.get("attribute_parses_as_object") + if parses_as_object: + assert_attribute_parses_as_object(attrs, cast("dict[str, Any]", parses_as_object)) + # ``attribute_does_not_contain:`` substring absence. + does_not_contain = entry.get("attribute_does_not_contain") + if does_not_contain: + assert_attribute_does_not_contain(attrs, cast("dict[str, Any]", does_not_contain)) + # ``attribute_truncation:`` §5.5.5 contract. + truncation = entry.get("attribute_truncation") + if truncation: + full_map: dict[str, str] = {} + # The fixture is single-attribute; supply the full + # serialization under the same key for the + # prefix_of_full_serialization clause. + if full_input_serialization is not None: + for attr_name in cast("dict[str, Any]", truncation): + full_map[attr_name] = full_input_serialization + assert_attribute_truncation(attrs, cast("dict[str, Any]", truncation), full_map) + # Recurse into children. 
+ children = cast("list[dict[str, Any]] | None", entry.get("children")) + if children: + _walk(children) + + _walk(expected_tree) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index ac13dd0..672338c 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -9,7 +9,7 @@ def test_package_versions() -> None: assert openarmature.__version__ == "0.7.0" - assert openarmature.__spec_version__ == "0.16.1" + assert openarmature.__spec_version__ == "0.17.0" def test_spec_version_matches_pyproject() -> None: diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index 9573fc5..106725d 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -95,15 +95,39 @@ def _build_linear_graph( # --------------------------------------------------------------------------- +# OTel SDK 1.x makes ``set_tracer_provider`` one-shot: once a non-default +# provider is set, subsequent ``set_tracer_provider`` calls are no-ops +# (the SDK logs a warning and returns). The set is guarded by a ``Once`` +# primitive at ``opentelemetry.trace._TRACER_PROVIDER_SET_ONCE``, not +# just by the value of ``_TRACER_PROVIDER``. Restoring via the public +# API silently fails after a prior set, leaking the test's global +# provider into subsequent tests that also touch the OTel global (e.g., +# the conformance fixture 005 sub-case verifying private/global +# isolation). Tests that need to manipulate the global provider use +# this helper to reset BOTH the value and the Once. +def _reset_otel_global_tracer_provider(restore_to: object) -> None: + once = otel_trace._TRACER_PROVIDER_SET_ONCE # type: ignore[attr-defined] + with once._lock: # pyright: ignore[reportPrivateUsage] + if isinstance(restore_to, otel_trace.ProxyTracerProvider): + # No real provider was set before this test; return the + # global to "unset" state so the next set_tracer_provider + # call works as if it were the first. 
+ otel_trace._TRACER_PROVIDER = None # type: ignore[attr-defined] + once._done = False # pyright: ignore[reportPrivateUsage] + else: + otel_trace._TRACER_PROVIDER = restore_to # type: ignore[attr-defined] + once._done = True # pyright: ignore[reportPrivateUsage] + + async def test_observer_uses_private_provider_not_global() -> None: """Spec §6 TracerProvider isolation: the OTelObserver MUST use a PRIVATE TracerProvider; spans MUST NOT appear on the OTel global provider's exporter (this is the load-bearing guarantee against duplicate spans from external auto-instrumentation libraries).""" - # Save the prior global provider so we can restore it after the - # test — pytest fixture-scoping doesn't cover OTel global state. + # Save prior global state and install a separate exporter on the + # OTel global provider. Pytest fixture-scoping doesn't cover the + # OTel global, so we restore it manually in the finally block. prior_global = otel_trace.get_tracer_provider() - # Install a separate exporter on the OTel global provider. 
global_exporter = InMemorySpanExporter() global_provider = TracerProvider() global_provider.add_span_processor(SimpleSpanProcessor(global_exporter)) @@ -125,7 +149,7 @@ async def test_observer_uses_private_provider_not_global() -> None: f"global provider MUST NOT receive openarmature spans; got {[s.name for s in global_spans]}" ) finally: - otel_trace.set_tracer_provider(prior_global) + _reset_otel_global_tracer_provider(prior_global) # --------------------------------------------------------------------------- @@ -380,7 +404,7 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: from openarmature.graph.events import NodeEvent from openarmature.llm.messages import UserMessage - from openarmature.llm.providers.openai import _LlmEventState + from openarmature.llm.providers.openai import LlmEventPayload from openarmature.observability.correlation import ( _reset_invocation_id, _set_invocation_id, @@ -389,8 +413,6 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: Prompt, PromptGroup, PromptResult, - with_active_prompt, - with_active_prompt_group, ) exporter = InMemorySpanExporter() @@ -420,29 +442,48 @@ async def test_active_prompt_propagates_to_llm_span_attributes() -> None: token = _set_invocation_id("inv-1") try: - with with_active_prompt(result), with_active_prompt_group(group): - started = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="started", - pre_state=_LlmEventState(call_id="test-call-prompt", model="test-m"), - post_state=None, - error=None, - parent_states=(), - ) - completed = NodeEvent( - node_name="openarmature.llm.complete", - namespace=("openarmature.llm.complete",), - step=-1, - phase="completed", - pre_state=_LlmEventState(call_id="test-call-prompt", model="test-m", finish_reason="stop"), - post_state=None, - error=None, - parent_states=(), - ) - await observer(started) - await observer(completed) + # Proposal 0024 / 
friction-roundup #3: the provider captures + # ``current_prompt_result()`` and ``current_prompt_group()`` + # at dispatch time and puts them on the LLM event payload. + # The observer reads from the payload, NOT from the live + # ContextVar — that ContextVar is unreachable from the + # dispatch worker's task-local Context. This test verifies + # the observer correctly surfaces prompt attributes when the + # payload carries them; the cross-task regression case is + # covered separately by an end-to-end test. + started = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="started", + pre_state=LlmEventPayload( + call_id="test-call-prompt", + model="test-m", + active_prompt=result, + active_prompt_group=group, + ), + post_state=None, + error=None, + parent_states=(), + ) + completed = NodeEvent( + node_name="openarmature.llm.complete", + namespace=("openarmature.llm.complete",), + step=-1, + phase="completed", + pre_state=LlmEventPayload( + call_id="test-call-prompt", + model="test-m", + finish_reason="stop", + active_prompt=result, + active_prompt_group=group, + ), + post_state=None, + error=None, + parent_states=(), + ) + await observer(started) + await observer(completed) finally: _reset_invocation_id(token) @@ -462,11 +503,11 @@ async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None """Without ``with_active_prompt``, the LLM-call span MUST NOT carry ``openarmature.prompt.*`` attributes.""" from openarmature.graph.events import NodeEvent - from openarmature.llm.providers.openai import _LlmEventState from openarmature.observability.correlation import ( _reset_invocation_id, _set_invocation_id, ) + from openarmature.observability.llm_event import LlmEventPayload exporter = InMemorySpanExporter() observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) @@ -478,7 +519,7 @@ async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None 
namespace=("openarmature.llm.complete",), step=-1, phase="started", - pre_state=_LlmEventState(call_id="test-call-noprompt", model="test-m"), + pre_state=LlmEventPayload(call_id="test-call-noprompt", model="test-m"), post_state=None, error=None, parent_states=(), @@ -488,7 +529,7 @@ async def test_llm_span_has_no_prompt_attributes_when_no_active_prompt() -> None namespace=("openarmature.llm.complete",), step=-1, phase="completed", - pre_state=_LlmEventState(call_id="test-call-noprompt", model="test-m", finish_reason="stop"), + pre_state=LlmEventPayload(call_id="test-call-noprompt", model="test-m", finish_reason="stop"), post_state=None, error=None, parent_states=(), @@ -514,7 +555,7 @@ async def test_disable_llm_spans_skips_llm_provider_span() -> None: # LLM event through the observer's __call__ and assert no span was # produced. This isolates the disable_llm_spans branch from the # provider's own queue-dispatch wiring. - from openarmature.llm.providers.openai import _LlmEventState + from openarmature.observability.llm_event import LlmEventPayload exporter = InMemorySpanExporter() observer = OTelObserver( @@ -529,7 +570,7 @@ async def test_disable_llm_spans_skips_llm_provider_span() -> None: namespace=("openarmature.llm.complete",), step=-1, phase="started", - pre_state=_LlmEventState(call_id="test-call-1", model="test-m"), + pre_state=LlmEventPayload(call_id="test-call-1", model="test-m"), post_state=None, error=None, parent_states=(), @@ -539,7 +580,7 @@ async def test_disable_llm_spans_skips_llm_provider_span() -> None: namespace=("openarmature.llm.complete",), step=-1, phase="completed", - pre_state=_LlmEventState(call_id="test-call-1", model="test-m", finish_reason="stop"), + pre_state=LlmEventPayload(call_id="test-call-1", model="test-m", finish_reason="stop"), post_state=None, error=None, parent_states=(), @@ -1214,3 +1255,130 @@ async def first_line_log_node(_s: _S) -> dict[str, Any]: root.filters[:] = prior_filters 
logging.setLogRecordFactory(prior_factory) test_logger.setLevel(prior_test_level) + + +# --------------------------------------------------------------------------- +# Friction-roundup #3 regression: prompt context propagates across the +# dispatch-worker task boundary +# --------------------------------------------------------------------------- + + +async def test_prompt_context_propagates_cross_task_via_provider_complete() -> None: + """End-to-end #3 regression: open ``with_active_prompt`` inside a + node body, call ``provider.complete()``, and assert the LLM span + carries ``openarmature.prompt.name``. + + Pre-fix this test failed because: + + - ``invoke()`` calls ``asyncio.create_task(deliver_loop(queue))`` + BEFORE any node body runs. The worker's Context is snapshotted + at task-creation time, so it never sees ContextVars set later + inside a node body. + - The observer used to read ``current_prompt_result()`` from the + worker task — it returned ``None`` because the worker's snapshot + doesn't have ``_active_prompt`` set. + + Post-fix the provider captures ``current_prompt_result()`` at + dispatch time (in the node task's Context, where + ``with_active_prompt`` IS active) and puts the snapshot on the + ``LlmEventPayload``. The observer reads from the payload, not from + a ContextVar. 
+ """ + import json + from datetime import UTC, datetime + + import httpx + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + + from openarmature.graph import END, GraphBuilder, State + from openarmature.llm import OpenAIProvider, UserMessage + from openarmature.prompts import ( + Prompt, + PromptResult, + with_active_prompt, + ) + + def _handler(_request: httpx.Request) -> httpx.Response: + body = { + "id": "cc-test", + "object": "chat.completion", + "model": "test-model", + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": "hi back"}, + "finish_reason": "stop", + } + ], + "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}, + } + return httpx.Response( + 200, + content=json.dumps(body).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + + provider = OpenAIProvider( + base_url="http://mock.test", + model="test-model", + api_key="k", + transport=httpx.MockTransport(_handler), + ) + + now = datetime.now(UTC) + prompt = Prompt( + name="greeting", + version="v1", + label="production", + template="Hello, {{ user }}!", + template_hash="sha256:tpl", + fetched_at=now, + ) + rendered = PromptResult( + name=prompt.name, + version=prompt.version, + label=prompt.label, + template_hash=prompt.template_hash, + rendered_hash="sha256:rendered", + messages=[UserMessage(content="Hello, Alice!")], + variables={"user": "Alice"}, + fetched_at=now, + rendered_at=now, + ) + + class _S(State): + reply: str = "" + + async def ask_llm(_s: _S) -> dict[str, str]: + # The ContextVar set here lives in the node task. Pre-fix, the + # dispatch worker (a separate task) could not see this set. 
+ with with_active_prompt(rendered): + response = await provider.complete(rendered.messages) + return {"reply": response.message.content} + + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + graph = ( + GraphBuilder(_S).add_node("ask_llm", ask_llm).add_edge("ask_llm", END).set_entry("ask_llm") + ).compile() + graph.attach_observer(observer) + try: + await graph.invoke(_S()) + await graph.drain() + finally: + observer.shutdown() + await provider.aclose() + + spans = exporter.get_finished_spans() + llm_spans = [s for s in spans if s.name == "openarmature.llm.complete"] + assert len(llm_spans) == 1, f"expected one LLM span; got {len(llm_spans)}" + attrs = dict(llm_spans[0].attributes or {}) + # Pre-fix these were all None; post-fix all populated from the + # dispatch-time PromptResult snapshot. + assert attrs.get("openarmature.prompt.name") == "greeting" + assert attrs.get("openarmature.prompt.version") == "v1" + assert attrs.get("openarmature.prompt.label") == "production" + assert attrs.get("openarmature.prompt.template_hash") == "sha256:tpl" + assert attrs.get("openarmature.prompt.rendered_hash") == "sha256:rendered"