Skip to content

Commit 6c2cf60

Browse files
Wire 039 case 3 conformance, defer cases 1+2
Activate the nested-lineage fixture (039) in the Langfuse conformance runner. Case 3 (a fan-out inside a serial subgraph wrapper) is built by a dedicated hand-built runner -- the generic adapter cannot construct nested fan-out graphs -- and asserted against the fixture's expected trace. A ContextVar-scoped negative check enforces proposal 0045's MUST-NOT: an augmented key absent from an observation's expected metadata must be absent in the actual, which the subset matcher alone cannot catch. Cases 1 and 2 are temporarily deferred via _DEFERRED_CASES. Both need a shared observer fix: dispatch keys do not encode the enclosing fan-out instance, so a dispatch inside an outer instance collides across instances. Tracked as a separate effort.
1 parent 529a668 commit 6c2cf60

1 file changed

Lines changed: 191 additions & 20 deletions

File tree

tests/conformance/test_observability_langfuse.py

Lines changed: 191 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import copy
1919
import json
2020
from collections.abc import Callable, Mapping, Sequence
21+
from contextvars import ContextVar
2122
from datetime import UTC, datetime
2223
from pathlib import Path
2324
from typing import Any, cast
@@ -26,7 +27,7 @@
2627
import pytest
2728
import yaml
2829

29-
from openarmature.graph import END, GraphBuilder
30+
from openarmature.graph import END, ExplicitMapping, GraphBuilder
3031
from openarmature.llm import OpenAIProvider
3132
from openarmature.llm.response import RuntimeConfig
3233
from openarmature.observability.langfuse import (
@@ -108,17 +109,14 @@
108109
# proposal 0044) that inner branch nodes parent under, ported from the
109110
# OTel observer's parallel_branches_branch_spans machinery.
110111
"030-caller-metadata-parallel-branches-per-branch",
111-
# 039 (nested-lineage augmentation, proposal 0045) stays deferred: the
112-
# three cases need harness extensions the existing primitives lack.
113-
# Cases 1 + 3 (nested fan-out / fan-out-in-serial) need the fan-out
114-
# augment middleware to read items_field from the executing subgraph's
115-
# RUNTIME state (the outer instance's threaded inner_seed), not the
116-
# build-time initial_state the current _make_augment_instance_middleware
117-
# captures. Case 2 (pb-inside-fan-out) needs a new
118-
# augment_metadata_from_outer_item factory AND depends on 030's
119-
# per-branch dispatch span landing first. 0045's contract IS exercised
120-
# at unit level via the OTel observer's
121-
# ``test_nested_fan_out_augmentation_reaches_outer_instance_dispatch_span``.
112+
# 039 (nested-lineage augmentation, proposal 0045): the LangfuseObserver
113+
# gained prefix-general fan-out-instance dispatch (so a fan-out under a
114+
# serial wrapper parents correctly) and skips shared-parent NODEs in the
115+
# augmentation walk (0045 §3.4 MUST-NOT). Case 3 (fan-out in a serial
116+
# subgraph) is wired via the dedicated hand-built _build_039_graph runner;
117+
# cases 1 + 2 are TEMPORARILY deferred via _DEFERRED_CASES pending the
118+
# shared nested-dispatch-keying fix (see that note).
119+
"039-nested-lineage-augmentation",
122120
}
123121
)
124122

@@ -127,10 +125,24 @@
127125
# ``(fixture_stem, case_name)``. The case-loop in the runner ``continue``s
128126
# past matching cases — NOT ``pytest.skip``, which would skip the whole
129127
# fixture's test invocation and hide the surrounding cases that DO run.
130-
# Currently empty; the harness covers every activated case. Kept as a
131-
# named hook so future per-case deferrals don't need to re-introduce the
132-
# pattern.
133-
_DEFERRED_CASES: frozenset[tuple[str, str]] = frozenset()
128+
_DEFERRED_CASES: frozenset[tuple[str, str]] = frozenset(
129+
{
130+
# 039 cases 1 + 2 are TEMPORARILY deferred pending one deeper observer
131+
# fix shared by both: dispatch keys
132+
# (fan_out_instance_observations / parallel_branches_branch_spans) are
133+
# namespace-local and do NOT encode the enclosing fan-out instance, so a
134+
# dispatch INSIDE an outer fan-out instance collides across instances --
135+
# case 1's inner instance dispatch and case 2's per-branch dispatch both
136+
# reparent the second outer instance's events under the first's dispatch.
137+
# The fix (thread the enclosing fan_out_index_chain / branch_name_chain
138+
# into the dispatch keys, across synthesis + resolution + the
139+
# augmentation walk, in both observers) is its own focused effort + spec
140+
# coordination. Case 3 (single fan-out level under a serial wrapper) does
141+
# not nest dispatches, so it is wired. See _build_039_graph.
142+
("039-nested-lineage-augmentation", "inner_fan_out_augmenter_propagates_to_outer_dispatch_span"),
143+
("039-nested-lineage-augmentation", "parallel_branch_augmenter_propagates_to_outer_fan_out_instance"),
144+
}
145+
)
134146

135147

136148
# Mocks the spec fixture 037 references for ``trace_input_from_state`` /
@@ -469,11 +481,11 @@ async def test_langfuse_fixture(fixture_path: Path) -> None:
469481
if fixture_inner_subgraphs is not None and "inner_subgraphs" not in case:
470482
case["inner_subgraphs"] = fixture_inner_subgraphs
471483
try:
472-
await _run_case(case)
484+
await _run_case(case, fixture_stem=fixture_stem)
473485
except AssertionError as e:
474486
raise AssertionError(f"case {case_name!r}: {e}") from e
475487
else:
476-
await _run_case(spec)
488+
await _run_case(spec, fixture_stem=fixture_stem)
477489

478490

479491
def _has_topology_constructs(case: Mapping[str, Any]) -> bool:
@@ -640,6 +652,151 @@ def _build_inner_subgraph_with_llm(
640652
return builder.compile()
641653

642654

655+
# Fixture 039 (nested-lineage augmentation) declares nested fan-out graphs the
656+
# generic cross-cap adapter can't construct (a fan-out inside a subgraph wrapper
657+
# / another fan-out, and a per-item sub-field as the inner fan-out's items
658+
# source). Each case is hand-built here against the engine's GraphBuilder --
659+
# mirroring the dedicated 044 builder on the OTel side -- then driven through the
660+
# shared observer + assertion path. The expected langfuse_trace in the YAML
661+
# stays the oracle.
662+
_FIXTURE_039 = "039-nested-lineage-augmentation"
663+
664+
665+
def _build_039_graph(
666+
case: Mapping[str, Any],
667+
*,
668+
provider: OpenAIProvider | None,
669+
prompt_manager: PromptManager | None,
670+
) -> tuple[Any, Any]:
671+
"""Dispatch a 039 case to its hand-built graph; return (graph, factory)."""
672+
name = cast("str", case.get("name"))
673+
if name == "fan_out_in_serial_subgraph_augmenter_propagates_to_wrapper_span":
674+
return _build_039_case3(case, provider=provider, prompt_manager=prompt_manager)
675+
raise NotImplementedError(f"039 case not yet wired: {name!r}")
676+
677+
678+
def _build_039_case3(
679+
case: Mapping[str, Any],
680+
*,
681+
provider: OpenAIProvider | None,
682+
prompt_manager: PromptManager | None,
683+
) -> tuple[Any, Any]:
684+
# Case 3: a serial subgraph wrapper (`wrap`) descends into `wrapped_fan_out`,
685+
# whose `pick` fan-out runs per-product; each instance augments note=<id>.
686+
# The wrapper span must carry the augmentation (last-writer) per 0045's
687+
# lineage-aware rule, the fan-out NODE must not.
688+
# The fan-out places each outer product into per_product's item_field slot;
689+
# the augment middleware reads <id> from it. per_product's declared state
690+
# ({picked}) lacks the slot, so inject it (mirrors _synthesize_fan_out_
691+
# aggregation on the generic 029 path).
692+
assert provider is not None, "039 cases declare mock_llm, so the provider must be set"
693+
per_product_spec = copy.deepcopy(cast("dict[str, Any]", case["inner_subgraphs"]["per_product"]))
694+
per_product_spec.setdefault("state", {}).setdefault("fields", {}).setdefault(
695+
"oa_fan_out_item", {"type": "dict", "default": {}}
696+
)
697+
per_product = _build_inner_subgraph_with_llm(
698+
per_product_spec,
699+
provider=provider,
700+
prompt_manager=prompt_manager,
701+
render_variables={},
702+
)
703+
wrap_state_cls = build_state_cls(
704+
"Wrapped039C3",
705+
{
706+
"picks": {"type": "list", "reducer": "append", "default": []},
707+
"products": {"type": "list<dict>", "default": []},
708+
"oa_fan_out_item": {"type": "dict", "default": {}},
709+
},
710+
)
711+
wrap_builder: GraphBuilder[Any] = GraphBuilder(wrap_state_cls)
712+
wrap_builder.set_entry("pick")
713+
wrap_builder.add_fan_out_node(
714+
"pick",
715+
subgraph=per_product,
716+
items_field="products",
717+
item_field="oa_fan_out_item",
718+
collect_field="picked",
719+
target_field="picks",
720+
instance_middleware=[_make_augment_instance_middleware({"note": "id"}, "oa_fan_out_item")],
721+
)
722+
wrap_builder.add_edge("pick", END)
723+
wrapped_fan_out = wrap_builder.compile()
724+
725+
outer_state_cls = build_state_cls(
726+
"Outer039C3",
727+
{"result": {"type": "list", "default": []}, "products": {"type": "list<dict>", "default": []}},
728+
)
729+
outer_builder: GraphBuilder[Any] = GraphBuilder(outer_state_cls)
730+
outer_builder.set_entry("wrap")
731+
outer_builder.add_subgraph_node(
732+
"wrap",
733+
wrapped_fan_out,
734+
ExplicitMapping(inputs={"products": "products"}, outputs={"result": "picks"}),
735+
)
736+
outer_builder.add_edge("wrap", END)
737+
graph = outer_builder.compile()
738+
initial = cast("dict[str, Any]", case.get("initial_state") or {})
739+
return graph, (lambda: outer_state_cls(**initial))
740+
741+
742+
# Proposal 0045 §3.4: a key set via set_invocation_metadata inside a fan-out
743+
# instance / parallel-branches branch lands ONLY on the dispatch ancestors on
744+
# the augmenter's call-stack path -- NOT on the shared fan-out/pb NODE, sibling
745+
# instances, or (inside a dispatch) the Trace. The tree asserter is subset-based
746+
# (extra keys tolerated), which can't catch a MUST-NOT violation, so 039
747+
# additionally enforces that augmented keys absent from an observation's expected
748+
# metadata are absent in the actual. Scoped to 039 via this ContextVar so the
749+
# established subset semantics for the other fixtures are unchanged.
750+
_AUGMENT_KEYS_UNDER_TEST: ContextVar[frozenset[str]] = ContextVar(
751+
"augment_keys_under_test", default=frozenset()
752+
)
753+
754+
755+
def _collect_augment_keys(case: Mapping[str, Any]) -> frozenset[str]:
756+
"""Collect the metadata keys augment directives set, anywhere in the case's
757+
topology (fan-out / parallel-branches augment blocks at any nesting)."""
758+
keys: set[str] = set()
759+
directives = ("augment_metadata_from_field", "augment_metadata_from_outer_item", "augment_metadata")
760+
761+
def _harvest(block: Any) -> None:
762+
if not isinstance(block, dict):
763+
return
764+
for directive in directives:
765+
mapping = cast("dict[str, Any]", block).get(directive)
766+
if isinstance(mapping, dict):
767+
keys.update(cast("dict[str, Any]", mapping).keys())
768+
769+
def _walk(spec: Mapping[str, Any]) -> None:
770+
for node in cast("dict[str, Any]", spec.get("nodes") or {}).values():
771+
if not isinstance(node, dict):
772+
continue
773+
node_dict = cast("dict[str, Any]", node)
774+
_harvest(node_dict.get("fan_out"))
775+
pb = cast("dict[str, Any] | None", node_dict.get("parallel_branches"))
776+
for branch in cast("dict[str, Any]", (pb or {}).get("branches") or {}).values():
777+
_harvest(branch)
778+
for collection in ("subgraphs", "inner_subgraphs"):
779+
for sub in cast("dict[str, Any]", spec.get(collection) or {}).values():
780+
if isinstance(sub, dict):
781+
_walk(cast("Mapping[str, Any]", sub))
782+
783+
_walk(case)
784+
return frozenset(keys)
785+
786+
787+
def _assert_augment_keys_not_leaked(
788+
label: str, actual: Mapping[str, Any], expected: Mapping[str, Any]
789+
) -> None:
790+
# Proposal 0045 §3.4 MUST-NOT: an augmented key absent from the expected
791+
# metadata (a shared fan-out/pb NODE, a sibling, or the Trace inside a
792+
# dispatch) MUST also be absent in the actual. Complements the subset matcher.
793+
for key in _AUGMENT_KEYS_UNDER_TEST.get():
794+
if key not in expected:
795+
assert key not in actual, (
796+
f"{label}: MUST NOT carry augmented key {key!r} (proposal 0045 §3.4); got {actual.get(key)!r}"
797+
)
798+
799+
643800
def _resolve_detached_wrapper_names(case: Mapping[str, Any]) -> frozenset[str]:
644801
"""Translate fixture-level ``detached_subgraphs`` (a list of SUBGRAPH
645802
IDENTITY names) into the set of WRAPPER NODE names the observer keys
@@ -662,7 +819,11 @@ def _resolve_detached_wrapper_names(case: Mapping[str, Any]) -> frozenset[str]:
662819
return frozenset(wrappers)
663820

664821

665-
async def _run_case(case: Mapping[str, Any]) -> None:
822+
async def _run_case(case: Mapping[str, Any], *, fixture_stem: str | None = None) -> None:
823+
# 039 additionally enforces proposal 0045's MUST-NOT scoping (an augmented
824+
# key absent from an observation's expected metadata must be absent in the
825+
# actual); other fixtures keep the established subset semantics.
826+
_AUGMENT_KEYS_UNDER_TEST.set(_collect_augment_keys(case) if fixture_stem == _FIXTURE_039 else frozenset())
666827
# ---- Mock LLM transport (if the graph has an LLM call)
667828
mock_responses = cast("list[dict[str, Any]] | None", case.get("mock_llm"))
668829
transport = _build_mock_llm_handler(mock_responses) if mock_responses else None
@@ -692,7 +853,13 @@ async def _run_case(case: Mapping[str, Any]) -> None:
692853
# ``adapter.build_graph`` machinery for subgraph / fan_out shapes;
693854
# LLM/prompt fixtures (022/023/024) use the simpler hand-rolled
694855
# per-node build that knows about ``calls_llm`` / ``renders_prompt``.
695-
if _has_topology_constructs(case):
856+
if fixture_stem == _FIXTURE_039:
857+
# 039's nested fan-out graphs are hand-built (the generic adapter can't
858+
# construct them); see _build_039_graph.
859+
graph, initial_state_factory = _build_039_graph(
860+
case, provider=provider, prompt_manager=prompt_manager
861+
)
862+
elif _has_topology_constructs(case):
696863
# The topology fixtures (031/032/033) use inner-node test-seam
697864
# directives the cross-capability adapter doesn't translate
698865
# (``update_pure_from_state`` computes a value the assertions
@@ -1325,6 +1492,7 @@ def _assert_trace(
13251492
f"trace.metadata.invocation_id: raw trace.id {trace.id!r} != {expected_invocation_id!r}"
13261493
)
13271494
_assert_metadata_subset("trace.metadata", trace.metadata, expected_metadata)
1495+
_assert_augment_keys_not_leaked("trace.metadata", trace.metadata, expected_metadata)
13281496
# Proposal 0043 (§8.4.1 trace.input/output sourcing). Fixtures that
13291497
# opt in supply these as YAML maps; older fixtures leave them absent.
13301498
if "input" in expected:
@@ -1452,6 +1620,9 @@ def _assert_observation(
14521620
)
14531621
expected_metadata = cast("dict[str, Any]", expected.get("metadata") or {})
14541622
_assert_metadata_subset(f"observation[{actual.name}].metadata", actual.metadata, expected_metadata)
1623+
_assert_augment_keys_not_leaked(
1624+
f"observation[{actual.name}].metadata", actual.metadata, expected_metadata
1625+
)
14551626

14561627
expected_children = cast("list[dict[str, Any]]", expected.get("children") or [])
14571628
actual_children = trace.children_of(actual.id)

0 commit comments

Comments
 (0)