diff --git a/CHANGELOG.md b/CHANGELOG.md index dfac0ae..de7964f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,11 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The - **Detached-trace invocation span** (proposal 0061, observability §4.4, spec v0.61.0). The OTel observer now synthesizes an `openarmature.invocation` span at the root of each detached trace (a detached subgraph and each detached fan-out instance), carrying the parent's shared `invocation_id` (detached mode is observer-side trace rendering, not a new run) and the detached unit's own `entry_node`; the detached subgraph / instance span nests under it. A raising detached subgraph surfaces ERROR plus the error category and an OTel exception event on both the parent dispatch span and the detached invocation span. This is observer-side only, with no graph-engine change; the Langfuse observer is unchanged (its Trace entity already plays the invocation-level-container role). Conformance fixtures 008 (rewritten) and 058 (newly wired) run in `test_observability`. - **Per-attempt LLM spans under call-level retry** (proposal 0050, observability §5.5 / llm-provider §7.1). Completes proposal 0050, which shipped `partial` in v0.14.0 (failure-isolation middleware and the `complete(retry=...)` loop landed then; the per-attempt span surface was deferred). Under call-level retry the OTel observer now emits one `openarmature.llm.complete` span per attempt, each carrying `openarmature.llm.attempt_index` (0-based, 0..N-1, and 0 for a no-retry call). An intermediate failed attempt's span carries ERROR status plus its error category and the request-side attributes; the final attempt's span carries the terminal outcome and, on success, the full response surface. 
A python-internal `LlmRetryAttemptEvent`, dispatched once per attempt, is the sole source of the OTel span; the terminal `LlmCompletionEvent` / `LlmFailedEvent` stay one per call (payload, latency, Langfuse Generation) and no longer drive the OTel span. Langfuse renders one terminal Generation per call, with the per-attempt detail on the OTel span surface only (a spec-side §8 clarification to pin this is tracked, non-blocking). `conformance.toml` flips proposal 0050 to `implemented`; the call-level fixtures 056-058 are driven through the provider plus OTel observer and the single-attempt observability fixture 057 is wired. +- **Langfuse `trace.userId` / `trace.sessionId` population** (proposal 0064, observability §8.4.1, spec v0.62.0). The Langfuse observer now promotes a recognized `userId` key in the caller-supplied invocation metadata to Langfuse's first-class `trace.userId` field (the Users dashboard), additively: the key also remains at `trace.metadata.userId`. Promotion is automatic and unconditional; an absent key leaves `trace.userId` unset. The `LangfuseClient.trace()` surface (the Protocol, the in-memory client, and the SDK adapter) gains `session_id` / `user_id`. `trace.sessionId` is sourced from `openarmature.session_id`, which the sessions capability (proposal 0020) establishes; that capability is not yet implemented in python, so the `sessionId` plumbing is in place but dormant (no source) and unset in the interim. `conformance.toml` records proposal 0064 `partial` on that basis: fixture 084 cases 2/3/4 (not session-bound, `userId` present additively, `userId` absent) run, and the session-bound cases 1/5 defer until 0020. Langfuse-only: the OTel side already carries `openarmature.session_id` and `openarmature.user.*` as span attributes, and OTel has no trace-level session/user field. ### Changed -- **Pinned spec advances v0.60.0 → v0.61.0** (proposal 0061, the detached-trace invocation span above). 
A single step this cycle; `conformance.toml` records proposal 0061 as `implemented`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above. +- **Pinned spec advances v0.60.0 → v0.62.0** across the v0.15.0 cycle: v0.61.0 (proposal 0061, the detached-trace invocation span above) and v0.62.0 (proposal 0064, the Langfuse session/user population above). `conformance.toml` records 0061 `implemented` and 0064 `partial` (its `sessionId` half is dormant pending the sessions capability). Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above. ## [0.14.0] — 2026-06-17 diff --git a/conformance.toml b/conformance.toml index 7ead03c..5e10814 100644 --- a/conformance.toml +++ b/conformance.toml @@ -698,3 +698,10 @@ note = "Descriptive catalog of the failure-mock family (flaky + failure_sequence status = "implemented" since = "0.15.0" note = "The OTel observer synthesizes an openarmature.invocation span at the root of each detached trace (a detached subgraph + each detached fan-out instance), carrying the parent's SHARED invocation_id (detached mode is observer-side trace rendering, not a new run) and the detached unit's own entry_node; the detached subgraph / instance span nests under it. A raising detached subgraph surfaces ERROR + the category + an OTel exception event on BOTH the parent dispatch span and the detached invocation span. Observer-side only -- no graph-engine change; the Langfuse observer is unchanged (its Trace entity already plays the invocation-level-container role). Fixtures 008 (rewritten) and 058 (newly wired) run in test_observability." + +# Spec v0.62.0 (proposal 0064). Langfuse trace.sessionId / trace.userId +# population (observability §8.4.1 / §8.10). 
+[proposals."0064"] +status = "partial" +since = "0.15.0" +note = "The Langfuse observer promotes a recognized userId caller-metadata key to the first-class trace.userId (additive: the key also stays in trace.metadata.userId), and sets trace.sessionId from openarmature.session_id when present. trace.userId is LIVE (sourced from 0034 caller metadata): fixture 084 cases 2/3/4 (not-session-bound, userId present additive, userId absent) pass. partial because trace.sessionId is DORMANT -- openarmature.session_id is established by the sessions capability (0020, observability §5.6), unimplemented in python until v0.19.0, so there is no session_id source yet; the trace(session_id=) plumbing is wired end to end but the observer passes None. Fixture 084 session-bound cases 1 + 5 are deferred (per-case) pending 0020. Langfuse-only: no OTel change (the OTel side already carries openarmature.session_id + openarmature.user.* as span attributes; no trace-level OTel equivalent)." diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 7aa2a01..0d3f63e 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -1048,6 +1048,18 @@ for a runnable demo. - **Trace name.** Defaults to the entry-node name (spec §8.6 fallback). Caller-supplied invocation labels land in PR 4 (proposal 0034). +- **Session / user grouping (`trace.sessionId` / `trace.userId`).** + The observer populates the two cross-trace grouping fields behind + Langfuse's Sessions and Users dashboards (spec §8.4.1, proposal + 0064). `trace.userId` is promoted from a recognized `userId` key in + the caller-supplied invocation metadata, automatically and + additively (the key also stays at `trace.metadata.userId`); an + absent key leaves it unset. `trace.sessionId` is sourced from + `openarmature.session_id` (the sessions capability), which is not + yet implemented, so it is unset for now. 
There is no OTel + equivalent (an OTel trace has no trace-level session / user field); + the same identity already rides as `openarmature.session_id` and the + `openarmature.user.*` family on the OTel span side. - **Per-observation metadata.** Each Span / Generation carries `namespace`, `step`, `attempt_index`, optional `fan_out_index` / `branch_name`, and the `correlation_id` cross-cutting join key diff --git a/examples/langfuse-observability/main.py b/examples/langfuse-observability/main.py index 493ec0b..56192e2 100644 --- a/examples/langfuse-observability/main.py +++ b/examples/langfuse-observability/main.py @@ -16,7 +16,10 @@ observation picks that up and links back to the entity, which is how production Langfuse dashboards thread "this generation came from prompt v7 of `mission-briefing`" without you having to wire anything up -manually. +manually. It also tags each trace with a ``userId`` (operator identity) +via invocation metadata; the observer promotes that to Langfuse's +first-class user dimension, so the Users dashboard groups and filters +the assistant's traffic by operator. The example uses the bundled ``InMemoryLangfuseClient`` recorder so the demo runs without a Langfuse account; at the end we print the captured @@ -193,6 +196,8 @@ def _format_trace(trace: LangfuseTrace) -> str: lines: list[str] = [] lines.append(f"Trace id={trace.id}") lines.append(f" name={trace.name!r}") + if trace.user_id is not None: + lines.append(f" userId={trace.user_id!r} (promoted to the Langfuse Users dimension)") lines.append(f" metadata={_format_metadata(trace.metadata)}") for obs in trace.children_of(None): _format_observation(lines, trace, obs, indent=" ") @@ -274,7 +279,15 @@ async def main() -> None: graph.attach_observer(observer) try: - final = await graph.invoke(BriefingState(question=question)) + # metadata={"userId": ...} tags the trace with an operator + # identity. 
The Langfuse observer promotes a recognized ``userId`` + # key to the first-class trace.userId field so the Users dashboard + # can group and filter traces by operator (additive: it also stays + # in trace.metadata.userId). + final = await graph.invoke( + BriefingState(question=question), + metadata={"userId": "flight-controller-gene"}, + ) finally: # Required for short-lived processes: invoke() returns when the # graph reaches END regardless of whether the observer queue diff --git a/openarmature-spec b/openarmature-spec index 50bd070..963504e 160000 --- a/openarmature-spec +++ b/openarmature-spec @@ -1 +1 @@ -Subproject commit 50bd0700876792cd1891fe740890002beb6e0279 +Subproject commit 963504ede4fda5ae32cfd5b68331036536a1fefb diff --git a/pyproject.toml b/pyproject.toml index 43556da..28e0c43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec" openarmature = "openarmature.cli:main" [tool.openarmature] -spec_version = "0.61.0" +spec_version = "0.62.0" [dependency-groups] dev = [ diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index ee406af..87fc502 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1,6 +1,6 @@ # OpenArmature — Agent documentation -*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.61.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* +*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.62.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). 
For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* ## TL;DR @@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents: ## Capability contracts -_Sourced from openarmature-spec v0.61.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ +_Sourced from openarmature-spec v0.62.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ ### Capability: `graph-engine` diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py index a3eaf54..7a41674 100644 --- a/src/openarmature/__init__.py +++ b/src/openarmature/__init__.py @@ -25,7 +25,7 @@ """ __version__ = "0.14.0" -__spec_version__ = "0.61.0" +__spec_version__ = "0.62.0" # Proposal 0052 (spec observability §5.1 / §8.4.1): canonical # package-registry name for this implementation. 
Surfaces on every # OTel invocation span as ``openarmature.implementation.name`` and on diff --git a/src/openarmature/observability/langfuse/adapter.py b/src/openarmature/observability/langfuse/adapter.py index ed2964b..ed7bc93 100644 --- a/src/openarmature/observability/langfuse/adapter.py +++ b/src/openarmature/observability/langfuse/adapter.py @@ -209,6 +209,8 @@ def trace( id: str, name: str | None = None, metadata: dict[str, Any] | None = None, + session_id: str | None = None, + user_id: str | None = None, ) -> None: # v4 has no explicit trace creation; cache the info and apply # it via propagate_attributes on every observation under this @@ -221,7 +223,15 @@ def trace( # reserved (proposal 0041), so no caller metadata collides. if not _is_uuid(id): md.setdefault("invocation_id", id) - self._trace_info[id] = {"name": name, "metadata": md} + # Proposal 0064 §8.4.1: cache the session/user grouping fields so + # propagate_attributes can apply them around every observation + # under this trace_id (v4 has no explicit trace-create call). 
+ self._trace_info[id] = { + "name": name, + "metadata": md, + "session_id": session_id, + "user_id": user_id, + } def update_trace( self, @@ -292,6 +302,8 @@ def _emit_trace_output_synthetic(self, trace_id: str, output: Any) -> None: propagate_attributes( trace_name=entry["name"], metadata=_stringify_metadata(entry["metadata"]), + session_id=entry.get("session_id"), + user_id=entry.get("user_id"), ) ) obs = cast( @@ -438,6 +450,8 @@ def _start_back_dated_generation( propagate_attributes( trace_name=trace_entry["name"], metadata=_stringify_metadata(trace_entry["metadata"]), + session_id=trace_entry.get("session_id"), + user_id=trace_entry.get("user_id"), ) ) stack.enter_context(otel_trace_api.use_span(remote_parent_span)) @@ -524,6 +538,8 @@ def _start_observation( propagate_attributes( trace_name=trace_entry["name"], metadata=_stringify_metadata(trace_entry["metadata"]), + session_id=trace_entry.get("session_id"), + user_id=trace_entry.get("user_id"), ) ) obs = cast("Any", self._client.start_observation(**kwargs)) diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 276498b..4a9d6ed 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -104,6 +104,12 @@ class LangfuseTrace: # invocation-boundary events; absent when no observer wrote them. input: Any | None = None output: Any | None = None + # Proposal 0064 §8.4.1: Langfuse's two cross-trace grouping fields. + # ``session_id`` groups traces sharing a session (Sessions dashboard); + # ``user_id`` populates the Users dimension. Each is unset (None) when + # its source is absent. 
+ session_id: str | None = None + user_id: str | None = None observations: list[LangfuseObservation] = field(default_factory=list[LangfuseObservation]) def find_observation(self, observation_id: str) -> LangfuseObservation | None: @@ -170,12 +176,18 @@ def trace( id: str, name: str | None = None, metadata: dict[str, Any] | None = None, + session_id: str | None = None, + user_id: str | None = None, ) -> None: """Create a new Trace. The Trace `id` MUST be the OA invocation_id verbatim. Implementations track Traces internally; observation calls pass `trace_id` to associate. + + `session_id` / `user_id` (proposal 0064 §8.4.1) populate + Langfuse's cross-trace grouping fields (the Sessions / Users + dashboards); each is unset when its source is absent. """ # Spec §8.4.1: the Trace id is the OA invocation_id verbatim. ... @@ -368,11 +380,15 @@ def trace( id: str, name: str | None = None, metadata: dict[str, Any] | None = None, + session_id: str | None = None, + user_id: str | None = None, ) -> None: self.traces[id] = LangfuseTrace( id=id, name=name, metadata=dict(metadata) if metadata is not None else {}, + session_id=session_id, + user_id=user_id, ) def update_trace( diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 08b4a7e..be12b94 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -160,6 +160,16 @@ def _apply_caller_metadata(metadata: dict[str, Any], caller_metadata: Mapping[st metadata[key] = value +def _promoted_user_id(metadata: Mapping[str, Any]) -> str | None: + # Proposal 0064 §8.4.1: a recognized ``userId`` caller-metadata key + # promotes to Langfuse's first-class trace.userId (recognized, not + # reserved; automatic, not opt-in). Read from the already-merged trace + # metadata, so the promotion is additive -- the key also remains at + # trace.metadata.userId. Absent key -> None (trace.userId unset). 
+ value = metadata.get("userId") + return str(value) if value is not None else None + + def _subgraph_identity_at(event: NodeEvent, depth: int) -> str: """Return the compiled-subgraph identity for the wrapper at the given 1-based namespace depth, or the empty string when no @@ -781,6 +791,24 @@ def _state_to_jsonable(state: Any) -> Any: return str(state) return str(state) + def _client_trace(self, *, id: str, name: str | None, metadata: dict[str, Any]) -> None: + # Proposal 0064 §8.4.1: every Trace open routes through here so the + # sessionId / userId promotions apply uniformly across the main, + # lazy, and detached trace-open sites. + # - trace.userId: promoted from the recognized ``userId`` caller + # key (already merged into ``metadata`` by _apply_caller_metadata). + # - trace.sessionId: sourced from openarmature.session_id (sessions + # capability, observability §5.6 / proposal 0020). python has no + # session_id source until 0020 lands, so it is unset (None) today; + # this is the single hook 0020 wires the source into. + self.client.trace( + id=id, + name=name, + metadata=metadata, + session_id=None, + user_id=_promoted_user_id(metadata), + ) + def _open_trace_lazy( self, invocation_id: str, @@ -805,7 +833,7 @@ def _open_trace_lazy( if correlation_id is not None: metadata["correlation_id"] = correlation_id _apply_caller_metadata(metadata, current_invocation_metadata()) - self.client.trace(id=invocation_id, name=entry_node, metadata=metadata) + self._client_trace(id=invocation_id, name=entry_node, metadata=metadata) self._inv_states[invocation_id] = _InvState(trace_id=invocation_id) def _open_trace(self, invocation_id: str, correlation_id: str | None, event: NodeEvent) -> None: @@ -834,7 +862,7 @@ def _open_trace(self, invocation_id: str, correlation_id: str | None, event: Nod # The caller-supplied path lands in proposal 0034 (PR 4) — for # now only the fallback is wired. 
trace_name = entry_node - self.client.trace(id=invocation_id, name=trace_name, metadata=metadata) + self._client_trace(id=invocation_id, name=trace_name, metadata=metadata) self._inv_states[invocation_id] = _InvState(trace_id=invocation_id) def _key_for(self, event: NodeEvent) -> _StackKey: @@ -1150,7 +1178,7 @@ def _open_detached_subgraph_trace( # ``subgraph_identity = "X"``, not every wrapper that # happens to be named ``X``. wrapper_obs_name = identity or prefix[-1] - self.client.trace(id=detached_trace_id, name=wrapper_obs_name, metadata=detached_metadata) + self._client_trace(id=detached_trace_id, name=wrapper_obs_name, metadata=detached_metadata) # §8.4.2 (proposal 0042): `detached: true` lives on the # PARENT-side dispatching observation (the link observation # above), not on the dispatch observation IN the detached @@ -1228,7 +1256,7 @@ def _open_detached_fan_out_instance_trace( if correlation_id is not None: detached_metadata["correlation_id"] = correlation_id _apply_caller_metadata(detached_metadata, event.caller_invocation_metadata) - self.client.trace( + self._client_trace( id=detached_trace_id, name=prefix[-1], metadata=detached_metadata, @@ -1662,7 +1690,7 @@ def _open_trace_for_typed_event( metadata["correlation_id"] = correlation_id if event.caller_invocation_metadata is not None: _apply_caller_metadata(metadata, event.caller_invocation_metadata) - self.client.trace(id=invocation_id, name=entry_node, metadata=metadata) + self._client_trace(id=invocation_id, name=entry_node, metadata=metadata) self._inv_states[invocation_id] = _InvState(trace_id=invocation_id) def _maybe_truncate_for_input(self, value: Any) -> Any: diff --git a/tests/conformance/harness/expectations.py b/tests/conformance/harness/expectations.py index fd682b9..fb0cec4 100644 --- a/tests/conformance/harness/expectations.py +++ b/tests/conformance/harness/expectations.py @@ -197,6 +197,11 @@ class ObservabilityExpected(_ForbidExtras): determinism_check: dict[str, Any] | None = None # 
Multi-invocation fixtures (009 cross-cutting, 011 determinism). invocation_count: int | None = None + # Langfuse Trace-level expectations (proposal 0064, fixture 084): the + # single-trace shape (sessionId / userId / metadata) and the + # multi-invocation grouping case's per-trace list. + langfuse_trace: dict[str, Any] | None = None + langfuse_traces: list[dict[str, Any]] | None = None # --------------------------------------------------------------------------- @@ -240,6 +245,9 @@ class ObservabilityExpected(_ForbidExtras): "no_llm_provider_span", "determinism_check", "invocation_count", + # proposal 0064 (fixture 084) Langfuse Trace-level expectations + "langfuse_trace", + "langfuse_traces", } ) diff --git a/tests/conformance/harness/fixtures.py b/tests/conformance/harness/fixtures.py index 311ffdf..4754fc7 100644 --- a/tests/conformance/harness/fixtures.py +++ b/tests/conformance/harness/fixtures.py @@ -134,7 +134,9 @@ class CaseSpec(BaseModel): caller_correlation_id: str | None = None # observability — mock LLM responses + per-case run config. mock_llm: list[MockResponse] | None = None - invocations: int | None = None + # ``int`` = a run-count (fixtures 009 / 011); ``list`` = per-invocation + # specs for a multi-invocation fixture (proposal 0064 fixture 084 case 5). + invocations: int | list[dict[str, Any]] | None = None # --------------------------------------------------------------------------- diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 06943e9..485576d 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -153,6 +153,11 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: # (v0.61.0) resolves the detached-invocation-span shape case 2 # presupposed — the whole fixture was unwired pending that. "058-implementation-attribution-otel", + # v0.62.0 — proposal 0064 (Langfuse trace.sessionId / trace.userId + # population). 
Cases 2/3/4 (not session-bound + userId promotion) + # run; session-bound cases 1/5 defer until the sessions capability + # (0020) supplies openarmature.session_id. + "084-langfuse-session-user-promotion", } ) @@ -246,6 +251,8 @@ async def test_observability_fixture(fixture_path: Path) -> None: await _run_fixture_056(spec) elif fixture_id == "058-implementation-attribution-otel": await _run_fixture_058(spec) + elif fixture_id == "084-langfuse-session-user-promotion": + await _run_fixture_084(spec) elif fixture_id in { "012-otel-llm-payload-default-off", "013-otel-llm-payload-enabled", @@ -2175,6 +2182,74 @@ async def delete(self, invocation_id: str) -> None: ) +# --------------------------------------------------------------------------- +# Fixture 084 — Langfuse session/user promotion (proposal 0064) +# --------------------------------------------------------------------------- + + +async def _run_fixture_084(spec: Mapping[str, Any]) -> None: + from openarmature.observability.langfuse import ( # noqa: PLC0415 + InMemoryLangfuseClient, + LangfuseObserver, + ) + + # Proposal 0064 §8.4.1. Cases 1 + 5 are session-bound: they supply + # session_id at invoke(), which needs the sessions capability + # (proposal 0020, §5.6) to surface openarmature.session_id. That is + # unimplemented in python until v0.19.0, so trace.sessionId has no + # source and these cases defer (per-case continue). Cases 2/3/4 (not + # session-bound + the userId promotion) run now. 
+ _deferred_cases = { + "session_bound_sets_trace_session_id", + "multi_invocation_shared_session_groups", + } + cases = cast("list[dict[str, Any]]", spec["cases"]) + for case in cases: + case_name = cast("str", case["name"]) + if case_name in _deferred_cases: + continue + try: + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + trace: list[str] = [] + built = build_graph(case, trace=trace) + compiled = built.builder.compile() + compiled.attach_observer(observer) + initial_state = built.initial_state(case.get("initial_state", {})) + caller_metadata = cast("dict[str, Any] | None", case.get("caller_metadata")) + if caller_metadata is not None: + await compiled.invoke(initial_state, metadata=caller_metadata) + else: + await compiled.invoke(initial_state) + await compiled.drain() + observer.shutdown() + + assert len(client.traces) == 1, f"expected 1 trace, got {len(client.traces)}" + lf_trace = next(iter(client.traces.values())) + expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"]) + # trace.sessionId is unset for the runnable cases (no session + # source until 0020). + assert lf_trace.session_id == expected.get("sessionId"), ( + f"sessionId: got {lf_trace.session_id!r}, expected {expected.get('sessionId')!r}" + ) + # trace.userId: promoted from the userId caller key (case 3), + # unset otherwise (cases 2/4). + assert lf_trace.user_id == expected.get("userId"), ( + f"userId: got {lf_trace.user_id!r}, expected {expected.get('userId')!r}" + ) + # Additive promotion + unaffected metadata: every concrete + # (non-placeholder) expected metadata key also lands top-level. 
+ expected_md = cast("dict[str, Any]", expected.get("metadata") or {}) + for key, val in expected_md.items(): + if isinstance(val, str) and val.startswith("<") and val.endswith(">"): + continue + assert lf_trace.metadata.get(key) == val, ( + f"metadata.{key}: got {lf_trace.metadata.get(key)!r}, expected {val!r}" + ) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + # --------------------------------------------------------------------------- # Fixture 010 — log correlation # diff --git a/tests/integration/test_langfuse_sdk_adapter.py b/tests/integration/test_langfuse_sdk_adapter.py index 1b449f2..6be3c4e 100644 --- a/tests/integration/test_langfuse_sdk_adapter.py +++ b/tests/integration/test_langfuse_sdk_adapter.py @@ -247,3 +247,50 @@ async def test_sdk_adapter_generation_timestamps_round_trip_through_langfuse() - # queues. Without this, a long-running pytest process could # accumulate background threads across integration tests. client.shutdown() + + +@pytest.mark.integration +async def test_sdk_adapter_populates_session_and_user_id_on_live_langfuse() -> None: + """End-to-end (proposal 0064 §8.4.1): trace(session_id=, user_id=) + populates the live Trace's sessionId / userId grouping fields. + + The observer leaves session_id dormant until the sessions capability + (proposal 0020) supplies openarmature.session_id, but the adapter + passes whatever it is given, so this exercises BOTH passthroughs at + the SDK boundary: when 0020 lands, the session_id rides the same + propagate_attributes path the userId promotion uses today. 
+ """ + from langfuse import Langfuse + + from openarmature.observability.langfuse.adapter import LangfuseSDKAdapter + + client = Langfuse() + adapter = LangfuseSDKAdapter(client) + + invocation_id = str(uuid.uuid4()) + session_id = f"sess-{uuid.uuid4().hex[:8]}" + user_id = f"user-{uuid.uuid4().hex[:8]}" + + # session_id / user_id ride on the observations under the trace via + # propagate_attributes (the same carrier as name / metadata), so open + # one real observation for them to attach to. + adapter.trace( + id=invocation_id, + name="test_sdk_adapter_session_user", + metadata={"userId": user_id}, + session_id=session_id, + user_id=user_id, + ) + span_handle = adapter.span(trace_id=invocation_id, name="verify_entry") + span_handle.end() + + adapter.force_flush() + time.sleep(2) + + hex_id = invocation_id.replace("-", "") + try: + trace = _poll_trace_with_retry(client, hex_id) + assert trace.session_id == session_id, f"trace.session_id mismatch: got {trace.session_id!r}" + assert trace.user_id == user_id, f"trace.user_id mismatch: got {trace.user_id!r}" + finally: + client.shutdown() diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 9e18efd..e678ac7 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -9,7 +9,7 @@ def test_package_versions() -> None: assert openarmature.__version__ == "0.14.0" - assert openarmature.__spec_version__ == "0.61.0" + assert openarmature.__spec_version__ == "0.62.0" def test_spec_version_matches_pyproject() -> None: diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 44a320f..0372ca0 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -51,6 +51,35 @@ def test_in_memory_recorder_trace_create_then_update() -> None: assert trace.metadata == {"correlation_id": "c1", "extra": "value"} +def test_in_memory_trace_records_session_and_user_id() -> None: + # Proposal 0064 §8.4.1: trace(session_id=, user_id=) populates the 
two + # Langfuse cross-trace grouping fields. This exercises the session_id + # plumbing the observer leaves dormant until 0020 (so the deferred + # fixture-084 session cases are still covered at the client layer). + client = InMemoryLangfuseClient() + client.trace(id="t1", name="a", metadata={"userId": "u-7"}, session_id="sess-9", user_id="u-7") + trace = client.traces["t1"] + assert trace.session_id == "sess-9" + assert trace.user_id == "u-7" + # Additive: userId also remains in the metadata bag. + assert trace.metadata["userId"] == "u-7" + # Both default to None when not supplied. + client.trace(id="t2", name="b", metadata={}) + assert client.traces["t2"].session_id is None + assert client.traces["t2"].user_id is None + + +def test_promoted_user_id_recognizes_userid_key() -> None: + # Proposal 0064 §8.4.1: the userId promotion reads a recognized key, + # coerces to str, and is None when absent. + from openarmature.observability.langfuse.observer import _promoted_user_id + + assert _promoted_user_id({"userId": "u-1"}) == "u-1" + assert _promoted_user_id({"userId": 42}) == "42" + assert _promoted_user_id({"tenantId": "acme"}) is None + assert _promoted_user_id({}) is None + + def test_in_memory_recorder_span_handle_update_and_end() -> None: client = InMemoryLangfuseClient() client.trace(id="t1") diff --git a/tests/unit/test_observability_langfuse_adapter.py b/tests/unit/test_observability_langfuse_adapter.py index 6f5c452..0d567ee 100644 --- a/tests/unit/test_observability_langfuse_adapter.py +++ b/tests/unit/test_observability_langfuse_adapter.py @@ -337,7 +337,10 @@ async def test_adapter_against_real_langfuse_cloud() -> None: .compile() ) graph.attach_observer(observer) - await graph.invoke(_S()) + # metadata={"userId": ...} exercises the proposal 0064 promotion end + # to end: the observer lifts the recognized userId caller key to the + # live Trace's first-class userId field (the Users dashboard). 
+ await graph.invoke(_S(), metadata={"userId": "flight-controller-gene"}) await graph.drain() observer.shutdown() # Use ``client.shutdown()`` rather than ``client.flush()`` here: @@ -348,7 +351,9 @@ async def test_adapter_against_real_langfuse_cloud() -> None: # to drain without releasing SDK resources. client.shutdown() # Manual check: open the trace in the dashboard and confirm - # "step_a" + "step_b" appear as Span observations under one Trace. + # "step_a" + "step_b" appear as Span observations under one Trace, + # and that the Trace's userId field reads "flight-controller-gene" + # (the proposal 0064 promotion from the userId caller-metadata key). # The trace_id in the dashboard is the 32-char hex form (no dashes) # of OA's UUID4 invocation_id; strip dashes from any logged # correlation_id / invocation_id to find it.
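
Reviewer note: the additive `userId` promotion this diff introduces can be sketched on its own, without the observer or a Langfuse client. The sketch below is a minimal illustration, not the package's implementation: `SketchTrace` and `open_trace` are hypothetical stand-ins for `LangfuseTrace` and `LangfuseObserver._client_trace`, and only `promoted_user_id` follows the diff's `_promoted_user_id` logic. It assumes the caller metadata has already been merged into the trace metadata, as the diff's comments describe.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchTrace:
    # Hypothetical stand-in for LangfuseTrace; only the fields relevant here.
    id: str
    metadata: dict[str, Any] = field(default_factory=dict)
    session_id: str | None = None
    user_id: str | None = None


def promoted_user_id(metadata: Mapping[str, Any]) -> str | None:
    # Same logic as the diff's _promoted_user_id: read the recognized
    # ``userId`` key, coerce it to str, and return None when it is absent.
    value = metadata.get("userId")
    return str(value) if value is not None else None


def open_trace(trace_id: str, caller_metadata: Mapping[str, Any] | None) -> SketchTrace:
    # Hypothetical single trace-open hook. The metadata bag is copied
    # unchanged, so the promotion is additive: userId stays in metadata
    # and is also lifted to the first-class user_id field.
    metadata = dict(caller_metadata or {})
    return SketchTrace(
        id=trace_id,
        metadata=metadata,
        session_id=None,  # dormant: no session source in this sketch
        user_id=promoted_user_id(metadata),
    )


promoted = open_trace("t1", {"userId": "flight-controller-gene"})
unpromoted = open_trace("t2", {"tenantId": "acme"})
```

Routing every trace open through one hook, as the diff does with `_client_trace`, means a later session source only has to be wired in one place instead of at each of the main, lazy, and detached trace-open sites.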