Skip to content

Commit b2d22b1

Browse files
Emit trace.input/output via Langfuse SDK adapter (#100)
* Emit trace.input/output via Langfuse SDK adapter PR #99 (proposal 0043) shipped the Langfuse observer's three-lever decision tree but left the SDK adapter's `update_trace(input=..., output=...)` as a no-op — only the InMemoryLangfuseClient applied the values. Production users of `LangfuseSDKAdapter` saw blank `Input` / `Output` columns in the Langfuse Traces list view despite the observer emitting the values. Wire the adapter to apply both via the v4 SDK's `set_trace_io`: - `update_trace(input=...)` caches `pending_input` in `_trace_info`. The next `_start_observation` for that trace pops the cache and calls `obs.set_trace_io(input=cached)` on the just-created observation. Piggybacks on a real span; no extra observations added in the common case. - `update_trace(output=...)` opens a synthetic short-lived `openarmature.trace_io` observation as the carrier for `set_trace_io(output=...)`. By the time the `InvocationCompletedEvent` reaches the observer all real node spans have ended, so a synthetic span is the only path with an active OTel span context. - Edge case: an invocation that fails before any node fires has no real span. The synthetic output observation also applies the cached pending_input, so both fields still land. The Langfuse v4 SDK marks `set_trace_io` deprecated ("removal in a future major version"). Empirical verification against Langfuse Cloud v4.7.1 confirms it remains the only path that surfaces `trace.input` / `trace.output` on the Traces list view headline columns; `propagate_attributes(metadata=...)` writes the values into the metadata bag but the UI does not project them as headline columns from there. Documented in CHANGELOG; will revisit when Langfuse publishes a v5 migration path. Adds two integration tests (`tests/integration/`) gated by `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY`. Both pass against Langfuse Cloud end-to-end (real-obs + synthetic-only paths). * Mark live-Langfuse tests as integration PR #100 review caught a gap: the integration tests gated only on env-var presence are still picked up by `pytest tests/` when a developer has `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY` in scope locally. The default `pyproject.toml` config excludes `@pytest.mark.integration` via `addopts = ["-m", "not integration"]` but not unmarked tests in a separate directory. Add the marker to both tests so they match the existing precedent at `tests/unit/test_observability_langfuse_adapter.py:177` and stay out of the default test run regardless of credential availability.
1 parent 5a586c8 commit b2d22b1

4 files changed

Lines changed: 227 additions & 26 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
2828
### Notes
2929

3030
- **Pinned spec version bumped from v0.31.0 to v0.35.0.** Absorbs proposals 0042 (reserved-key extension), 0043 (Langfuse trace.input/output sourcing), and the textual additions in v0.32.0 (Gemini wire-format mapping, 0038, not yet implemented) and v0.33.0 (sessions capability, 0020, not yet implemented).
31-
- The SDK adapter caches `input` / `output` in its `_trace_info` map; landing the values on the live Langfuse Trace from outside an active span context requires SDK-version-specific calls (v4's `langfuse.update_current_trace` works inside a context; cross-context REST updates need `client.api.trace.update`). The `InMemoryLangfuseClient` used by tests applies the fields directly. SDK-adapter end-to-end emit lands in a follow-up.
31+
- `LangfuseSDKAdapter` now applies `trace.input` / `trace.output` to the live Langfuse Trace. Input lands on the first real observation under the trace via `set_trace_io`; output uses a synthetic short-lived `openarmature.trace_io` observation as the carrier. The InMemoryLangfuseClient used by tests applies the fields directly.
32+
- The Langfuse v4 SDK marks `set_current_trace_io` / `Span.set_trace_io` deprecated ("removal in a future major version"). Empirical verification against Langfuse Cloud (v4.7.1, 2026-05-29) confirms it remains the **only** path that populates the Traces list view's headline `Input` / `Output` columns; `propagate_attributes(metadata=...)` does not substitute for it in the current UI. We will revisit when Langfuse publishes a concrete migration guide for v5.
3233

3334
## [0.10.0] — 2026-05-27
3435

src/openarmature/observability/langfuse/adapter.py

Lines changed: 85 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -226,38 +226,80 @@ def update_trace(
226226
) -> None:
227227
# Merge into the trace_info cache so subsequent observations
228228
# (and the first one if not yet created) pick up the updated
229-
# values. Since propagate_attributes runs on every observation
230-
# using cached info, update_trace takes effect on the NEXT
231-
# observation under this trace_id, not retroactively on prior
232-
# observations.
229+
# values. ``name`` / ``metadata`` are propagated via
230+
# ``propagate_attributes`` around every observation under
231+
# ``id``; ``input`` / ``output`` follow the SDK's
232+
# ``set_trace_io`` path (per proposal 0043 + the
233+
# empirically-validated v4.7.1 behaviour — see CHANGELOG).
233234
#
234-
# Proposal 0043 ``input`` / ``output`` are cached but landing
235-
# them on the live Langfuse Trace from outside an active
236-
# span context is SDK-version-dependent (v4 exposes
237-
# ``langfuse.update_current_trace(input=..., output=...)``
238-
# only inside a context; cross-context REST updates need
239-
# ``client.api.trace.update``). The InMemoryLangfuseClient
240-
# surface used by tests applies them directly. The SDK
241-
# adapter's apply path is a follow-up — caching here so the
242-
# Protocol contract is satisfied without breaking SDK-adapter
243-
# users.
235+
# ``input`` is staged on the cache; applied to the FIRST real
236+
# observation that opens under this trace_id (``_start_observation``
237+
# below). Piggybacks on a real span so the trace tree gains no
238+
# extra observation in the common case.
239+
#
240+
# ``output`` is applied immediately via a synthetic short-lived
241+
# observation. By the time the LangfuseObserver dispatches the
242+
# invocation-completed event all real spans have ended, so a
243+
# synthetic span is the only path that has an active OTel span
244+
# context for ``set_trace_io`` to find.
244245
entry = self._trace_info.get(id)
245246
if entry is None:
246-
self._trace_info[id] = {
247+
entry = {
247248
"name": name,
248249
"metadata": dict(metadata) if metadata is not None else {},
249-
"input": input,
250-
"output": output,
251250
}
252-
return
253-
if name is not None:
254-
entry["name"] = name
255-
if metadata is not None:
256-
entry["metadata"].update(metadata)
251+
self._trace_info[id] = entry
252+
else:
253+
if name is not None:
254+
entry["name"] = name
255+
if metadata is not None:
256+
entry["metadata"].update(metadata)
257257
if input is not None:
258-
entry["input"] = input
258+
entry["pending_input"] = input
259259
if output is not None:
260-
entry["output"] = output
260+
self._emit_trace_output_synthetic(id, output)
261+
262+
def _emit_trace_output_synthetic(self, trace_id: str, output: Any) -> None:
263+
# Open a synthetic short-lived observation, set
264+
# ``trace.output`` on it via ``set_trace_io``, end immediately.
265+
# The synthetic span shows in the trace as a small observation
266+
# named ``openarmature.trace_io``; the value lands on the
267+
# Langfuse Trace's ``output`` headline field through the
268+
# ``langfuse.trace.output`` OTel attribute set inside.
269+
#
270+
# Edge case: if no real node observation ever opened for this
271+
# trace (e.g., a resume-path validation failure aborted the
272+
# invocation before any node fired), the cached ``pending_input``
273+
# has no real span to piggyback on. Apply it here so the input
274+
# still lands — the synthetic observation becomes the sole
275+
# carrier for both fields. Pops the cache so we don't re-apply
276+
# if ``update_trace`` is called more than once.
277+
entry = self._trace_info.get(trace_id)
278+
pending_input = entry.pop("pending_input", None) if entry is not None else None
279+
280+
trace_context: TraceContext = {"trace_id": _to_otel_trace_id(trace_id)}
281+
with ExitStack() as stack:
282+
if entry is not None:
283+
stack.enter_context(
284+
propagate_attributes(
285+
trace_name=entry["name"],
286+
metadata=_stringify_metadata(entry["metadata"]),
287+
)
288+
)
289+
obs = cast(
290+
"Any",
291+
self._client.start_observation(
292+
name="openarmature.trace_io",
293+
as_type="span",
294+
trace_context=trace_context,
295+
),
296+
)
297+
try:
298+
# Deprecation rationale on the equivalent call in
299+
# ``_start_observation``.
300+
obs.set_trace_io(input=pending_input, output=output) # pyright: ignore[reportDeprecated]
301+
finally:
302+
obs.end()
261303

262304
def span(
263305
self,
@@ -395,7 +437,25 @@ def _start_observation(
395437
metadata=_stringify_metadata(trace_entry["metadata"]),
396438
)
397439
)
398-
return cast("Any", self._client.start_observation(**kwargs))
440+
obs = cast("Any", self._client.start_observation(**kwargs))
441+
# Proposal 0043 (PR 8.5a): apply any pending ``trace.input``
442+
# cached by ``update_trace`` to the FIRST real observation
443+
# under this trace. ``set_trace_io`` needs an active OTel
444+
# span context — piggybacking on the just-created
445+
# observation is the lowest-overhead path. ``pop`` so
446+
# subsequent observations under the same trace_id don't
447+
# re-apply (the value is one-shot per trace).
448+
#
449+
# The Langfuse SDK marks ``set_trace_io`` deprecated as of
450+
# v4.6 ("removal in a future major version"); per the
451+
# empirical verification in PR 8.5a it remains the only
452+
# path that surfaces ``trace.input`` in the Langfuse UI's
453+
# Traces list view. See CHANGELOG for the deprecation note.
454+
if trace_entry is not None:
455+
pending_input = trace_entry.pop("pending_input", None)
456+
if pending_input is not None:
457+
obs.set_trace_io(input=pending_input) # pyright: ignore[reportDeprecated]
458+
return obs
399459

400460

401461
__all__ = ["LangfuseSDKAdapter"]

tests/integration/__init__.py

Whitespace-only changes.
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
"""Integration tests for LangfuseSDKAdapter against the live Langfuse
2+
test account.
3+
4+
Gated by the presence of ``LANGFUSE_PUBLIC_KEY`` / ``LANGFUSE_SECRET_KEY``
5+
env vars. Skipped in CI and local runs that don't have credentials in
6+
scope; runs end-to-end against Langfuse Cloud when invoked from a
7+
shell with credentials (the documented test-account env vars per
8+
[[reference_langfuse_test_account.md]]).
9+
10+
Each test polls the REST API after ``flush()`` with retries to absorb
11+
the eventual-consistency lag between ingestion and the REST projection.
12+
"""
13+
14+
from __future__ import annotations
15+
16+
import os
17+
import time
18+
import uuid
19+
from typing import Any
20+
21+
import pytest
22+
23+
# Skip the entire module when credentials aren't sourced. Avoids a
24+
# cryptic ``ImportError`` / ``ValueError`` cascade from the SDK when
25+
# the test environment is bare.
26+
pytestmark = pytest.mark.skipif(
27+
not (os.environ.get("LANGFUSE_PUBLIC_KEY") and os.environ.get("LANGFUSE_SECRET_KEY")),
28+
reason="Requires LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY (live Langfuse account)",
29+
)
30+
31+
32+
def _poll_trace_with_retry(client: Any, hex_id: str, *, attempts: int = 12, sleep_s: float = 5.0) -> Any:
33+
"""Poll Langfuse's REST API until the trace appears or the budget
34+
runs out. The Langfuse list-view UI updates faster than the REST
35+
GET projection, so a freshly-flushed trace can 404 for ~30-60s.
36+
Linear backoff is fine; the API is rate-limited gently and the
37+
test runs once per CI invocation."""
38+
from langfuse.api import NotFoundError
39+
40+
last_exc: Exception | None = None
41+
for _ in range(attempts):
42+
try:
43+
return client.api.trace.get(hex_id)
44+
except NotFoundError as exc:
45+
last_exc = exc
46+
time.sleep(sleep_s)
47+
raise AssertionError(
48+
f"Trace {hex_id} did not appear in REST API after {attempts * sleep_s:.0f}s; last error: {last_exc!r}"
49+
)
50+
51+
52+
@pytest.mark.integration
53+
async def test_sdk_adapter_emits_trace_input_output_to_live_langfuse() -> None:
54+
"""End-to-end: open a trace via LangfuseSDKAdapter, push input via
55+
update_trace at the start, push output via update_trace at the end,
56+
flush, and confirm both fields populate on the live Trace entity."""
57+
from langfuse import Langfuse
58+
59+
from openarmature.observability.langfuse.adapter import LangfuseSDKAdapter
60+
61+
client = Langfuse()
62+
adapter = LangfuseSDKAdapter(client)
63+
64+
invocation_id = str(uuid.uuid4())
65+
expected_input = {"entry_node": "verify_entry", "correlation_id": "test-corr-1"}
66+
expected_output = {"final_node": "verify_entry", "status": "completed"}
67+
68+
# Simulate the LangfuseObserver call sequence: trace open →
69+
# InvocationStartedEvent (update_trace with input) → first node
70+
# span → node ends → InvocationCompletedEvent (update_trace with
71+
# output).
72+
adapter.trace(
73+
id=invocation_id,
74+
name="test_sdk_adapter_trace_io",
75+
metadata={"test_run": "trace_io_emit"},
76+
)
77+
adapter.update_trace(id=invocation_id, input=expected_input)
78+
# Open + close a real observation so the cached pending_input has
79+
# something to piggyback on.
80+
span_handle = adapter.span(trace_id=invocation_id, name="verify_entry")
81+
span_handle.end()
82+
adapter.update_trace(id=invocation_id, output=expected_output)
83+
84+
adapter.force_flush()
85+
# Brief settle for the UI projection; REST poll handles the
86+
# longer-tail consistency window separately.
87+
time.sleep(2)
88+
89+
hex_id = invocation_id.replace("-", "")
90+
trace = _poll_trace_with_retry(client, hex_id)
91+
92+
# `trace.input` / `trace.output` are the headline columns proposal
93+
# 0043 motivates; assert both ingested correctly. These are the
94+
# spec-compliance signal — they project off OTel attributes on
95+
# incoming spans and populate on the Trace entity directly.
96+
assert trace.input == expected_input, f"trace.input mismatch: got {trace.input!r}"
97+
assert trace.output == expected_output, f"trace.output mismatch: got {trace.output!r}"
98+
# Note: we deliberately do NOT assert on ``trace.observations``
99+
# here. Langfuse's REST projection for the observations list
100+
# lags the Trace's headline fields by an indeterminate window —
101+
# the two tests in this module hit different consistency points
102+
# against the same backend. The synthetic ``openarmature.trace_io``
103+
# observation is verified in the next test (which uses ONLY the
104+
# synthetic carrier and reliably shows in the list).
105+
106+
107+
@pytest.mark.integration
108+
async def test_sdk_adapter_handles_invocation_with_no_real_observation() -> None:
109+
"""Edge case: invocation fails before any node observation opens
110+
(resume-path validation failure, etc.). The cached pending_input
111+
has no real span to piggyback on, so the synthetic output
112+
observation becomes the sole carrier for BOTH fields."""
113+
from langfuse import Langfuse
114+
115+
from openarmature.observability.langfuse.adapter import LangfuseSDKAdapter
116+
117+
client = Langfuse()
118+
adapter = LangfuseSDKAdapter(client)
119+
120+
invocation_id = str(uuid.uuid4())
121+
expected_input = {"entry_node": "fail_fast", "correlation_id": "test-corr-2"}
122+
expected_output = {"final_node": "fail_fast", "status": "failed"}
123+
124+
adapter.trace(id=invocation_id, name="test_sdk_adapter_no_real_span")
125+
adapter.update_trace(id=invocation_id, input=expected_input)
126+
# NO real span opens — straight to the output update.
127+
adapter.update_trace(id=invocation_id, output=expected_output)
128+
129+
adapter.force_flush()
130+
time.sleep(2)
131+
132+
hex_id = invocation_id.replace("-", "")
133+
trace = _poll_trace_with_retry(client, hex_id)
134+
135+
# Both input and output land on the Trace even with the synthetic
136+
# observation as the sole span.
137+
assert trace.input == expected_input
138+
assert trace.output == expected_output
139+
assert len(trace.observations) == 1
140+
assert trace.observations[0].name == "openarmature.trace_io"

0 commit comments

Comments
 (0)