-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_observability_langfuse_adapter.py
More file actions
207 lines (166 loc) · 8.41 KB
/
Copy pathtest_observability_langfuse_adapter.py
File metadata and controls
207 lines (166 loc) · 8.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""Unit + integration tests for LangfuseSDKAdapter against langfuse>=4.6.
The unit test instantiates a real ``langfuse.Langfuse`` client with
dummy credentials and verifies the adapter satisfies the
:class:`LangfuseClient` Protocol via runtime ``isinstance`` — no
network calls. Skipped when the ``[langfuse]`` extra isn't installed.
The integration test, gated by ``@pytest.mark.integration`` plus
``LANGFUSE_PUBLIC_KEY`` / ``LANGFUSE_SECRET_KEY`` env vars, runs a
small graph end-to-end against real Langfuse Cloud. Use::
LANGFUSE_PUBLIC_KEY=pk-lf-... \\
LANGFUSE_SECRET_KEY=sk-lf-... \\
LANGFUSE_BASE_URL=https://cloud.langfuse.com \\
uv run pytest tests/unit/test_observability_langfuse_adapter.py \\
-m integration -v
CI does NOT run integration tests; they're opt-in for local
verification.
"""
from __future__ import annotations
import os
from typing import Annotated, Any
import pytest
# Skip the whole module if langfuse isn't installed (extras not present).
pytest.importorskip("langfuse")
from langfuse import Langfuse # noqa: E402
from openarmature.graph import END, GraphBuilder, State, append # noqa: E402
from openarmature.observability.langfuse import ( # noqa: E402
LangfuseClient,
LangfuseObserver,
LangfuseSDKAdapter,
)
def _dummy_client() -> Langfuse:
# langfuse 4.x's Langfuse() constructor accepts credentials via env
# vars or kwargs. Dummy keys bypass auth_check (which is called
# opportunistically) — the adapter only needs the methods present
# on the constructed instance, not a working API connection.
return Langfuse(
public_key="pk-lf-test",
secret_key="sk-lf-test",
host="http://localhost:0", # unreachable; we don't make calls in unit tests
)
def test_adapter_satisfies_langfuse_client_protocol() -> None:
# Structural typing: the adapter MUST satisfy LangfuseClient at
# runtime so LangfuseObserver accepts it. This is the load-bearing
# test for the [langfuse] extras pin — if a future SDK release
# breaks the Protocol's surface, this fails loudly.
adapter = LangfuseSDKAdapter(_dummy_client())
assert isinstance(adapter, LangfuseClient)
def test_adapter_observer_construction() -> None:
# End-to-end: the observer accepts the adapter as its client
# (Protocol satisfaction proves out at instantiation time under
# the LangfuseClient annotation).
adapter = LangfuseSDKAdapter(_dummy_client())
observer = LangfuseObserver(client=adapter)
assert observer.client is adapter
def test_adapter_caches_trace_info() -> None:
# The trace() call doesn't hit the SDK; it caches info that
# propagate_attributes applies on every observation under that
# trace_id (not just the first — v4's last-wins display logic
# would otherwise let later observations clobber the trace name).
adapter = LangfuseSDKAdapter(_dummy_client())
adapter.trace(id="trace-1", name="my-trace", metadata={"correlation_id": "c-1"})
assert "trace-1" in adapter._trace_info # noqa: SLF001
cached = adapter._trace_info["trace-1"] # noqa: SLF001
assert cached["name"] == "my-trace"
assert cached["metadata"] == {"correlation_id": "c-1"}
def test_adapter_converts_uuid_trace_id_to_otel_hex() -> None:
# Langfuse v4 expects OTel-format trace IDs (32-char lowercase
# hex, no dashes). OA's invocation_id is a UUID4 with dashes.
# The adapter MUST convert before passing to TraceContext, or
# the SDK fails with ValueError("invalid literal for int() with
# base 16: 'uuid-with-dashes'") at the OTel-attribute layer
# — which OA's observer-error-isolation pattern swallows as a
# warnings.warn, leaving the trace invisibly broken.
from openarmature.observability.langfuse.adapter import _to_otel_trace_id
assert _to_otel_trace_id("b24eda93-d06d-4eaa-9891-ca5e56f35722") == "b24eda93d06d4eaa9891ca5e56f35722"
# Idempotent on already-hex input.
assert _to_otel_trace_id("b24eda93d06d4eaa9891ca5e56f35722") == "b24eda93d06d4eaa9891ca5e56f35722"
# Non-UUID inputs pass through unchanged (consumers passing an
# already-OTel-formatted trace_id from elsewhere don't get
# mangled).
assert _to_otel_trace_id("custom-trace-id") == "custom-trace-id"
def test_adapter_update_trace_merges_into_cache() -> None:
# update_trace merges into the cache so subsequent observations
# under this trace_id pick up the new values via propagate_attributes.
adapter = LangfuseSDKAdapter(_dummy_client())
adapter.trace(id="trace-1", name="initial", metadata={"key1": "v1"})
adapter.update_trace(id="trace-1", name="renamed", metadata={"key2": "v2"})
cached = adapter._trace_info["trace-1"] # noqa: SLF001
assert cached["name"] == "renamed"
assert cached["metadata"] == {"key1": "v1", "key2": "v2"}
def test_adapter_force_flush_delegates_to_client() -> None:
# force_flush() must invoke the wrapped SDK's flush() so callers
# in fast-teardown harnesses get the SDK's internal drain
# (OTel TracerProvider.force_flush + score/media queue joins).
# Mock(wraps=...) lets us assert delegation without simulating
# the SDK's full surface.
from unittest.mock import Mock
real_client = _dummy_client()
wrapped = Mock(wraps=real_client)
adapter = LangfuseSDKAdapter(wrapped)
assert adapter.force_flush() is True
wrapped.flush.assert_called_once_with()
# timeout_ms is accepted but unused per the documented contract.
assert adapter.force_flush(timeout_ms=1_000) is True
assert wrapped.flush.call_count == 2
# ---------------------------------------------------------------------------
# Integration test against real Langfuse Cloud (opt-in)
# ---------------------------------------------------------------------------
class _S(State):
trail: Annotated[list[str], append] = []
async def _node(name: str) -> Any:
return {"trail": [name]}
@pytest.mark.integration
async def test_adapter_against_real_langfuse_cloud() -> None:
# Validates that the adapter actually exchanges data with Langfuse
# Cloud — instantiates the real SDK, runs a tiny graph through
# LangfuseObserver, calls flush(). No assertions on the
# remote-side ingest (which is async). Manually verify via the
# Langfuse dashboard that the trace appears with the expected
# observation tree.
public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
if not public_key or not secret_key:
pytest.skip("LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY not set")
# Mirror the SDK's precedence: Langfuse() reads LANGFUSE_BASE_URL
# first, then LANGFUSE_HOST. Resolve the same order here so this
# explicit host matches what a no-arg Langfuse() would pick up.
host = (
os.environ.get("LANGFUSE_BASE_URL") or os.environ.get("LANGFUSE_HOST") or "https://cloud.langfuse.com"
)
client = Langfuse(
public_key=public_key,
secret_key=secret_key,
host=host,
)
# Fail loudly on bad credentials. Without this, a 401 from the
# background export thread is just a logged warning and the test
# passes while traces vanish.
assert client.auth_check(), (
"Langfuse auth_check failed — verify LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_BASE_URL"
)
observer = LangfuseObserver(client=LangfuseSDKAdapter(client))
graph = (
GraphBuilder(_S)
.add_node("step_a", lambda _s: _node("step_a"))
.add_node("step_b", lambda _s: _node("step_b"))
.add_edge("step_a", "step_b")
.add_edge("step_b", END)
.set_entry("step_a")
.compile()
)
graph.attach_observer(observer)
await graph.invoke(_S())
await graph.drain()
observer.shutdown()
# Use ``client.shutdown()`` rather than ``client.flush()`` here:
# both block on the SDK's internal drain (OTel's force_flush plus
# the score/media queue joins), but shutdown() also tears down
# the resource manager so the test process exits cleanly. flush()
# is the appropriate call from a long-lived process that wants
# to drain without releasing SDK resources.
client.shutdown()
# Manual check: open the trace in the dashboard and confirm
# "step_a" + "step_b" appear as Span observations under one Trace.
# The trace_id in the dashboard is the 32-char hex form (no dashes)
# of OA's UUID4 invocation_id; strip dashes from any logged
# correlation_id / invocation_id to find it.