-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_observability_langfuse.py
More file actions
369 lines (302 loc) · 13.3 KB
/
Copy pathtest_observability_langfuse.py
File metadata and controls
369 lines (302 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
"""Focused unit tests for the LangfuseObserver and InMemoryLangfuseClient.
The conformance suite (``tests/conformance/test_observability_langfuse.py``)
exercises the end-to-end Trace + Observation shape against
spec/observability/conformance/022-024. These unit tests fill gaps
those fixtures don't isolate directly: payload-cap validation,
truncation algorithm boundaries, in-memory recorder field handling,
and the synthetic-dispatch-observation paths (subgraph, fan-out
non-detached, detached subgraph, detached fan-out) that no Langfuse
spec fixture exercises today.
"""
from __future__ import annotations
from typing import Annotated, Any, cast
import pytest
from openarmature.graph import END, GraphBuilder, State, append
from openarmature.observability.langfuse import (
InMemoryLangfuseClient,
LangfuseObservation,
LangfuseObserver,
LangfuseTrace,
LangfuseUsage,
)
def test_observer_payload_cap_below_minimum_rejected() -> None:
    """Building an observer with a 255-byte payload cap raises ``ValueError``."""
    # The observer mirrors the spec §5.5.5 minimum cap; 255 is one byte
    # short of it, so the constructor itself has to refuse the value.
    too_small = 255
    recorder = InMemoryLangfuseClient()
    with pytest.raises(ValueError, match="below the spec §5.5.5 minimum"):
        LangfuseObserver(client=recorder, payload_byte_cap=too_small)
def test_observer_payload_cap_at_minimum_accepted() -> None:
    """A payload cap of exactly 256 is accepted and exposed unchanged."""
    minimum_cap = 256
    recorder = InMemoryLangfuseClient()
    langfuse_observer = LangfuseObserver(client=recorder, payload_byte_cap=minimum_cap)
    assert langfuse_observer.payload_byte_cap == minimum_cap
def test_in_memory_recorder_trace_create_then_update() -> None:
    """``update_trace`` renames a recorded trace and merges in new metadata."""
    recorder = InMemoryLangfuseClient()
    recorder.trace(id="t1", name="initial", metadata={"correlation_id": "c1"})
    recorder.update_trace(id="t1", name="renamed", metadata={"extra": "value"})

    recorded = recorder.traces["t1"]
    # The id is unchanged, the name is overwritten, and the metadata holds
    # the keys from both the create call and the update call.
    assert recorded.id == "t1"
    assert recorded.name == "renamed"
    assert recorded.metadata == {"correlation_id": "c1", "extra": "value"}
def test_in_memory_recorder_span_handle_update_and_end() -> None:
    """``update`` and ``end`` on a span handle show up on the recorded observation."""
    recorder = InMemoryLangfuseClient()
    recorder.trace(id="t1")

    span = recorder.span(trace_id="t1", name="step", metadata={"k": 1})
    span.update(metadata={"extra": "v"})
    span.end(level="ERROR", status_message="failed")

    recorded = recorder.traces["t1"].observations
    assert len(recorded) == 1

    step = recorded[0]
    assert step.name == "step"
    assert step.ended is True
    assert step.level == "ERROR"
    assert step.status_message == "failed"
    # Metadata given at creation and metadata given to update() are both kept.
    assert step.metadata == {"k": 1, "extra": "v"}
def test_in_memory_recorder_generation_captures_native_fields() -> None:
    """Fields passed to ``generation()`` are readable back from the recorded observation."""
    recorder = InMemoryLangfuseClient()
    recorder.trace(id="t1")

    generation = recorder.generation(
        trace_id="t1",
        name="openarmature.llm.complete",
        model="test-model",
        model_parameters={"temperature": 0.7},
        input=[{"role": "user", "content": "hi"}],
        output="hello back",
        usage=LangfuseUsage(input=5, output=2, total=7),
        prompt="lf-prompt-ref-1",
    )
    generation.end(metadata={"finish_reason": "stop"})

    recorded = recorder.traces["t1"].observations
    assert len(recorded) == 1

    gen_obs = recorded[0]
    assert gen_obs.type == "generation"
    assert gen_obs.model == "test-model"
    assert gen_obs.model_parameters == {"temperature": 0.7}
    assert gen_obs.input == [{"role": "user", "content": "hi"}]
    assert gen_obs.output == "hello back"

    token_usage = gen_obs.usage
    assert token_usage is not None
    assert token_usage.input == 5
    assert token_usage.output == 2
    assert token_usage.total == 7

    # The ``prompt`` argument is read back through ``prompt_entity_link``.
    assert gen_obs.prompt_entity_link == "lf-prompt-ref-1"
    # Metadata supplied only at end() is what the observation carries.
    assert gen_obs.metadata == {"finish_reason": "stop"}
def test_in_memory_recorder_observation_id_is_unique_per_recorder() -> None:
    """Two spans opened on the same recorder receive different ids."""
    recorder = InMemoryLangfuseClient()
    recorder.trace(id="t1")
    first_span = recorder.span(trace_id="t1", name="a")
    second_span = recorder.span(trace_id="t1", name="b")
    assert first_span.id != second_span.id
def test_observer_force_flush_delegates_to_client() -> None:
    """``LangfuseObserver.force_flush`` returns ``True`` with and without a timeout."""
    # The in-memory client's force_flush is a no-op that reports True, so a
    # True result here only demonstrates that the observer forwards the call.
    langfuse_observer = LangfuseObserver(client=InMemoryLangfuseClient())
    assert langfuse_observer.force_flush() is True
    assert langfuse_observer.force_flush(timeout_ms=1000) is True
def test_in_memory_recorder_force_flush_is_no_op() -> None:
    """``InMemoryLangfuseClient.force_flush`` returns ``True`` with or without ``timeout_ms``."""
    # The in-memory recorder keeps no outbound buffer, so the flush succeeds
    # immediately; ``timeout_ms`` is accepted for Protocol compatibility
    # and otherwise unused.
    recorder = InMemoryLangfuseClient()
    assert recorder.force_flush() is True
    assert recorder.force_flush(timeout_ms=5000) is True
def test_in_memory_recorder_children_of_walks_parent_links() -> None:
    """``children_of`` groups observations by their ``parent_observation_id``."""
    recorder = InMemoryLangfuseClient()
    recorder.trace(id="t1")
    root = recorder.span(trace_id="t1", name="root")
    child = recorder.span(trace_id="t1", name="child", parent_observation_id=root.id)
    other = recorder.span(trace_id="t1", name="other")

    recorded = recorder.traces["t1"]

    # Passing None selects the observations that have no parent.
    parentless_names = {obs.name for obs in recorded.children_of(None)}
    assert parentless_names == {"root", "other"}

    names_under_root = [obs.name for obs in recorded.children_of(root.id)]
    assert names_under_root == ["child"]

    # "other" is a separate observation from "child", not the same one.
    assert child.id != other.id
# ---------------------------------------------------------------------------
# Dispatch synthesis (PR 3.5) — subgraph, fan-out non-detached, detached
# ---------------------------------------------------------------------------
# The Langfuse mapping has no spec fixtures for subgraph dispatch /
# fan-out per-instance / detached-trace mode (spec proposal 0031's
# 022-024 only exercise linear graphs + LLM + prompt linkage). These
# tests pin the synthesis-helper behavior locally so future changes
# don't silently break parenting under composition.
class _S(State):
    """State schema for the parent graphs built by the dispatch tests."""

    # Node names emitted by ``_record``; annotated with ``append``.
    trail: Annotated[list[str], append] = []
    # ``target_field`` of the fan-out nodes in the fan-out tests.
    worker_results: Annotated[list[str], append] = []
class _WorkerState(State):
    """State schema for the worker subgraph used by the fan-out tests."""

    # Written by the worker node; the fan-out nodes name it as ``collect_field``.
    result: str = ""
async def _record(name: str) -> Any:
    """Return a partial-state dict whose ``trail`` entry is ``[name]``."""
    update = {"trail": [name]}
    return update
def _attach(graph: Any) -> tuple[Any, InMemoryLangfuseClient, LangfuseObserver]:
    """Attach a default ``LangfuseObserver`` backed by a fresh in-memory client.

    Returns the same ``graph`` together with the client and the observer so
    a test can drive the graph and then inspect what was recorded.
    """
    recorder = InMemoryLangfuseClient()
    langfuse_observer = LangfuseObserver(client=recorder)
    graph.attach_observer(langfuse_observer)
    return graph, recorder, langfuse_observer
def _attach_with_detached(
    graph: Any,
    *,
    detached_subgraphs: frozenset[str] = frozenset(),
    detached_fan_outs: frozenset[str] = frozenset(),
) -> tuple[Any, InMemoryLangfuseClient, LangfuseObserver]:
    """Attach a ``LangfuseObserver`` configured with detached node names.

    ``detached_subgraphs`` and ``detached_fan_outs`` are forwarded unchanged
    to the observer constructor. Returns the same ``graph`` together with
    the fresh in-memory client and the observer.
    """
    recorder = InMemoryLangfuseClient()
    langfuse_observer = LangfuseObserver(
        client=recorder,
        detached_subgraphs=detached_subgraphs,
        detached_fan_outs=detached_fan_outs,
    )
    graph.attach_observer(langfuse_observer)
    return graph, recorder, langfuse_observer
def _find_observation(trace: LangfuseTrace, name: str) -> LangfuseObservation:
    """Return the first observation in ``trace`` whose name equals ``name``.

    Raises:
        AssertionError: if no observation in the trace has that name.
    """
    first_match = next((obs for obs in trace.observations if obs.name == name), None)
    if first_match is None:
        raise AssertionError(f"observation {name!r} not in trace {trace.id!r}")
    return first_match
async def test_subgraph_dispatch_observation_parents_inner_node() -> None:
    """The inner node's observation is parented by the subgraph dispatch observation."""
    inner_graph = (
        GraphBuilder(_S)
        .add_node("inner_a", lambda _s: _record("inner_a"))
        .add_edge("inner_a", END)
        .set_entry("inner_a")
        .compile()
    )
    outer_graph = (
        GraphBuilder(_S)
        .add_subgraph_node("sub", inner_graph)
        .add_edge("sub", END)
        .set_entry("sub")
        .compile()
    )
    graph, recorder, _ = _attach(outer_graph)

    await graph.invoke(_S())
    await graph.drain()

    recorded = next(iter(recorder.traces.values()))
    dispatch_obs = _find_observation(recorded, "sub")
    inner_obs = _find_observation(recorded, "inner_a")

    # The inner node hangs off the synthesized dispatch observation rather
    # than sitting directly under the Trace ...
    assert inner_obs.parent_observation_id == dispatch_obs.id
    # ... while the dispatch observation itself is top-level in the Trace.
    assert dispatch_obs.parent_observation_id is None
async def test_fan_out_non_detached_per_instance_dispatch() -> None:
    """A two-instance fan-out records one dispatch observation per instance."""

    async def _worker(_s: _WorkerState) -> Any:
        return {"result": "done"}

    worker_graph = (
        GraphBuilder(_WorkerState)
        .add_node("worker", _worker)
        .add_edge("worker", END)
        .set_entry("worker")
        .compile()
    )
    fan_out_graph = (
        GraphBuilder(_S)
        .add_fan_out_node(
            "fan",
            subgraph=worker_graph,
            count=2,
            collect_field="result",
            target_field="worker_results",
        )
        .add_edge("fan", END)
        .set_entry("fan")
        .compile()
    )
    graph, recorder, _ = _attach(fan_out_graph)

    await graph.invoke(_S())
    await graph.drain()

    recorded = next(iter(recorder.traces.values()))
    fan_obs = _find_observation(recorded, "fan")

    # The per-instance dispatches reuse the fan-out node's name; they are
    # told apart from the node by being parented under it.
    per_instance = [
        obs
        for obs in recorded.observations
        if obs.name == "fan" and obs.parent_observation_id == fan_obs.id
    ]
    assert len(per_instance) == 2, f"expected 2 per-instance dispatches, got {len(per_instance)}"

    # Every dispatch records which instance it belongs to in its metadata.
    seen_indices = {obs.metadata.get("fan_out_index") for obs in per_instance}
    assert seen_indices == {0, 1}

    # The worker observations are parented by the per-instance dispatches.
    worker_obs = [obs for obs in recorded.observations if obs.name == "worker"]
    assert len(worker_obs) == 2
    parents_of_workers = {obs.parent_observation_id for obs in worker_obs}
    ids_of_dispatches = {obs.id for obs in per_instance}
    assert parents_of_workers == ids_of_dispatches
async def test_detached_subgraph_opens_separate_trace() -> None:
    """A subgraph listed in ``detached_subgraphs`` is recorded in its own Trace."""
    inner_graph = (
        GraphBuilder(_S)
        .add_node("inner_a", lambda _s: _record("inner_a"))
        .add_edge("inner_a", END)
        .set_entry("inner_a")
        .compile()
    )
    outer_graph = (
        GraphBuilder(_S)
        .add_subgraph_node("sub", inner_graph)
        .add_edge("sub", END)
        .set_entry("sub")
        .compile()
    )
    graph, recorder, _ = _attach_with_detached(
        outer_graph,
        detached_subgraphs=frozenset({"sub"}),
    )

    await graph.invoke(_S())
    await graph.drain()

    # One Trace for the main invocation, one for the detached subgraph.
    assert len(recorder.traces) == 2
    main_trace = next(
        t for t in recorder.traces.values() if "detached_from_invocation_id" not in t.metadata
    )
    detached_trace = next(
        t for t in recorder.traces.values() if "detached_from_invocation_id" in t.metadata
    )

    # The main Trace's "sub" observation links to the detached Trace by id.
    link_obs = _find_observation(main_trace, "sub")
    assert detached_trace.id in link_obs.metadata["detached_child_trace_ids"]

    # Inside the detached Trace the dispatch is top-level and parents inner_a.
    detached_dispatch = _find_observation(detached_trace, "sub")
    assert detached_dispatch.parent_observation_id is None
    inner_obs = _find_observation(detached_trace, "inner_a")
    assert inner_obs.parent_observation_id == detached_dispatch.id
async def test_detached_fan_out_each_instance_gets_trace() -> None:
    """Each instance of a fan-out listed in ``detached_fan_outs`` gets its own Trace."""

    async def _worker(_s: _WorkerState) -> Any:
        return {"result": "done"}

    worker_graph = (
        GraphBuilder(_WorkerState)
        .add_node("worker", _worker)
        .add_edge("worker", END)
        .set_entry("worker")
        .compile()
    )
    fan_out_graph = (
        GraphBuilder(_S)
        .add_fan_out_node(
            "fan",
            subgraph=worker_graph,
            count=3,
            collect_field="result",
            target_field="worker_results",
        )
        .add_edge("fan", END)
        .set_entry("fan")
        .compile()
    )
    graph, recorder, _ = _attach_with_detached(
        fan_out_graph,
        detached_fan_outs=frozenset({"fan"}),
    )

    await graph.invoke(_S())
    await graph.drain()

    # The main Trace plus one detached Trace for each of the 3 instances.
    assert len(recorder.traces) == 1 + 3
    main_trace = next(
        t for t in recorder.traces.values() if "detached_from_invocation_id" not in t.metadata
    )
    per_instance_traces = [
        t for t in recorder.traces.values() if "detached_from_invocation_id" in t.metadata
    ]
    assert len(per_instance_traces) == 3

    # The fan-out node in the main Trace lists the id of every detached Trace.
    fan_obs = _find_observation(main_trace, "fan")
    linked_ids = fan_obs.metadata.get("detached_child_trace_ids")
    assert isinstance(linked_ids, list)
    assert set(cast(list[str], linked_ids)) == {t.id for t in per_instance_traces}

    # Within each detached Trace the worker sits under that Trace's own
    # "fan" dispatch observation.
    for instance_trace in per_instance_traces:
        instance_dispatch = _find_observation(instance_trace, "fan")
        instance_worker = _find_observation(instance_trace, "worker")
        assert instance_worker.parent_observation_id == instance_dispatch.id
async def test_subgraph_dispatch_observation_ended_on_invocation_close() -> None:
    """After ``shutdown()``, every observation in the Trace is marked ended."""
    # Synthetic dispatch observations are closed when the cursor moves on.
    # A subgraph that is the last thing in an invocation never sees such a
    # move, so without the close_invocation drain its dispatch would stay
    # in-flight forever. This test checks the drain path ends everything.
    inner_graph = (
        GraphBuilder(_S)
        .add_node("inner_a", lambda _s: _record("inner_a"))
        .add_edge("inner_a", END)
        .set_entry("inner_a")
        .compile()
    )
    outer_graph = (
        GraphBuilder(_S)
        .add_subgraph_node("sub", inner_graph)
        .add_edge("sub", END)
        .set_entry("sub")
        .compile()
    )
    graph, recorder, langfuse_observer = _attach(outer_graph)

    await graph.invoke(_S())
    await graph.drain()

    # No explicit close_invocation happens above, so the "sub" dispatch
    # would still be in-flight (ended=False); shutdown() performs the drain.
    langfuse_observer.shutdown()

    recorded = next(iter(recorder.traces.values()))
    for obs in recorded.observations:
        assert obs.ended, f"observation {obs.name!r} not ended after shutdown()"