Skip to content

Commit fa95dc2

Browse files
Wire get_invocation_metadata fixtures (043/045/046) (#191)
* Wire get_invocation_metadata fixtures (043/045/046) Wire the get_invocation_metadata read-access family (proposal 0048) into the YAML conformance harness via a hand-built runner -- the cross-cap adapter doesn't model augment_metadata, capture_invocation_metadata_into, retry_middleware, or direct_call. The runner processes a node's in-node directives in YAML key order (043 augments then captures; 045 attempt 1 captures then augments), drives 045's retry via RetryMiddleware + a category-carrying transient, and handles 046's no-graph direct call. Asserts final_state field equality plus the immutability invariant (the read is a MappingProxyType). Move 043/045/046 to _SUPPORTED_FIXTURES; 044 (fan-out scoping) stays a follow-up -- its fan-out collection of per-instance captures is a distinct shape. Test-only. * Make metadata directive parsing strict From CoPilot review of #191: assert known keys instead of silently dropping unrecognized ones, so a fixture-schema change or a typo fails the conformance harness loudly rather than passing false-green. - retry_middleware: assert only max_attempts / classifier are present. - _apply_metadata_directives: raise on any key outside the recognized directives + the structural keys it legitimately skips.
1 parent 5874173 commit fa95dc2

1 file changed

Lines changed: 189 additions & 7 deletions

File tree

tests/conformance/test_observability.py

Lines changed: 189 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
9494
# v0.42.0 — proposal 0050 call-level-retry per-attempt LLM span
9595
# surface. Single-attempt default: one span, attempt_index 0.
9696
"057-llm-attempt-index-single-attempt-default",
97+
# proposal 0048 get_invocation_metadata read access: 043 roundtrip,
98+
# 045 retry-scoping, 046 outside-invocation. 044 (fan-out) is a follow-up.
99+
"043-get-invocation-metadata-roundtrip",
100+
"045-get-invocation-metadata-retry-scoping",
101+
"046-get-invocation-metadata-outside-invocation",
97102
"001-otel-basic-trace",
98103
"002-otel-subgraph-hierarchy",
99104
"003-otel-error-status",
@@ -316,13 +321,10 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
316321
# conformance runner test_observability_langfuse.py -- see
317322
# _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only).
318323
(
319-
(
320-
"043-get-invocation-metadata-roundtrip",
321-
"044-get-invocation-metadata-fan-out-scoping",
322-
"045-get-invocation-metadata-retry-scoping",
323-
"046-get-invocation-metadata-outside-invocation",
324-
),
325-
"proposal 0048 get_invocation_metadata; covered by test_observability_metadata.py",
324+
("044-get-invocation-metadata-fan-out-scoping",),
325+
"proposal 0048 get_invocation_metadata fan-out scoping; covered by "
326+
"test_observability_metadata.py (043/045/046 now wired into "
327+
"_SUPPORTED_FIXTURES; 044's fan-out collection is a follow-up)",
326328
),
327329
# Fixture-harness catch-up tier 1 wired the rest of the 0057/0058
328330
# family into _SUPPORTED_FIXTURES; these three stay here, each blocked
@@ -576,6 +578,12 @@ async def test_observability_fixture(fixture_path: Path) -> None:
576578
"098-langfuse-tool-observation",
577579
}:
578580
await _run_tool_fixture(spec)
581+
elif fixture_id in {
582+
"043-get-invocation-metadata-roundtrip",
583+
"045-get-invocation-metadata-retry-scoping",
584+
"046-get-invocation-metadata-outside-invocation",
585+
}:
586+
await _run_get_invocation_metadata_fixture(spec)
579587
else:
580588
raise AssertionError(f"no driver for supported fixture {fixture_id!r}")
581589

@@ -1390,6 +1398,180 @@ async def _body(_s: Any) -> dict[str, Any]:
13901398
raise AssertionError(f"case {case_name!r}: {e}") from e
13911399

13921400

1401+
# ---------------------------------------------------------------------------
1402+
# Proposal 0048 read access: get_invocation_metadata (fixtures 043 / 045 / 046).
1403+
# Hand-built -- the cross-cap adapter does not model augment_metadata /
1404+
# capture_invocation_metadata_into / retry_middleware / direct_call. Mirrors the
1405+
# unit tests in test_observability_metadata.py.
1406+
# ---------------------------------------------------------------------------
1407+
1408+
1409+
class _MetadataRetryTransient(Exception):
1410+
# The default retry classifier treats provider_rate_limit as retryable; this
1411+
# is the test's "transient_marker_error" for fixture 045.
1412+
category = "provider_rate_limit"
1413+
1414+
1415+
# Keys _apply_metadata_directives accepts: the three handled directives plus the
1416+
# structural keys it legitimately skips (per-attempt scaffolding + the
1417+
# node-level retry directives consumed by _make_metadata_node_body). Any other
1418+
# key raises, so a typo or a new directive fails the harness loudly.
1419+
_METADATA_DIRECTIVE_KEYS = frozenset(
1420+
{
1421+
"augment_metadata",
1422+
"capture_invocation_metadata_into",
1423+
"raises",
1424+
"attempt",
1425+
"succeeds",
1426+
"retry_middleware",
1427+
"per_attempt_behavior",
1428+
}
1429+
)
1430+
1431+
1432+
def _apply_metadata_directives(
1433+
directives: Mapping[str, Any], types_seen: dict[str, type]
1434+
) -> tuple[dict[str, Any], bool]:
1435+
"""Run a node's (or per-attempt's) in-node metadata directives, returning the
1436+
resulting state update and whether the node should then raise."""
1437+
# Directives run IN KEY ORDER -- 043 augments then captures, 045 attempt 1
1438+
# captures then augments, and the YAML key order encodes that. The capture
1439+
# records the read's type so the immutability invariant can verify it was a
1440+
# MappingProxyType. ``raises`` is terminal (a real node body that raises runs
1441+
# nothing after), so stop at it rather than processing later directives.
1442+
from openarmature.observability.metadata import ( # noqa: PLC0415
1443+
get_invocation_metadata,
1444+
set_invocation_metadata,
1445+
)
1446+
1447+
update: dict[str, Any] = {}
1448+
should_raise = False
1449+
for key, val in directives.items():
1450+
assert key in _METADATA_DIRECTIVE_KEYS, f"unrecognized metadata directive {key!r}"
1451+
if key == "augment_metadata":
1452+
set_invocation_metadata(**cast("dict[str, Any]", val))
1453+
elif key == "capture_invocation_metadata_into":
1454+
read = get_invocation_metadata()
1455+
types_seen[cast("str", val)] = type(read)
1456+
update[cast("str", val)] = dict(read)
1457+
elif key == "raises":
1458+
should_raise = True
1459+
break
1460+
return update, should_raise
1461+
1462+
1463+
def _make_metadata_node_body(node_spec: Mapping[str, Any], types_seen: dict[str, type]) -> Any:
1464+
"""Build a node body that runs the node's metadata directives -- attempt-keyed
1465+
when the node declares ``per_attempt_behavior`` (045), else once per call."""
1466+
per_attempt = cast("list[dict[str, Any]] | None", node_spec.get("per_attempt_behavior"))
1467+
if per_attempt is not None:
1468+
by_attempt = {int(b["attempt"]): b for b in per_attempt}
1469+
attempts: list[int] = []
1470+
1471+
async def _retry_body(_s: Any) -> dict[str, Any]:
1472+
n = len(attempts)
1473+
attempts.append(n)
1474+
update, should_raise = _apply_metadata_directives(by_attempt.get(n, {}), types_seen)
1475+
if should_raise:
1476+
raise _MetadataRetryTransient()
1477+
return update
1478+
1479+
return _retry_body
1480+
1481+
async def _body(_s: Any) -> dict[str, Any]:
1482+
update, should_raise = _apply_metadata_directives(node_spec, types_seen)
1483+
if should_raise:
1484+
raise _MetadataRetryTransient()
1485+
return update
1486+
1487+
return _body
1488+
1489+
1490+
async def _run_get_invocation_metadata_fixture(spec: Mapping[str, Any]) -> None:
1491+
"""Drive every case of a get_invocation_metadata fixture (043 / 045 / 046)."""
1492+
for case in cast("list[dict[str, Any]]", spec["cases"]):
1493+
await _run_get_invocation_metadata_case(case)
1494+
1495+
1496+
async def _run_get_invocation_metadata_case(case: Mapping[str, Any]) -> None:
1497+
"""Assert one case: a bare get_invocation_metadata() call (046), or a graph
1498+
whose final_state captures the in-node reads (043 / 045)."""
1499+
from types import MappingProxyType # noqa: PLC0415
1500+
1501+
from openarmature.observability.metadata import get_invocation_metadata # noqa: PLC0415
1502+
1503+
expected = cast("dict[str, Any]", case["expected"])
1504+
invariants = cast("dict[str, Any]", expected.get("invariants") or {})
1505+
1506+
# Fixture 046: a bare get_invocation_metadata() call outside any invocation.
1507+
direct_call = cast("dict[str, Any] | None", case.get("direct_call"))
1508+
if direct_call is not None:
1509+
result = get_invocation_metadata()
1510+
dc = cast("dict[str, Any]", expected["direct_call_result"])
1511+
assert dict(result) == cast("dict[str, Any]", dc.get("value") or {}), (
1512+
f"direct_call value {dict(result)!r} != {dc.get('value')!r}"
1513+
)
1514+
if dc.get("type") == "immutable_mapping":
1515+
assert isinstance(result, MappingProxyType), "direct_call result is not an immutable mapping"
1516+
# ``exception: null`` -- reaching here means the call did not raise.
1517+
return
1518+
1519+
# Fixtures 043 / 045: build the graph, invoke with caller metadata, assert
1520+
# final_state field equality + the immutability invariant.
1521+
from openarmature.graph import END, GraphBuilder # noqa: PLC0415
1522+
from openarmature.graph.middleware import RetryConfig, RetryMiddleware # noqa: PLC0415
1523+
1524+
from .adapter import build_state_cls # noqa: PLC0415
1525+
1526+
types_seen: dict[str, type] = {}
1527+
state_cls = build_state_cls("MetadataFixtureState", cast("dict[str, Any]", case["state"]["fields"]))
1528+
builder = GraphBuilder(state_cls)
1529+
for node_name, node_spec_any in cast("dict[str, Any]", case["nodes"]).items():
1530+
node_spec = cast("dict[str, Any]", node_spec_any)
1531+
body = _make_metadata_node_body(node_spec, types_seen)
1532+
retry_cfg = cast("dict[str, Any] | None", node_spec.get("retry_middleware"))
1533+
if retry_cfg is not None:
1534+
# Fail loud if the fixture grows a semantics-bearing knob the runner
1535+
# doesn't model, rather than silently dropping it.
1536+
unexpected = set(retry_cfg) - {"max_attempts", "classifier"}
1537+
assert not unexpected, f"retry_middleware: unhandled keys {sorted(unexpected)}"
1538+
# The fixture's abstract ``classifier`` (transient_marker) maps to the
1539+
# default retry classifier: _MetadataRetryTransient carries the
1540+
# provider_rate_limit category, which that classifier retries. Only
1541+
# max_attempts is read off the directive.
1542+
builder.add_node(
1543+
node_name,
1544+
body,
1545+
middleware=[
1546+
RetryMiddleware(
1547+
RetryConfig(max_attempts=int(retry_cfg["max_attempts"]), backoff=lambda _i: 0.0)
1548+
)
1549+
],
1550+
)
1551+
else:
1552+
builder.add_node(node_name, body)
1553+
for edge in cast("list[dict[str, str]]", case["edges"]):
1554+
target = END if edge["to"] == "END" else edge["to"]
1555+
builder.add_edge(edge["from"], target)
1556+
builder.set_entry(cast("str", case["entry"]))
1557+
graph = builder.compile()
1558+
1559+
final = await graph.invoke(
1560+
state_cls(**cast("dict[str, Any]", case.get("initial_state") or {})),
1561+
metadata=cast("dict[str, Any] | None", case.get("caller_metadata")),
1562+
)
1563+
await graph.drain()
1564+
1565+
for field_name, expected_value in cast("dict[str, Any]", expected.get("final_state") or {}).items():
1566+
actual = getattr(final, field_name)
1567+
assert actual == expected_value, f"final_state.{field_name}: {actual!r} != {expected_value!r}"
1568+
1569+
if invariants.get("read_returns_immutable_mapping"):
1570+
assert types_seen and all(t is MappingProxyType for t in types_seen.values()), (
1571+
f"read_returns_immutable_mapping: captured read types {types_seen!r} not all MappingProxyType"
1572+
)
1573+
1574+
13931575
def _normalize_attr_value(value: Any) -> Any:
13941576
"""OTel attribute values can be tuple or list shapes for sequence
13951577
types depending on how they were set; normalize for comparison."""

0 commit comments

Comments
 (0)