Skip to content

Commit 9310fad

Browse files
Wire tier-2a Langfuse trace-shape fixtures (#185)
* Wire tier-2a Langfuse trace-shape fixtures Move six fixtures (022/031/032 Langfuse trace + observation tree, 035/036 caller-invocation-id derivation, 059 implementation attribution) from _UNIT_TESTED_FIXTURES into _SUPPORTED_FIXTURES, driven through a LangfuseObserver + InMemoryLangfuseClient recorder. Second tier of the fixture-harness catch-up; test-only, no library change, no pin bump. Adds a Langfuse-trace runner plus a value-matcher for the placeholder tokens (<uuid-hex>, <any-string>, <corr_id_N> first-occurrence binding) and the assertion sub-key matchers (harness_parameterized, non_empty_string), and an invocation-id runner. The fixture trace.id is the derived Langfuse id, so the harness bridges the recorder's raw invocation_id through the impl's own langfuse_trace_id. No deferrals; 023/024 (Langfuse Generation) are tier 2b. * Apply review fixes to tier-2a harness Address review feedback on the tier-2a fixture wiring: - _run_langfuse_trace_case wraps invoke/drain in try/finally so the observer is always shut down, even if the graph raises. - _run_invocation_id_case now holds the LangfuseObserver reference and shuts it down in finally, matching the other Langfuse runners. - _assert_langfuse_observation_tree enables the value-matcher only when both bindings and params are provided (and, not or), so a partial call degrades to exact match instead of half-enabling the matcher.
1 parent ec73dc5 commit 9310fad

1 file changed

Lines changed: 250 additions & 23 deletions

File tree

tests/conformance/test_observability.py

Lines changed: 250 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,18 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
170170
"068-llm-completion-event-response-model-distinct-from-request",
171171
"071-llm-failure-event-call-id-distinct-from-completion-event",
172172
"072-llm-failure-event-mutual-exclusion-with-completion-event",
173+
# Fixture-harness catch-up tier 2a: trace-shape Langfuse fixtures
174+
# driven through a LangfuseObserver + InMemoryLangfuseClient recorder.
175+
# 022/031/032 assert the Trace + observation tree (proposal 0031/0035/
176+
# 0061); 035/036 the caller-invocation-id -> trace.id derivation
177+
# (proposal 0039); 059 the implementation-attribution trace metadata
178+
# (proposal 0052). 023/024 (Langfuse Generation) are tier 2b.
179+
"022-langfuse-basic-trace",
180+
"031-langfuse-subgraph-span-hierarchy",
181+
"032-langfuse-fan-out-per-instance-spans",
182+
"035-caller-invocation-id-uuid",
183+
"036-caller-invocation-id-non-uuid",
184+
"059-implementation-attribution-langfuse",
173185
# proposal 0052 attribution fixture (case 1) + proposal 0061
174186
# (case 2: the §5.1 attribution lands on the detached trace's own
175187
# openarmature.invocation span). Wired together now that 0061
@@ -286,17 +298,17 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
286298
_UNIT_TESTED_FIXTURES: dict[str, str] = {
287299
fixture_id: reason
288300
for fixture_ids, reason in (
301+
# Fixture-harness catch-up tier 2a wired the trace-shape Langfuse
302+
# fixtures (022/031/032), the invocation-id fixtures (035/036), and the
303+
# attribution fixture (059). 023/024 (Langfuse Generation) are tier 2b;
304+
# 033 (detached multi-trace) is tier 4.
289305
(
290-
("022-langfuse-basic-trace", "023-langfuse-generation-rendering", "024-langfuse-prompt-linkage"),
291-
"proposal 0031 Langfuse mapping; covered by test_observability_langfuse.py",
306+
("023-langfuse-generation-rendering", "024-langfuse-prompt-linkage"),
307+
"proposal 0031 Langfuse generation/prompt-linkage; covered by test_observability_langfuse.py",
292308
),
293309
(
294-
(
295-
"031-langfuse-subgraph-span-hierarchy",
296-
"032-langfuse-fan-out-per-instance-spans",
297-
"033-langfuse-detached-trace-mode",
298-
),
299-
"proposal 0035/0061 Langfuse span hierarchy; covered by test_observability_langfuse.py",
310+
("033-langfuse-detached-trace-mode",),
311+
"proposal 0035/0061 Langfuse detached-trace mode; covered by test_observability_langfuse.py",
300312
),
301313
(
302314
(
@@ -310,10 +322,6 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
310322
("030-caller-metadata-parallel-branches-per-branch",),
311323
"proposal 0040 per-branch caller metadata; covered by test_observability_otel.py",
312324
),
313-
(
314-
("035-caller-invocation-id-uuid", "036-caller-invocation-id-non-uuid"),
315-
"proposal 0039 invocation_id derivation; covered by test_observability_langfuse_adapter.py",
316-
),
317325
(
318326
("037-langfuse-trace-input-output",),
319327
"proposal 0043 trace input/output; covered by test_observability_langfuse.py",
@@ -327,10 +335,6 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
327335
),
328336
"proposal 0048 get_invocation_metadata; covered by test_observability_metadata.py",
329337
),
330-
(
331-
("059-implementation-attribution-langfuse",),
332-
"proposal 0052 implementation attribution; covered by test_observability_langfuse.py",
333-
),
334338
# Fixture-harness catch-up tier 1 wired the rest of the 0057/0058
335339
# family into _SUPPORTED_FIXTURES; these three stay here, each blocked
336340
# on a spec-side fixture change that python picks up at the v0.16.0 pin
@@ -540,6 +544,18 @@ async def test_observability_fixture(fixture_path: Path) -> None:
540544
await _run_fixture_058(spec)
541545
elif fixture_id == "084-langfuse-session-user-promotion":
542546
await _run_fixture_084(spec)
547+
elif fixture_id in {
548+
"022-langfuse-basic-trace",
549+
"031-langfuse-subgraph-span-hierarchy",
550+
"032-langfuse-fan-out-per-instance-spans",
551+
"059-implementation-attribution-langfuse",
552+
}:
553+
await _run_langfuse_trace_fixture(spec)
554+
elif fixture_id in {
555+
"035-caller-invocation-id-uuid",
556+
"036-caller-invocation-id-non-uuid",
557+
}:
558+
await _run_invocation_id_fixture(spec)
543559
elif fixture_id in {
544560
"012-otel-llm-payload-default-off",
545561
"013-otel-llm-payload-enabled",
@@ -2609,6 +2625,202 @@ async def _run_fixture_084(spec: Mapping[str, Any]) -> None:
26092625
raise AssertionError(f"case {case_name!r}: {e}") from e
26102626

26112627

2628+
# Assertion sub-keys understood by the Langfuse value-matcher. A non-empty
# expectation mapping whose keys all come from this set is treated as an
# assertion dict rather than being compared by plain equality.
_LANGFUSE_MATCHER_SUBKEYS = frozenset({"harness_parameterized", "non_empty_string"})
2629+
2630+
2631+
def _langfuse_value_matches(
2632+
actual: Any,
2633+
expected: Any,
2634+
*,
2635+
bindings: dict[str, Any],
2636+
params: Mapping[str, Any],
2637+
) -> bool:
2638+
"""Match a Langfuse trace/observation value against a fixture expectation:
2639+
an inline placeholder token, the assertion sub-key dict, or plain equality.
2640+
"""
2641+
# The value-matcher idioms are the conformance-adapter §5.10 vocabulary.
2642+
if isinstance(expected, str) and expected.startswith("<") and expected.endswith(">"):
2643+
return _langfuse_placeholder_matches(actual, expected, bindings)
2644+
# A NON-empty mapping whose keys are all matcher sub-keys is an assertion
2645+
# dict; an empty dict (or a dict with other keys) is matched by equality.
2646+
if (
2647+
isinstance(expected, Mapping)
2648+
and expected
2649+
and set(cast("Mapping[str, Any]", expected)).issubset(_LANGFUSE_MATCHER_SUBKEYS)
2650+
):
2651+
return _langfuse_matcher_subkeys_match(actual, cast("Mapping[str, Any]", expected), params)
2652+
return bool(actual == expected)
2653+
2654+
2655+
def _langfuse_placeholder_matches(actual: Any, token: str, bindings: dict[str, Any]) -> bool:
2656+
"""Inline placeholder tokens: ``<any-string>`` (non-empty), ``<uuid-hex>``
2657+
(32-hex dashes-stripped), and first-occurrence binding tokens like
2658+
``<corr_id_1>`` (bind on first sighting, assert equality after -- the
2659+
correlation-id-consistency check). The §5.10 ``<uuid>`` (canonical) token
2660+
is added when a wired fixture first needs it.
2661+
"""
2662+
if token == "<any-string>":
2663+
return isinstance(actual, str) and actual != ""
2664+
if token == "<uuid-hex>":
2665+
return isinstance(actual, str) and re.fullmatch(r"[0-9a-f]{32}", actual) is not None
2666+
if token in bindings:
2667+
return actual == bindings[token]
2668+
if actual is None:
2669+
return False
2670+
bindings[token] = actual
2671+
return True
2672+
2673+
2674+
def _langfuse_matcher_subkeys_match(actual: Any, spec: Mapping[str, Any], params: Mapping[str, Any]) -> bool:
2675+
"""Assertion sub-keys (059): ``non_empty_string`` and ``harness_parameterized``
2676+
(value equals the named harness-injected parameter)."""
2677+
if spec.get("non_empty_string") is True and not (isinstance(actual, str) and actual != ""):
2678+
return False
2679+
if "harness_parameterized" in spec:
2680+
param_name = cast("str", spec["harness_parameterized"])
2681+
if actual != params.get(param_name):
2682+
return False
2683+
return True
2684+
2685+
2686+
def _assert_langfuse_trace_shape(
2687+
trace: Any,
2688+
expected: Mapping[str, Any],
2689+
*,
2690+
bindings: dict[str, Any],
2691+
params: Mapping[str, Any],
2692+
) -> None:
2693+
"""Assert a Langfuse Trace's id / name / metadata / observation tree against
2694+
the fixture's ``expected.langfuse_trace`` block. Each clause is asserted only
2695+
when present (059 asserts metadata only; 022/031/032 assert all four).
2696+
"""
2697+
if "id" in expected:
2698+
# python's in-memory LangfuseTrace.id is the RAW invocation_id (the
2699+
# §8.4.1 verbatim OA-side id); the fixture asserts the DERIVED Langfuse
2700+
# trace id (uuid-hex / sha256[:16]). Bridge via langfuse_trace_id, the
2701+
# impl's own derivation rule (trace_id.py).
2702+
from openarmature.observability.langfuse import langfuse_trace_id
2703+
2704+
derived_id = langfuse_trace_id(trace.id)
2705+
assert _langfuse_value_matches(derived_id, expected["id"], bindings=bindings, params=params), (
2706+
f"derived trace.id {derived_id!r} (from raw {trace.id!r}) did not match {expected['id']!r}"
2707+
)
2708+
if "name" in expected:
2709+
assert _langfuse_value_matches(trace.name, expected["name"], bindings=bindings, params=params), (
2710+
f"trace.name {trace.name!r} did not match {expected['name']!r}"
2711+
)
2712+
for key, val in cast("dict[str, Any]", expected.get("metadata") or {}).items():
2713+
assert _langfuse_value_matches(trace.metadata.get(key), val, bindings=bindings, params=params), (
2714+
f"trace.metadata.{key} {trace.metadata.get(key)!r} did not match {val!r}"
2715+
)
2716+
observations = cast("list[dict[str, Any]] | None", expected.get("observations"))
2717+
if observations is not None:
2718+
_assert_langfuse_observation_tree(trace, observations, bindings=bindings, params=params)
2719+
2720+
2721+
async def _run_langfuse_trace_fixture(spec: Mapping[str, Any]) -> None:
2722+
"""Driver for the trace-shape Langfuse fixtures: 022/031/032 (single-dict)
2723+
and 059 (cases). Each builds a graph via the adapter, records into an
2724+
InMemoryLangfuseClient, and asserts the Trace + observation tree.
2725+
"""
2726+
if "cases" in spec:
2727+
for case in cast("list[dict[str, Any]]", spec["cases"]):
2728+
case_name = cast("str", case["name"])
2729+
try:
2730+
await _run_langfuse_trace_case(case)
2731+
except AssertionError as e:
2732+
raise AssertionError(f"case {case_name!r}: {e}") from e
2733+
else:
2734+
await _run_langfuse_trace_case(spec)
2735+
2736+
2737+
async def _run_langfuse_trace_case(case: Mapping[str, Any]) -> None:
    """Run one trace-shape case: build the graph from ``case``, invoke it with
    a ``LangfuseObserver`` recording into an ``InMemoryLangfuseClient``, then
    assert the single recorded trace against ``case["expected"]["langfuse_trace"]``.

    Observer options are read from ``langfuse_observer_config`` (or, failing
    that, ``langfuse_observer``); only ``disable_state_payload`` and
    ``disable_provider_payload`` are forwarded, and only when present.

    Raises:
        AssertionError: when the number of recorded traces is not exactly one,
            or the trace does not match the expectation.
    """
    import openarmature
    from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver

    _patch_unsupported_directives(case)

    client = InMemoryLangfuseClient()
    observer_cfg = cast(
        "dict[str, Any]", case.get("langfuse_observer_config") or case.get("langfuse_observer") or {}
    )
    observer_kwargs: dict[str, Any] = {"client": client}
    for flag in ("disable_state_payload", "disable_provider_payload"):
        if flag in observer_cfg:
            observer_kwargs[flag] = bool(observer_cfg[flag])
    observer = LangfuseObserver(**observer_kwargs)

    built = build_graph(case, subgraphs=dict(_compile_subgraphs(case)), trace=[])
    compiled = built.builder.compile()
    compiled.attach_observer(observer)
    initial_state = built.initial_state(case.get("initial_state", {}))
    try:
        await compiled.invoke(initial_state)
        await compiled.drain()
    finally:
        # Always shut the observer down, even if the graph raised.
        observer.shutdown()

    recorded = client.traces
    assert len(recorded) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}"
    (trace,) = recorded.values()

    bindings: dict[str, Any] = {}
    # The one harness-injected parameter available to ``harness_parameterized``.
    params = {"implementation_name": openarmature.__implementation_name__}
    expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"])
    _assert_langfuse_trace_shape(trace, expected, bindings=bindings, params=params)
2768+
2769+
2770+
async def _run_invocation_id_fixture(spec: Mapping[str, Any]) -> None:
2771+
"""Driver for the caller-invocation-id fixtures (035/036). Builds a simple
2772+
calls_llm graph, invokes with ``invocation_id=caller_invocation_id``, and
2773+
asserts the Langfuse ``trace.id`` equals the fixture's pinned derivation
2774+
(python derives it; the harness checks the result) plus 036's raw id in
2775+
``trace.metadata``.
2776+
"""
2777+
for case in cast("list[dict[str, Any]]", spec["cases"]):
2778+
case_name = cast("str", case["name"])
2779+
try:
2780+
await _run_invocation_id_case(case)
2781+
except AssertionError as e:
2782+
raise AssertionError(f"case {case_name!r}: {e}") from e
2783+
2784+
2785+
async def _run_invocation_id_case(case: Mapping[str, Any]) -> None:
    """Run one caller-invocation-id case (035/036).

    Builds the simple LLM graph for ``case``, attaches a ``LangfuseObserver``
    recording into an ``InMemoryLangfuseClient``, invokes the graph with
    ``invocation_id=case["caller_invocation_id"]``, and asserts that:

    * exactly one trace was recorded;
    * ``langfuse_trace_id(trace.id)`` equals the fixture's pinned ``id``;
    * every expected ``metadata`` entry equals the recorded one.

    Cleanup is layered so the provider is closed on every exit path,
    including a failure during setup or a raising ``observer.shutdown()``.

    Raises:
        AssertionError: when any of the checks above fails.
    """
    from openarmature.observability.langfuse import (
        InMemoryLangfuseClient,
        LangfuseObserver,
        langfuse_trace_id,
    )

    graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False)
    try:
        client = InMemoryLangfuseClient()
        observer = LangfuseObserver(client=client)
        graph.attach_observer(observer)
        state = _make_state_instance(case, state_cls)
        caller_id = cast("str", case["caller_invocation_id"])
        try:
            await graph.invoke(state, invocation_id=caller_id)
            await graph.drain()
        finally:
            observer.shutdown()
    finally:
        # Outer finally: the provider must be closed even if setup above or
        # observer.shutdown() raised.
        await provider.aclose()

    assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}"
    trace = next(iter(client.traces.values()))
    expected_trace = cast("dict[str, Any]", case["expected"]["langfuse_trace"])
    # The fixture's trace.id is the DERIVED Langfuse id; the in-memory recorder
    # keys by the raw invocation_id. Bridge via the impl's langfuse_trace_id.
    derived_id = langfuse_trace_id(trace.id)
    assert derived_id == expected_trace["id"], (
        f"derived trace.id {derived_id!r} (from raw {trace.id!r}) != {expected_trace['id']!r}"
    )
    for key, val in cast("dict[str, Any]", expected_trace.get("metadata") or {}).items():
        actual = trace.metadata.get(key)
        # The real SDK derives trace.id and preserves the raw invocation_id in
        # metadata for reverse lookup; the in-memory recorder instead keeps the
        # raw id AS trace.id. Recover it from there when metadata omits it (036).
        if actual is None and key == "invocation_id":
            actual = trace.id
        assert actual == val, f"trace.metadata.{key} {actual!r} != {val!r}"
2822+
2823+
26122824
# ---------------------------------------------------------------------------
26132825
# Fixture 010 — log correlation
26142826
#
@@ -3544,15 +3756,23 @@ async def _update_body(_s: Any, _payload: dict[str, Any] = update_block) -> dict
35443756

35453757

35463758
def _assert_langfuse_observation_tree(
3547-
trace: Any, expected: list[dict[str, Any]], parent_id: str | None = None
3759+
trace: Any,
3760+
expected: list[dict[str, Any]],
3761+
parent_id: str | None = None,
3762+
*,
3763+
bindings: dict[str, Any] | None = None,
3764+
params: Mapping[str, Any] | None = None,
35483765
) -> None:
35493766
"""Recursively match expected observations against the trace's flat
35503767
observation list (linked by parent_observation_id). type + name are
3551-
matched exactly; level / input / output exactly when present;
3552-
metadata is subset-matched."""
3768+
matched exactly; level / input / output exactly when present; metadata is
3769+
subset-matched. When ``bindings``/``params`` are supplied, metadata values
3770+
go through the value-matcher (placeholder tokens + sub-key matchers);
3771+
otherwise they are compared exactly (the tool-fixture path)."""
35533772
# Mutable copy: each matched observation is consumed so two
35543773
# same-shape expected siblings can't both bind to one actual.
35553774
remaining = list(trace.children_of(parent_id))
3775+
use_matcher = bindings is not None and params is not None
35563776
for exp in expected:
35573777
exp_type = cast("str", exp["type"])
35583778
exp_name = cast("str | None", exp.get("name"))
@@ -3574,12 +3794,19 @@ def _assert_langfuse_observation_tree(
35743794
f"{exp_name!r}: output {match.output!r} != {exp['output']!r}"
35753795
)
35763796
for key, val in cast("dict[str, Any]", exp.get("metadata") or {}).items():
3577-
assert match.metadata.get(key) == val, (
3578-
f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}"
3579-
)
3797+
if use_matcher:
3798+
assert _langfuse_value_matches(
3799+
match.metadata.get(key), val, bindings=bindings or {}, params=params or {}
3800+
), f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} did not match {val!r}"
3801+
else:
3802+
assert match.metadata.get(key) == val, (
3803+
f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}"
3804+
)
35803805
children = cast("list[dict[str, Any]] | None", exp.get("children"))
35813806
if children:
3582-
_assert_langfuse_observation_tree(trace, children, parent_id=match.id)
3807+
_assert_langfuse_observation_tree(
3808+
trace, children, parent_id=match.id, bindings=bindings, params=params
3809+
)
35833810

35843811

35853812
async def _run_tool_fixture(spec: Mapping[str, Any]) -> None:

0 commit comments

Comments
 (0)