diff --git a/conformance.toml b/conformance.toml index c99cf63..b3e4063 100644 --- a/conformance.toml +++ b/conformance.toml @@ -687,3 +687,10 @@ since = "0.14.0" status = "textual-only" since = "0.14.0" note = "Descriptive catalog of the failure-mock family (flaky + failure_sequence, flaky_by_index, flaky_per_index, flaky_instance_only, flaky_resume_aware) that the adapter already implements and the existing retry / failure-isolation / checkpoint-resume fixtures exercise; no new behavior, no new fixtures, no code change. Mirrors 0055's textual-only treatment of the conformance-adapter capability. The success-state naming drift (success_update / on_success / success_compute) is documented as-is by the proposal and left unchanged." + +# Spec v0.61.0 (proposal 0061). Detached-trace invocation span +# (observability §4.4 / §4.3 / §4.1 / §4.2 / §5.1). +[proposals."0061"] +status = "implemented" +since = "0.15.0" +note = "The OTel observer synthesizes an openarmature.invocation span at the root of each detached trace (a detached subgraph + each detached fan-out instance), carrying the parent's SHARED invocation_id (detached mode is observer-side trace rendering, not a new run) and the detached unit's own entry_node; the detached subgraph / instance span nests under it. A raising detached subgraph surfaces ERROR + the category + an OTel exception event on BOTH the parent dispatch span and the detached invocation span. Observer-side only -- no graph-engine change; the Langfuse observer is unchanged (its Trace entity already plays the invocation-level-container role). Fixtures 008 (rewritten) and 058 (newly wired) run in test_observability." diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 2dad1ce..26f23dd 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -644,8 +644,11 @@ obs = OTelObserver( ) ``` -A detached subgraph or fan-out gets a fresh trace root (new -`trace_id`); the `correlation_id` still propagates through, so +A detached subgraph or fan-out renders into a fresh trace, rooted in +its own `openarmature.invocation` span that carries the same +`invocation_id` as the parent (detached mode is an observer-side +rendering choice, not a separate run). The new trace has a fresh +`trace_id`, and the `correlation_id` still propagates through, so join semantics survive even when trace boundaries don't. The non-detached default is what you want most of the time: one diff --git a/openarmature-spec b/openarmature-spec index e4d4045..50bd070 160000 --- a/openarmature-spec +++ b/openarmature-spec @@ -1 +1 @@ -Subproject commit e4d4045240b30fcfca417fb62f87570ed3e41989 +Subproject commit 50bd0700876792cd1891fe740890002beb6e0279 diff --git a/pyproject.toml b/pyproject.toml index a53e65d..43556da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec" openarmature = "openarmature.cli:main" [tool.openarmature] -spec_version = "0.60.0" +spec_version = "0.61.0" [dependency-groups] dev = [ diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index 9e4a4f2..ee406af 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1,6 +1,6 @@ # OpenArmature — Agent documentation -*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.60.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* +*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.61.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* ## TL;DR @@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents: ## Capability contracts -_Sourced from openarmature-spec v0.60.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ +_Sourced from openarmature-spec v0.61.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ ### Capability: `graph-engine` diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py index c502c6f..a3eaf54 100644 --- a/src/openarmature/__init__.py +++ b/src/openarmature/__init__.py @@ -25,7 +25,7 @@ """ __version__ = "0.14.0" -__spec_version__ = "0.60.0" +__spec_version__ = "0.61.0" # Proposal 0052 (spec observability §5.1 / §8.4.1): canonical # package-registry name for this implementation. Surfaces on every # OTel invocation span as ``openarmature.implementation.name`` and on diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 27cff6b..39c3832 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -312,6 +312,24 @@ class _InvState: open_spans: dict[_StackKey, _OpenSpan] = field(default_factory=dict[_StackKey, _OpenSpan]) subgraph_spans: dict[tuple[str, ...], _OpenSpan] = field(default_factory=dict[tuple[str, ...], _OpenSpan]) detached_roots: dict[tuple[str, ...], _OpenSpan] = field(default_factory=dict[tuple[str, ...], _OpenSpan]) + # Proposal 0061 (observability §4.4): a detached trace roots in its + # own ``openarmature.invocation`` span carrying the parent's + # invocation_id; the detached subgraph / fan-out-instance span in + # ``detached_roots`` nests under it. Keyed identically to + # ``detached_roots`` (the prefix for a detached subgraph, ``prefix + + # (str(fan_out_index),)`` for a detached fan-out instance) so the + # two close together as a coterminous parent/child pair. + detached_invocation_spans: dict[tuple[str, ...], _OpenSpan] = field( + default_factory=dict[tuple[str, ...], _OpenSpan] + ) + # Proposal 0061 §4.2: keys (matching ``detached_roots`` / + # ``detached_invocation_spans``, and the detached prefix in + # ``subgraph_spans``) whose spans the detached-error propagation + # marked ERROR. The synthetic close paths consult this to SKIP their + # default ``set_status(OK)``: the OTel SDK treats OK as final and + # lets it OVERRIDE a prior ERROR (only a set to UNSET is ignored), so + # an unconditional OK at close would erase the detached-trace ERROR. + errored_detached_keys: set[tuple[str, ...]] = field(default_factory=set[tuple[str, ...]]) fan_out_instance_root_prefixes: set[tuple[str, ...]] = field(default_factory=set[tuple[str, ...]]) # Per spec §5.4 + proposal 0013 (v0.10.0): non-detached fan-outs # synthesize per-instance dispatch spans nested between the fan-out @@ -810,14 +828,23 @@ def _handle_completed(self, event: NodeEvent) -> None: # Per spec §4.2 / fixture 003: the invocation span MUST # end with ERROR status when any child node errors. OTel # doesn't auto-propagate child status to parents — we set - # it explicitly here. The OTel SDK's status-precedence - # rule preserves ERROR through any subsequent - # ``set_status(OK)`` calls (only UNSET → OK transitions - # are honoured), so the close path's UNSET-leave still - # works for clean invocations. + # it explicitly here. This ERROR survives to export because + # ``_close_invocation_span`` deliberately never calls + # ``set_status(OK)`` on the clean-completion path (it leaves + # the status UNSET, which exporters map to OK). That matters + # because the OTel SDK treats OK as FINAL and lets a later + # OK OVERRIDE a prior ERROR (only a set to UNSET is ignored) + # — an unconditional OK at a close site would erase this, + # the same hazard the §4.2 detached-error propagation guards + # against via ``errored_detached_keys``. inv_open = self._invocation_span.get(invocation_id) if inv_open is not None: inv_open.span.set_status(Status(StatusCode.ERROR, description=event.error.category)) + # Proposal 0061 §4.2: when the erroring node is inside a + # DETACHED subtree, that trace needs its OWN error carriers + # — the invocation span set just above belongs to the PARENT + # trace. Surface ERROR on the detached trace's spans too. + self._propagate_error_to_detached_spans(inv_state, event) else: span.set_status(Status(StatusCode.OK)) self._run_enrichers(span, event) @@ -826,6 +853,51 @@ def _handle_completed(self, event: NodeEvent) -> None: # subsequent re-entry mints a fresh trace. inv_state.detached_roots.pop(event.namespace, None) + def _propagate_error_to_detached_spans(self, inv_state: _InvState, event: NodeEvent) -> None: + # Proposal 0061 §4.2 (Detached invocation span status): a node + # raising inside a detached subtree surfaces ERROR on that + # trace's OWN carriers, not just the parent trace's. For each + # enclosing detached prefix: + # - the detached invocation span (the detached trace's root / + # authoritative carrier) and the parent-trace dispatch span + # (the §4.4 Link carrier) each get the FULL treatment — + # ERROR status + an OTel exception event + the §4 category + # attribute, mirroring the parent invocation span; + # - the detached subgraph / instance span between them gets + # ERROR status only (the invocation span above it carries + # the exception event for that trace). + # Set while the spans are still open; the synthetic close paths + # SKIP their default ``set_status(OK)`` for keys recorded in + # ``errored_detached_keys`` (OTel treats OK as final and lets it + # override a prior ERROR), so the ERROR survives to export. Keys + # cover both the detached-subgraph (prefix) and detached-fan-out- + # instance (prefix + index) schemes. + if event.error is None: + return + err = event.error + category = err.category + for prefix_len in range(len(event.namespace), 0, -1): + base = event.namespace[:prefix_len] + keys = [base] + if event.fan_out_index is not None: + keys.append(base + (str(event.fan_out_index),)) + for key in keys: + inv_span = inv_state.detached_invocation_spans.get(key) + if inv_span is None: + continue + inv_state.errored_detached_keys.add(key) + inv_span.span.set_status(Status(StatusCode.ERROR, description=category)) + inv_span.span.record_exception(err) + inv_span.span.set_attribute("openarmature.error.category", category) + dispatch = inv_state.subgraph_spans.get(key) + if dispatch is not None: + dispatch.span.set_status(Status(StatusCode.ERROR, description=category)) + dispatch.span.record_exception(err) + dispatch.span.set_attribute("openarmature.error.category", category) + root = inv_state.detached_roots.get(key) + if root is not None: + root.span.set_status(Status(StatusCode.ERROR, description=category)) + # ------------------------------------------------------------------ # Metadata augmentation (proposal 0040 §3.4 + §6) # ------------------------------------------------------------------ @@ -1605,7 +1677,9 @@ def _sync_subgraph_spans( # fan-out (event.fan_out_index populated, fan-out NODE # name at ``prefix[-1]`` in the configured set). if event.fan_out_index is not None and prefix[-1] in self.detached_fan_outs: - self._open_detached_fan_out_instance_root(inv_state, correlation_id, prefix, event) + self._open_detached_fan_out_instance_root( + inv_state, invocation_id, correlation_id, prefix, event + ) continue # Per spec §5.4 + proposal 0013: non-detached fan-out # instances get a synthetic per-instance dispatch span @@ -1720,10 +1794,45 @@ def _close_subgraph_span(self, inv_state: _InvState, prefix: tuple[str, ...]) -> open_span = inv_state.subgraph_spans.pop(prefix, None) if open_span is None: return - open_span.span.set_status(Status(StatusCode.OK)) + # Skip the default OK for a detached dispatch span the §4.2 error + # path marked ERROR (proposal 0061) — OTel lets a later OK + # override ERROR (see errored_detached_keys). + if prefix not in inv_state.errored_detached_keys: + open_span.span.set_status(Status(StatusCode.OK)) self._run_enrichers(open_span.span, None) open_span.span.end() + def _detached_invocation_attrs( + self, + invocation_id: str, + correlation_id: str | None, + prefix: tuple[str, ...], + event: NodeEvent, + ) -> dict[str, Any]: + # Proposal 0061 §5.1 attribute set for a detached invocation + # span. Mirrors ``_open_invocation_span`` but carries the SAME + # invocation_id as the parent (detached mode shares the run + # identity per §4.3 — distinct from checkpoint-resume, which + # mints a fresh id) and the detached unit's OWN entry node: the + # namespace segment immediately after the detached prefix, which + # is the entry node of the outermost graph OF THE DETACHED TRACE + # (the detached subgraph, or the fan-out instance subgraph). + # Callers always pass a strict-ancestor prefix (the ancestor walk + # in ``_sync_subgraph_spans`` only fires at depths below the + # event's namespace length), so ``len(prefix) < len(namespace)`` + # and the index is always in range. + attrs: dict[str, Any] = { + "openarmature.graph.entry_node": event.namespace[len(prefix)], + "openarmature.graph.spec_version": self.spec_version, + "openarmature.implementation.name": self.implementation_name, + "openarmature.implementation.version": self.implementation_version, + "openarmature.invocation_id": invocation_id, + } + if correlation_id is not None: + attrs["openarmature.correlation_id"] = correlation_id + _apply_caller_metadata(attrs, event.caller_invocation_metadata) + return attrs + def _open_detached_subgraph_root( self, inv_state: _InvState, @@ -1784,16 +1893,41 @@ def _open_detached_subgraph_root( ) inv_state.subgraph_spans[prefix] = _OpenSpan(span=parent_dispatch) - # 3. Open the detached root span — parented to the synthetic - # detached SpanContext so OTel uses the new trace_id. + # 3. Proposal 0061: the detached trace roots in its OWN + # ``openarmature.invocation`` span (parented to the synthetic + # detached SpanContext so OTel uses the new trace_id). It + # carries the §5.1 invocation-span attribute set with the + # SAME invocation_id as the parent — detached mode is + # observer-side trace rendering, not a new run (§4.3) — so + # the §5.1 always-emit attribution invariant lands on the + # detached trace's root with no per-context caveat. detached_parent_ctx = otel_trace.set_span_in_context( NonRecordingSpan(detached_sc), otel_context.Context() ) + detached_invocation = self._tracer.start_span( + name="openarmature.invocation", + context=cast("Any", detached_parent_ctx), + kind=SpanKind.INTERNAL, + attributes=self._detached_invocation_attrs(invocation_id, correlation_id, prefix, event), + ) + # A fresh detached invocation span at this prefix starts clean: + # discard any ERROR marker a prior generation left here (cyclic + # / fire-and-forget re-entry at the same prefix). Keys are only + # ever added while such a span is open, so this open is the one + # place stale state could otherwise persist; clearing it keeps + # the synthetic close paths reflecting only this generation. + inv_state.errored_detached_keys.discard(prefix) + inv_state.detached_invocation_spans[prefix] = _OpenSpan(span=detached_invocation) + + # 4. Open the detached subgraph span as a child of the detached + # invocation span (normal §4.3 nesting within the detached + # trace). Inner-node spans continue to parent under THIS span + # via ``detached_roots`` — the invocation span sits above it. attrs_root: dict[str, Any] = dict(attrs_parent) attrs_root["openarmature.subgraph.detached"] = True detached_root = self._tracer.start_span( name=prefix[-1], - context=cast("Any", detached_parent_ctx), + context=cast("Any", set_span_in_context(detached_invocation)), kind=SpanKind.INTERNAL, attributes=attrs_root, ) @@ -1802,6 +1936,7 @@ def _open_detached_subgraph_root( def _open_detached_fan_out_instance_root( self, inv_state: _InvState, + invocation_id: str, correlation_id: str | None, prefix: tuple[str, ...], event: NodeEvent, @@ -1833,10 +1968,29 @@ def _open_detached_fan_out_instance_root( if fan_out_open is not None: fan_out_open.span.add_link(detached_sc) - # Open the detached instance root span. + # Proposal 0061: each detached instance trace roots in its OWN + # ``openarmature.invocation`` span (shared parent invocation_id, + # the instance subgraph's entry node), with the fan-out instance + # span nested under it. Keyed by prefix + (str(fan_out_index),) + # so per-instance roots stay distinct. + instance_key = prefix + (str(event.fan_out_index),) detached_parent_ctx = otel_trace.set_span_in_context( NonRecordingSpan(detached_sc), otel_context.Context() ) + detached_invocation = self._tracer.start_span( + name="openarmature.invocation", + context=cast("Any", detached_parent_ctx), + kind=SpanKind.INTERNAL, + attributes=self._detached_invocation_attrs(invocation_id, correlation_id, prefix, event), + ) + # Clear any stale ERROR marker for this instance key before the + # fresh span opens (see the detached-subgraph open path) so a + # re-run of the same instance starts clean. + inv_state.errored_detached_keys.discard(instance_key) + inv_state.detached_invocation_spans[instance_key] = _OpenSpan(span=detached_invocation) + + # Open the detached instance root span as a child of the + # per-instance invocation span. attrs: dict[str, Any] = { "openarmature.node.name": prefix[-1], "openarmature.fan_out.parent_node_name": prefix[-1], @@ -1847,13 +2001,10 @@ def _open_detached_fan_out_instance_root( _apply_caller_metadata(attrs, event.caller_invocation_metadata) instance_root = self._tracer.start_span( name=prefix[-1], - context=cast("Any", detached_parent_ctx), + context=cast("Any", set_span_in_context(detached_invocation)), kind=SpanKind.INTERNAL, attributes=attrs, ) - # Key by prefix + (str(fan_out_index),) so per-instance - # roots stay distinct. - instance_key = prefix + (str(event.fan_out_index),) inv_state.detached_roots[instance_key] = _OpenSpan(span=instance_root) inv_state.fan_out_instance_root_prefixes.add(instance_key) @@ -2011,9 +2162,26 @@ def _close_parallel_branches_branch_dispatch_span( def _close_detached_root(self, inv_state: _InvState, prefix: tuple[str, ...]) -> None: inv_state.fan_out_instance_root_prefixes.discard(prefix) open_span = inv_state.detached_roots.pop(prefix, None) + if open_span is not None: + if prefix not in inv_state.errored_detached_keys: + open_span.span.set_status(Status(StatusCode.OK)) + self._run_enrichers(open_span.span, None) + open_span.span.end() + # Proposal 0061: close the paired detached invocation span (the + # parent of the detached root within the detached trace) AFTER + # the root — children before parents. + self._close_detached_invocation_span(inv_state, prefix) + + def _close_detached_invocation_span(self, inv_state: _InvState, prefix: tuple[str, ...]) -> None: + open_span = inv_state.detached_invocation_spans.pop(prefix, None) if open_span is None: return - open_span.span.set_status(Status(StatusCode.OK)) + # Skip the default OK when the §4.2 error path marked this key + # ERROR (proposal 0061) — OTel lets a later OK override ERROR + # (see errored_detached_keys); an UNSET close maps to OK by + # exporter convention. + if prefix not in inv_state.errored_detached_keys: + open_span.span.set_status(Status(StatusCode.OK)) self._run_enrichers(open_span.span, None) open_span.span.end() @@ -2158,6 +2326,10 @@ def _drain_inv_state(self, inv_state: _InvState) -> None: self._drain_open_span(open_span) for prefix in sorted(inv_state.detached_roots.keys(), key=lambda k: -len(k)): self._close_detached_root(inv_state, prefix) + # Defensive: _close_detached_root closes the paired invocation + # span, but a re-entry pop (or a partial open) could orphan one. + for prefix in sorted(inv_state.detached_invocation_spans.keys(), key=lambda k: -len(k)): + self._close_detached_invocation_span(inv_state, prefix) for prefix in sorted(inv_state.subgraph_spans.keys(), key=lambda k: -len(k)): self._close_subgraph_span(inv_state, prefix) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index fa0f0fc..c6b7bcb 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -35,7 +35,7 @@ import re from collections.abc import Mapping, Sequence from pathlib import Path -from typing import Any, cast +from typing import TYPE_CHECKING, Any, cast import pytest import yaml @@ -51,6 +51,9 @@ from .adapter import build_graph # noqa: E402 +if TYPE_CHECKING: + from opentelemetry.sdk.trace import ReadableSpan + # OTel SDK 1.x makes ``set_tracer_provider`` one-shot: once a non-default # provider is set, subsequent calls are no-ops (the SDK logs a warning @@ -141,6 +144,12 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: "054-llm-completion-event-fan-out-index-population", "055-llm-completion-event-branch-name-population", "056-llm-completion-event-strict-serial-ordering", + # proposal 0052 attribution fixture (case 1) + proposal 0061 + # (case 2: the §5.1 attribution lands on the detached trace's own + # openarmature.invocation span). Wired together now that 0061 + # (v0.61.0) resolves the detached-invocation-span shape case 2 + # presupposed — the whole fixture was unwired pending that. + "058-implementation-attribution-otel", } ) @@ -232,6 +241,8 @@ async def test_observability_fixture(fixture_path: Path) -> None: await _run_fixture_055(spec) elif fixture_id == "056-llm-completion-event-strict-serial-ordering": await _run_fixture_056(spec) + elif fixture_id == "058-implementation-attribution-otel": + await _run_fixture_058(spec) elif fixture_id in { "012-otel-llm-payload-default-off", "013-otel-llm-payload-enabled", @@ -1696,14 +1707,22 @@ async def _run_fixture_008(spec: Mapping[str, Any]) -> None: raise AssertionError(f"case {case_name!r}: {e}") from e -async def _run_fixture_008_case(case: Mapping[str, Any]) -> None: - case_name = case["name"] - # The fixture configures detached subgraphs by the SUBGRAPH'S - # IDENTITY NAME (the key in ``subgraphs:``), but the OTel observer - # keys on the WRAPPER NODE'S NAME in the parent graph (consistent - # with graph-engine §6's namespace convention — see fixture 029 - # spec note). Translate by looking up the wrapper node that - # references each detached subgraph identity. +async def _run_detached_case_graph( + case: Mapping[str, Any], *, expect_raise: bool = False +) -> Sequence[ReadableSpan]: + """Build + invoke a detached-mode fixture case; return the finished + spans. Shared by fixture 008 (detached trace mode) and fixture 058 + case 2 (attribution on the detached invocation span). + + ``expect_raise`` swallows the ``invoke()`` exception for the + error-status case (the detached subgraph raises and propagates); the + spans are captured regardless. + """ + # The fixture configures detached subgraphs by the SUBGRAPH'S IDENTITY + # NAME (the key in ``subgraphs:``), but the OTel observer keys on the + # WRAPPER NODE'S NAME in the parent graph (graph-engine §6 namespace + # convention; see the fixture 029 spec note). Translate by looking up + # the wrapper node that references each detached subgraph identity. detached_subgraph_identities = set(cast("list[str]", case.get("detached_subgraphs") or [])) nodes = cast("dict[str, Any]", case.get("nodes") or {}) wrapper_names_for_detached: set[str] = set() @@ -1726,24 +1745,42 @@ async def _run_fixture_008_case(case: Mapping[str, Any]) -> None: detached_fan_outs=detached_fan_outs, ) - # Patch the inner subgraph's ``update_pure_from_state`` directive - # if present — the adapter doesn't translate it, but the test - # assertions only inspect span structure (Links, trace counts), - # not the computed values. Replacing with a no-op ``update_pure`` - # keeps the graph runnable. + # Patch test-seam directives the adapter doesn't translate + # (``update_pure_from_state`` etc.) with a benign no-op; these + # assertions only inspect span structure, not computed values. _patch_unsupported_directives(case) - # Build subgraphs declared by the fixture (subgraph: or subgraphs:). subgraphs = _compile_subgraphs(case) trace_log: list[str] = [] built = build_graph(case, subgraphs=subgraphs, trace=trace_log) compiled = built.builder.compile() compiled.attach_observer(observer) initial_state = built.initial_state(case.get("initial_state", {})) - await compiled.invoke(initial_state) + try: + await compiled.invoke(initial_state) + except Exception: + if not expect_raise: + raise await compiled.drain() observer.shutdown() - spans = exporter.get_finished_spans() + return exporter.get_finished_spans() + + +def _invocation_id_of(span: Any) -> Any: + """The ``openarmature.invocation_id`` attribute off a span (or None).""" + return dict(span.attributes or {}).get("openarmature.invocation_id") + + +def _has_exception_event(span: Any) -> bool: + """Whether the span recorded an OTel exception event.""" + events = cast("list[Any]", span.events or []) + return any(getattr(e, "name", None) == "exception" for e in events) + + +async def _run_fixture_008_case(case: Mapping[str, Any]) -> None: + case_name = case["name"] + expect_raise = case_name == "detached_subgraph_raises_error_status_on_both_spans" + spans = await _run_detached_case_graph(case, expect_raise=expect_raise) if case_name == "detached_subgraph_two_traces_one_link": # Group by trace_id. Span context is non-None for any span @@ -1763,7 +1800,8 @@ async def _run_fixture_008_case(case: Mapping[str, Any]) -> None: # Find the parent dispatch span (it's the one with a Link). dispatch_spans = [s for s in spans if s.name == "dispatch"] # Two "dispatch" spans: one in parent trace (with Link), one in - # detached trace (the root). Pick the one with links. + # detached trace (under the detached invocation span). Pick the + # one with links. parent_dispatch = next((s for s in dispatch_spans if s.links), None) assert parent_dispatch is not None, "expected a 'dispatch' span carrying a Link to the detached trace" assert len(parent_dispatch.links) == 1, ( @@ -1782,6 +1820,21 @@ async def _run_fixture_008_case(case: Mapping[str, Any]) -> None: f"Link target trace_id MUST match the detached trace's trace_id; " f"got link={link_target_trace_id!r}, detached={detached_trace_id!r}" ) + # Proposal 0061: the detached trace roots in its OWN + # openarmature.invocation span sharing the parent's invocation_id + # (invariant detached_invocation_id_equals_parent). + inv_spans = [s for s in spans if s.name == "openarmature.invocation"] + assert len(inv_spans) == 2, f"expected parent + detached invocation spans; got {len(inv_spans)}" + parent_inv = next((s for s in inv_spans if cast("Any", s.context).trace_id == parent_trace_id), None) + detached_inv = next( + (s for s in inv_spans if cast("Any", s.context).trace_id == detached_trace_id), None + ) + assert detached_inv is not None, "detached trace MUST root in an openarmature.invocation span" + assert parent_inv is not None + parent_iid = _invocation_id_of(parent_inv) + assert parent_iid is not None and _invocation_id_of(detached_inv) == parent_iid, ( + "detached invocation span MUST share the parent's invocation_id (§4.3)" + ) return if case_name == "detached_fan_out_one_trace_per_instance": @@ -1807,6 +1860,145 @@ async def _run_fixture_008_case(case: Mapping[str, Any]) -> None: assert len(parent_fan_out.links) == 3, ( f"fan-out span MUST carry one Link per instance (3); got {len(parent_fan_out.links)}" ) + # Proposal 0061: each instance trace roots in its OWN + # openarmature.invocation span sharing the parent's invocation_id. + parent_trace_id = cast("Any", parent_fan_out.context).trace_id + inv_spans = [s for s in spans if s.name == "openarmature.invocation"] + assert len(inv_spans) == 4, f"expected parent + 3 instance invocation spans; got {len(inv_spans)}" + parent_inv = next((s for s in inv_spans if cast("Any", s.context).trace_id == parent_trace_id), None) + assert parent_inv is not None + parent_iid = _invocation_id_of(parent_inv) + instance_invs = [s for s in inv_spans if cast("Any", s.context).trace_id != parent_trace_id] + assert len(instance_invs) == 3, ( + f"expected 3 detached instance invocation spans; got {len(instance_invs)}" + ) + for inv in instance_invs: + assert parent_iid is not None and _invocation_id_of(inv) == parent_iid, ( + "each detached instance invocation span MUST share the parent's invocation_id" + ) + return + + if case_name == "detached_subgraph_raises_error_status_on_both_spans": + # Proposal 0061 §4.2: a raising detached subgraph surfaces ERROR + # on BOTH the parent's dispatch span and the detached invocation + # span — distinct traces, shared invocation_id, each with the §4 + # category + an OTel exception event. + dispatch_spans = [s for s in spans if s.name == "dispatch"] + parent_dispatch = next((s for s in dispatch_spans if s.links), None) + assert parent_dispatch is not None, "expected a parent 'dispatch' span with a Link" + assert parent_dispatch.status.status_code.name == "ERROR", ( + "parent dispatch span MUST carry ERROR for a raising detached subgraph" + ) + assert _has_exception_event(parent_dispatch), "parent dispatch span MUST record the exception" + assert dict(parent_dispatch.attributes or {}).get("openarmature.error.category") == "node_exception" + parent_trace_id = cast("Any", parent_dispatch.context).trace_id + detached_trace_id = parent_dispatch.links[0].context.trace_id + assert detached_trace_id != parent_trace_id, "detached + parent traces MUST be distinct" + inv_spans = [s for s in spans if s.name == "openarmature.invocation"] + detached_inv = next( + (s for s in inv_spans if cast("Any", s.context).trace_id == detached_trace_id), None + ) + parent_inv = next((s for s in inv_spans if cast("Any", s.context).trace_id == parent_trace_id), None) + assert detached_inv is not None, "detached trace MUST root in an openarmature.invocation span" + assert parent_inv is not None + assert detached_inv.status.status_code.name == "ERROR", ( + "detached invocation span MUST carry the detached unit's ERROR status (§4.2)" + ) + assert _has_exception_event(detached_inv), "detached invocation span MUST record the exception" + assert dict(detached_inv.attributes or {}).get("openarmature.error.category") == "node_exception" + parent_iid = _invocation_id_of(parent_inv) + assert parent_iid is not None and _invocation_id_of(detached_inv) == parent_iid, ( + "detached invocation span MUST share the parent's invocation_id" + ) + return + + raise AssertionError(f"unknown sub-case {case_name!r}") + + +def _assert_attribution_on_invocation_only(spans: Any) -> None: + """Assert every openarmature.invocation span carries non-empty + implementation.name (the canonical python value) and + implementation.version, and that no inner span carries either.""" + # Proposal 0052 §5.1: the attribution attributes are invocation-span- + # only (not §5.6 cross-cutting), so inner spans MUST NOT carry them. + inv_spans = [s for s in spans if s.name == "openarmature.invocation"] + assert inv_spans, "expected at least one openarmature.invocation span" + for inv in inv_spans: + iattrs = dict(inv.attributes or {}) + name = iattrs.get("openarmature.implementation.name") + version = iattrs.get("openarmature.implementation.version") + assert name == "openarmature-python", ( + f"implementation.name MUST equal the canonical python value; got {name!r}" + ) + assert isinstance(version, str) and len(version) > 0, ( + "implementation.version MUST be a non-empty string" + ) + for s in spans: + if s.name == "openarmature.invocation": + continue + sattrs = dict(s.attributes or {}) + assert "openarmature.implementation.name" not in sattrs, ( + f"inner span {s.name!r} MUST NOT carry implementation.name" + ) + assert "openarmature.implementation.version" not in sattrs, ( + f"inner span {s.name!r} MUST NOT carry implementation.version" + ) + + +async def _run_fixture_058(spec: Mapping[str, Any]) -> None: + """Implementation-attribution attributes. Case 1: present on the + invocation span, absent on inner spans. Case 2: the attribution + lands on the detached trace's OWN openarmature.invocation span, with + the subgraph-wrapper span nested under it.""" + # Case 1 covers proposal 0052 (§5.1 attribution); case 2 additionally + # depends on proposal 0061 (the detached trace gains its own + # invocation-span root for the attributes to land on). + for case in cast("list[dict[str, Any]]", spec["cases"]): + case_name = cast("str", case["name"]) + try: + await _run_fixture_058_case(case) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + +async def _run_fixture_058_case(case: Mapping[str, Any]) -> None: + case_name = case["name"] + if case_name == "implementation_attribution_attributes_present_on_invocation_span": + observer, exporter = _build_observer() + await _run_graph(case, observer) + observer.shutdown() + _assert_attribution_on_invocation_only(exporter.get_finished_spans()) + return + + if case_name == "detached_subgraph_attribution_propagates_to_child_trace_invocation_span": + spans = await _run_detached_case_graph(case) + # Two invocation spans (parent + detached); §5.1 attribution on + # both, absent on every inner span across both traces. + inv_spans = [s for s in spans if s.name == "openarmature.invocation"] + assert len(inv_spans) == 2, f"expected parent + detached invocation spans; got {len(inv_spans)}" + _assert_attribution_on_invocation_only(spans) + # Proposal 0061: the detached subgraph-wrapper span nests BETWEEN + # the detached invocation span and the inner node (058 case 2 + # added the previously-missing wrapper layer). + wrapper = next( + (s for s in spans if dict(s.attributes or {}).get("openarmature.subgraph.detached") is True), + None, + ) + assert wrapper is not None, "expected a detached subgraph-wrapper span" + detached_trace_id = cast("Any", wrapper.context).trace_id + detached_inv = next( + (s for s in inv_spans if cast("Any", s.context).trace_id == detached_trace_id), None + ) + assert detached_inv is not None, "detached trace MUST root in an openarmature.invocation span" + assert ( + wrapper.parent is not None and wrapper.parent.span_id == cast("Any", detached_inv.context).span_id + ), "detached subgraph-wrapper span MUST nest under the detached invocation span" + inner = [ + s + for s in spans + if s.parent is not None and s.parent.span_id == cast("Any", wrapper.context).span_id + ] + assert inner, "the inner node MUST nest under the subgraph-wrapper span (no skipped layer)" return raise AssertionError(f"unknown sub-case {case_name!r}") diff --git a/tests/test_smoke.py b/tests/test_smoke.py index a4f0635..9e18efd 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -9,7 +9,7 @@ def test_package_versions() -> None: assert openarmature.__version__ == "0.14.0" - assert openarmature.__spec_version__ == "0.60.0" + assert openarmature.__spec_version__ == "0.61.0" def test_spec_version_matches_pyproject() -> None: diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index 6fd0ba1..2fe2b0e 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -2711,14 +2711,31 @@ async def _leaf(_s: _InnerS) -> dict[str, str]: and dict(s.attributes or {}).get("openarmature.subgraph.detached") is True ] assert len(detached_roots) == 1 - # Its trace_id MUST differ from the leaf's parent invocation trace. + # Proposal 0061: the detached trace roots in its OWN + # ``openarmature.invocation`` span (parent + detached = two + # invocation spans), both carrying the SAME invocation_id, with the + # detached subgraph span nested under the detached invocation span. inv_spans = [s for s in spans if s.name == "openarmature.invocation"] - assert len(inv_spans) == 1 - inv_trace_id = cast("Any", inv_spans[0].context).trace_id + assert len(inv_spans) == 2, f"expected parent + detached invocation spans, got {len(inv_spans)}" detached_trace_id = cast("Any", detached_roots[0].context).trace_id - assert detached_trace_id != inv_trace_id, ( + detached_inv = next((s for s in inv_spans if cast("Any", s.context).trace_id == detached_trace_id), None) + parent_inv = next((s for s in inv_spans if cast("Any", s.context).trace_id != detached_trace_id), None) + assert detached_inv is not None, "detached trace MUST root in an openarmature.invocation span" + assert parent_inv is not None + assert detached_trace_id != cast("Any", parent_inv.context).trace_id, ( "detached subgraph root MUST live in a fresh trace, not the parent invocation trace" ) + # Shared invocation_id across the trace boundary (§4.3). + detached_inv_id = dict(detached_inv.attributes or {}).get("openarmature.invocation_id") + parent_inv_id = dict(parent_inv.attributes or {}).get("openarmature.invocation_id") + assert detached_inv_id is not None and detached_inv_id == parent_inv_id, ( + "detached invocation span MUST carry the SAME invocation_id as the parent (§4.3)" + ) + # The detached subgraph span nests under the detached invocation span. + assert detached_roots[0].parent is not None + assert detached_roots[0].parent.span_id == cast("Any", detached_inv.context).span_id, ( + "detached subgraph span MUST nest under the detached invocation span" + ) async def test_three_deep_mixed_pb_fan_out_pb_composition() -> None: