@@ -182,6 +182,11 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
182182 "035-caller-invocation-id-uuid" ,
183183 "036-caller-invocation-id-non-uuid" ,
184184 "059-implementation-attribution-langfuse" ,
185+ # Tier 2b: Langfuse Generation observation (proposal 0031 §8.4.3/§8.4.4)
186+ # -- model / modelParameters / usage / input-output payload (with
187+ # truncation) and prompt-entity linkage.
188+ "023-langfuse-generation-rendering" ,
189+ "024-langfuse-prompt-linkage" ,
185190 # proposal 0052 attribution fixture (case 1) + proposal 0061
186191 # (case 2: the §5.1 attribution lands on the detached trace's own
187192 # openarmature.invocation span). Wired together now that 0061
@@ -298,14 +303,10 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
298303_UNIT_TESTED_FIXTURES : dict [str , str ] = {
299304 fixture_id : reason
300305 for fixture_ids , reason in (
301- # Fixture-harness catch-up tier 2a wired the trace-shape Langfuse
302- # fixtures (022/031/032), the invocation-id fixtures (035/036), and the
303- # attribution fixture (059). 023/024 (Langfuse Generation) are tier 2b;
304- # 033 (detached multi-trace) is tier 4.
305- (
306- ("023-langfuse-generation-rendering" , "024-langfuse-prompt-linkage" ),
307- "proposal 0031 Langfuse generation/prompt-linkage; covered by test_observability_langfuse.py" ,
308- ),
306+ # Fixture-harness catch-up tier 2 wired the trace-shape Langfuse
307+ # fixtures (022/031/032), invocation-id (035/036), attribution (059) in
308+ # 2a, and the Langfuse Generation fixtures (023/024) in 2b. 033 (detached
309+ # multi-trace) is tier 4.
309310 (
310311 ("033-langfuse-detached-trace-mode" ,),
311312 "proposal 0035/0061 Langfuse detached-trace mode; covered by test_observability_langfuse.py" ,
@@ -556,6 +557,11 @@ async def test_observability_fixture(fixture_path: Path) -> None:
556557 "036-caller-invocation-id-non-uuid" ,
557558 }:
558559 await _run_invocation_id_fixture (spec )
560+ elif fixture_id in {
561+ "023-langfuse-generation-rendering" ,
562+ "024-langfuse-prompt-linkage" ,
563+ }:
564+ await _run_langfuse_generation_fixture (spec )
559565 elif fixture_id in {
560566 "012-otel-llm-payload-default-off" ,
561567 "013-otel-llm-payload-enabled" ,
@@ -2649,6 +2655,19 @@ def _langfuse_value_matches(
26492655 and set (cast ("Mapping[str, Any]" , expected )).issubset (_LANGFUSE_MATCHER_SUBKEYS )
26502656 ):
26512657 return _langfuse_matcher_subkeys_match (actual , cast ("Mapping[str, Any]" , expected ), params )
2658+ # A regular NON-empty nested mapping (e.g. 024 metadata.prompt): recurse per
2659+ # key so inner tokens (rendered_hash: <any-string>) still apply. Subset over
2660+ # keys -- every expected key must be present and match; actual MAY carry
2661+ # extras. An empty expected dict falls through to exact equality below
2662+ # (rather than vacuously matching any mapping).
2663+ if isinstance (expected , Mapping ) and expected :
2664+ if not isinstance (actual , Mapping ):
2665+ return False
2666+ actual_map = cast ("Mapping[str, Any]" , actual )
2667+ return all (
2668+ k in actual_map and _langfuse_value_matches (actual_map [k ], v , bindings = bindings , params = params )
2669+ for k , v in cast ("Mapping[str, Any]" , expected ).items ()
2670+ )
26522671 return bool (actual == expected )
26532672
26542673
@@ -2820,6 +2839,60 @@ async def _run_invocation_id_case(case: Mapping[str, Any]) -> None:
28202839 actual = trace .id
28212840 assert actual == val , f"trace.metadata.{ key } { actual !r} != { val !r} "
28222841
2842+ # The fixture's top-level verbatim invocation_id clause (the §5.1
2843+ # caller_invocation_id_verbatim_on_attribute invariant): on the OTel side it
2844+ # is the openarmature.invocation_id span attribute; in the Langfuse runner
2845+ # the verbatim id surfaces as the in-memory recorder's raw trace.id.
2846+ expected_invocation_id = cast ("dict[str, Any]" , case ["expected" ]).get ("invocation_id" )
2847+ if expected_invocation_id is not None :
2848+ assert trace .id == expected_invocation_id , (
2849+ f"verbatim invocation_id: raw trace.id { trace .id !r} != { expected_invocation_id !r} "
2850+ )
2851+
2852+
async def _run_langfuse_generation_fixture(spec: Mapping[str, Any]) -> None:
    """Run every case of a Langfuse Generation fixture (023 generation rendering
    + truncation, 024 prompt linkage).

    Each entry of ``spec["cases"]`` is handed to ``_run_langfuse_generation_case``.
    An ``AssertionError`` raised by a case is re-raised with the case's ``name``
    prefixed, so the failing case is identifiable from the message; the original
    error is kept as the cause. Other exception types propagate untouched.
    """
    all_cases = cast("list[dict[str, Any]]", spec["cases"])
    for current in all_cases:
        # Resolved before the try so a missing "name" key is not relabelled.
        label = cast("str", current["name"])
        try:
            await _run_langfuse_generation_case(current)
        except AssertionError as failure:
            raise AssertionError(f"case {label!r}: {failure}") from failure
2865+
2866+
async def _run_langfuse_generation_case(case: Mapping[str, Any]) -> None:
    """Run one Langfuse Generation case and assert the recorded trace shape.

    Builds the graph with ``_build_simple_llm_graph`` (caller metadata not
    populated), attaches a ``LangfuseObserver`` backed by an
    ``InMemoryLangfuseClient``, invokes and drains the graph, then checks that
    exactly one trace was recorded and that it matches
    ``case["expected"]["langfuse_trace"]``.

    The optional ``case["langfuse_observer"]`` mapping may carry
    ``disable_provider_payload`` and ``payload_byte_cap``; only the keys that
    are present are forwarded to the observer, coerced to ``bool`` / ``int``.

    Raises:
        AssertionError: if the trace count is not 1 or the trace shape check fails.
    """
    import openarmature
    from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver

    graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False)
    # Everything after the provider exists runs under a finally that closes it,
    # so a failure in observer setup or in observer.shutdown() cannot leak it.
    try:
        client = InMemoryLangfuseClient()
        cfg = cast("dict[str, Any]", case.get("langfuse_observer") or {})
        lf_kwargs: dict[str, Any] = {"client": client}
        if "disable_provider_payload" in cfg:
            lf_kwargs["disable_provider_payload"] = bool(cfg["disable_provider_payload"])
        if "payload_byte_cap" in cfg:
            lf_kwargs["payload_byte_cap"] = int(cfg["payload_byte_cap"])
        observer = LangfuseObserver(**lf_kwargs)
        # Once the observer exists it is always shut down, even if attaching it
        # or building the state fails before the graph is invoked.
        try:
            graph.attach_observer(observer)
            state = _make_state_instance(case, state_cls)
            await graph.invoke(state)
            await graph.drain()
        finally:
            observer.shutdown()
    finally:
        await provider.aclose()

    assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}"
    trace = next(iter(client.traces.values()))
    bindings: dict[str, Any] = {}
    params = {"implementation_name": openarmature.__implementation_name__}
    expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"])
    _assert_langfuse_trace_shape(trace, expected, bindings=bindings, params=params)
2895+
28232896
28242897# ---------------------------------------------------------------------------
28252898# Fixture 010 — log correlation
@@ -3755,6 +3828,72 @@ async def _update_body(_s: Any, _payload: dict[str, Any] = update_block) -> dict
37553828 return builder .compile (), state_cls , providers
37563829
37573830
3831+ def _assert_langfuse_generation_fields (
3832+ exp_name : str | None ,
3833+ match : Any ,
3834+ exp : Mapping [str , Any ],
3835+ * ,
3836+ bindings : dict [str , Any ],
3837+ params : Mapping [str , Any ],
3838+ ) -> None :
3839+ """Generation-observation fields beyond the base span shape (023/024):
3840+ model / modelParameters / usage, the input parse-or-truncation shapes, and
3841+ the prompt-entity link. Each is asserted only when present, so it is inert
3842+ for span / tool observations. The placeholder-capable fields go through the
3843+ value-matcher (consistent with metadata); usage is a typed integer record.
3844+ """
3845+ if "model" in exp :
3846+ assert _langfuse_value_matches (match .model , exp ["model" ], bindings = bindings , params = params ), (
3847+ f"{ exp_name !r} : model { match .model !r} did not match { exp ['model' ]!r} "
3848+ )
3849+ if "modelParameters" in exp :
3850+ assert _langfuse_value_matches (
3851+ match .model_parameters , exp ["modelParameters" ], bindings = bindings , params = params
3852+ ), f"{ exp_name !r} : modelParameters { match .model_parameters !r} != { exp ['modelParameters' ]!r} "
3853+ if "usage" in exp :
3854+ u = cast ("dict[str, Any]" , exp ["usage" ])
3855+ got = None if match .usage is None else (match .usage .input , match .usage .output , match .usage .total )
3856+ assert got == (u ["input" ], u ["output" ], u ["total" ]), f"{ exp_name !r} : usage { got !r} != { u !r} "
3857+ if "prompt_entity_link" in exp :
3858+ assert _langfuse_value_matches (
3859+ match .prompt_entity_link , exp ["prompt_entity_link" ], bindings = bindings , params = params
3860+ ), (
3861+ f"{ exp_name !r} : prompt_entity_link { match .prompt_entity_link !r} "
3862+ f"did not match { exp ['prompt_entity_link' ]!r} "
3863+ )
3864+ if exp .get ("prompt_entity_link_absent" ) is True :
3865+ assert match .prompt_entity_link is None , (
3866+ f"{ exp_name !r} : expected no prompt_entity_link; got { match .prompt_entity_link !r} "
3867+ )
3868+ if "input_parses_as_messages" in exp :
3869+ # Under-cap input is the native message list (§8.7); compare directly.
3870+ assert match .input == exp ["input_parses_as_messages" ], (
3871+ f"{ exp_name !r} : input { match .input !r} did not parse as { exp ['input_parses_as_messages' ]!r} "
3872+ )
3873+ if exp .get ("input_is_raw_string_with_marker" ) is True :
3874+ # Over-cap input falls through to the raw truncated string + §5.5.5 marker.
3875+ assert isinstance (match .input , str ) and re .search (r"\[truncated, \d+ bytes total\]" , match .input ), (
3876+ f"{ exp_name !r} : expected a raw truncated string with the marker; got { match .input !r} "
3877+ )
3878+
3879+
3880+ def _obs_selection_matches (obs : Any , exp_metadata : Mapping [str , Any ]) -> bool :
3881+ """Read-only disambiguator for same-(type, name) sibling observations: an
3882+ actual is a candidate when its scalar expected-metadata values match.
3883+
3884+ Only scalars (str / int / float / bool) are used: placeholder tokens are
3885+ shared across siblings (correlation_id) so they don't disambiguate, and
3886+ running the value-matcher here would fire its binding side effects during
3887+ selection; sequences (namespace) are left to the value-matcher's list/tuple
3888+ handling. fan_out_index / step are the fields that actually distinguish.
3889+ """
3890+ for key , val in exp_metadata .items ():
3891+ is_placeholder = isinstance (val , str ) and val .startswith ("<" ) and val .endswith (">" )
3892+ if isinstance (val , (str , int , float )) and not is_placeholder and obs .metadata .get (key ) != val :
3893+ return False
3894+ return True
3895+
3896+
37583897def _assert_langfuse_observation_tree (
37593898 trace : Any ,
37603899 expected : list [dict [str , Any ]],
@@ -3776,8 +3915,18 @@ def _assert_langfuse_observation_tree(
37763915 for exp in expected :
37773916 exp_type = cast ("str" , exp ["type" ])
37783917 exp_name = cast ("str | None" , exp .get ("name" ))
3918+ # Disambiguate same-(type, name) siblings (e.g. 032's per-instance
3919+ # "process" spans) by their scalar metadata, not list/emission order, so
3920+ # the assertions can't bind the wrong sibling if emission order shifts.
3921+ exp_meta = cast ("dict[str, Any]" , exp .get ("metadata" ) or {})
37793922 match = next (
3780- (o for o in remaining if o .type == exp_type and (exp_name is None or o .name == exp_name )),
3923+ (
3924+ o
3925+ for o in remaining
3926+ if o .type == exp_type
3927+ and (exp_name is None or o .name == exp_name )
3928+ and _obs_selection_matches (o , exp_meta )
3929+ ),
37813930 None ,
37823931 )
37833932 assert match is not None , (
@@ -3802,6 +3951,7 @@ def _assert_langfuse_observation_tree(
38023951 assert match .metadata .get (key ) == val , (
38033952 f"{ exp_name !r} : metadata.{ key } { match .metadata .get (key )!r} != { val !r} "
38043953 )
3954+ _assert_langfuse_generation_fields (exp_name , match , exp , bindings = bindings or {}, params = params or {})
38053955 children = cast ("list[dict[str, Any]] | None" , exp .get ("children" ))
38063956 if children :
38073957 _assert_langfuse_observation_tree (
@@ -4146,6 +4296,13 @@ def _materialize_typed_messages(messages_spec: Sequence[Mapping[str, Any]]) -> l
41464296 for m in messages_spec :
41474297 role = m .get ("role" )
41484298 content = m .get ("content" )
4299+ # content_repeat synthesis (023 case 2 / fixture 014, mirroring the OTel
4300+ # _materialize_messages helper): N repetitions of a single char to drive
4301+ # payload truncation. The fixtures use a single-byte ASCII char, so the
4302+ # char count equals the byte count.
4303+ cr = cast ("Mapping[str, Any] | None" , m .get ("content_repeat" ))
4304+ if cr is not None :
4305+ content = cast ("str" , cr ["char" ]) * int (cr ["bytes" ])
41494306 if role == "system" :
41504307 out .append (SystemMessage (content = _require_text_content (role , content )))
41514308 elif role == "user" :
@@ -4179,6 +4336,13 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any:
41794336 rendered = rendered .replace ("{{" + key + "}}" , str (value )).replace ("{{ " + key + " }}" , str (value ))
41804337 messages : list [Message ] = [UserMessage (content = rendered )]
41814338 now = datetime .now (UTC )
4339+ # A backend that exposes a Langfuse Prompt reference (024 case 1,
4340+ # mock_with_langfuse_reference) surfaces it as the langfuse_prompt
4341+ # observability entity; the observer reads it to link the Generation.
4342+ observability_entities : dict [str , Any ] | None = None
4343+ reference = entry .get ("langfuse_prompt_reference" )
4344+ if reference is not None :
4345+ observability_entities = {"langfuse_prompt" : reference }
41824346 return PromptResult (
41834347 name = cast ("str" , entry ["name" ]),
41844348 version = cast ("str" , entry ["version" ]),
@@ -4189,6 +4353,7 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any:
41894353 variables = variables ,
41904354 fetched_at = now ,
41914355 rendered_at = now ,
4356+ observability_entities = observability_entities ,
41924357 )
41934358
41944359
0 commit comments