Skip to content

Commit cc6d258

Browse files
Extend conformance harness for drain fixtures
Adds the slow-observer directive (sleep_ms_per_event, int form or dict form with first_invocation / subsequent_invocations keys), drain timeout passthrough (invoke.drain.timeout_seconds), DrainSummary assertions (timeout_reached, undelivered_count, undelivered_count_min), invariants block (drain_returned_within_timeout, graph_state_intact_after_timeout, drain_waited_for_all_events), and multi-invocation invocations: array handling for fixture 024's cross-invocation cleanliness contract. Per-event observer comparison switches from full equality to a key- subset check so fixtures that omit pre_state / post_state (the drain fixtures) do not fail on incidental keys present in the recorded event. Pydantic fixture-parsing models extended for the new directives so fixture parse-tests cover the new shapes.
1 parent 94e0bee commit cc6d258

5 files changed

Lines changed: 256 additions & 6 deletions

File tree

tests/conformance/adapter.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,13 +764,24 @@ class ObserverFixture:
764764
`phases` is the optional subscription set parsed from the fixture's
765765
YAML. None means "no `phases:` key was present" — the harness leaves
766766
the engine to default to both phases.
767+
768+
`sleep_ms_per_event` configures the slow-observer directive (proposal
769+
0010 §6 Drain conformance). When `None`, the observer runs at full
770+
speed. An int means a constant sleep per event. A dict with
771+
`first_invocation` / `subsequent_invocations` keys is invocation-
772+
counter-aware: the first invocation through this observer uses the
773+
`first_invocation` value, every subsequent invocation uses the
774+
`subsequent_invocations` value. `invocation_counter` is bumped by the
775+
harness between invocations.
767776
"""
768777

769778
name: str
770779
attach: str # "graph" | "invocation"
771780
target: str # "outer" | <subgraph name>
772781
behavior: str # "record" | "raise"
773782
phases: frozenset[str] | None = None
783+
sleep_ms_per_event: int | Mapping[str, int] | None = None
784+
invocation_counter: list[int] = field(default_factory=lambda: [0])
774785
events: list[dict[str, Any]] = field(default_factory=list[dict[str, Any]])
775786

776787

@@ -796,6 +807,25 @@ def _record_event(event: NodeEvent) -> dict[str, Any]:
796807
return rec
797808

798809

810+
def _resolve_sleep_ms(fixture: ObserverFixture) -> int:
811+
"""Resolve the per-event sleep duration in ms for the slow-observer
812+
directive (proposal 0010 §6 Drain). `None` and `0` mean no sleep;
813+
an int form is constant; a dict form selects by `invocation_counter`.
814+
"""
815+
spec = fixture.sleep_ms_per_event
816+
if spec is None:
817+
return 0
818+
if isinstance(spec, int):
819+
return spec
820+
# Dict form with first_invocation / subsequent_invocations keys —
821+
# used by fixture 024 to slow only the first invocation so the
822+
# second drain runs cleanly. `invocation_counter[0]` is bumped by
823+
# the harness between `compiled.invoke()` calls.
824+
if fixture.invocation_counter[0] == 0:
825+
return int(spec.get("first_invocation", 0))
826+
return int(spec.get("subsequent_invocations", 0))
827+
828+
799829
def make_observer_fn(
800830
fixture: ObserverFixture,
801831
delivery: list[tuple[str, int, str]],
@@ -808,9 +838,17 @@ def make_observer_fn(
808838
`delivery_order`). Raising observers record + append before raising,
809839
so the engine's error isolation can be verified by checking that
810840
subsequent observers/events still get through.
841+
842+
Honors `fixture.sleep_ms_per_event` per the proposal 0010 slow-
843+
observer directive: each event awaits `asyncio.sleep(ms / 1000)`
844+
BEFORE recording, so a drain timeout that cancels mid-sleep leaves
845+
the event unrecorded and the counter shows it as undelivered.
811846
"""
812847

813848
async def observer(event: NodeEvent) -> None:
849+
sleep_ms = _resolve_sleep_ms(fixture)
850+
if sleep_ms > 0:
851+
await asyncio.sleep(sleep_ms / 1000.0)
814852
delivery.append((fixture.name, event.step, event.phase))
815853
fixture.events.append(_record_event(event))
816854
if fixture.behavior == "raise":

tests/conformance/harness/directives.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,13 +596,19 @@ class ObserverSpec(_ForbidExtras):
596596
``raise`` (raise to verify error isolation).
597597
- ``phases`` (optional, spec v0.6 §6) — subset of ``{"started",
598598
"completed"}`` for per-observer phase subscription.
599+
- ``sleep_ms_per_event`` (proposal 0010 §6 Drain conformance) — the
600+
slow-observer directive. An int means a constant sleep per
601+
event; a dict with ``first_invocation`` / ``subsequent_invocations``
602+
keys selects per invocation index (used by fixture 024 to slow
603+
only the first invocation).
599604
"""
600605

601606
name: str
602607
attach: Literal["graph", "invocation"]
603608
target: str
604609
behavior: Literal["record", "raise"]
605610
phases: list[Literal["started", "completed"]] | None = None
611+
sleep_ms_per_event: int | dict[str, int] | None = None
606612

607613

608614
# ---------------------------------------------------------------------------

tests/conformance/harness/expectations.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@ class GraphEngineExpected(_ForbidExtras):
5050
observer_event_invariants: dict[str, Any] | None = None
5151
# 020 — proposal-0012 fixture: assertions about edge-resolution
5252
# failure event shapes. Permissive dict until Phase 1.
53+
# 022–024 (proposal 0010 §6 Drain) — drain-summary invariants
54+
# (drain_returned_within_timeout, graph_state_intact_after_timeout,
55+
# drain_waited_for_all_events) ride on the same field.
5356
invariants: dict[str, Any] | None = None
57+
# 022–025 (proposal 0010 §6 Drain) — DrainSummary assertions
58+
# (timeout_reached, undelivered_count, undelivered_count_min).
59+
drain_summary: dict[str, Any] | None = None
5460
# 015 — invoke() returns normally; obs_raiser's exceptions surface to
5561
# warnings rather than propagate.
5662
no_propagated_error: bool | None = None

tests/conformance/harness/fixtures.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,23 @@ class GraphFixture(_ForbidExtras):
243243
provider: dict[str, Any] | None = None
244244
mock_llm: list[MockResponse] | None = None
245245
caller_global_otel_active: bool | None = None
246-
invocations: int | None = None
246+
# Two shapes:
247+
# - ``int``: run-count for observability multi-run fixtures (legacy).
248+
# - ``list[dict]``: per-invocation specs for proposal 0010 §6 Drain
249+
# cross-invocation cleanliness fixtures (e.g., fixture 024). Each
250+
# entry carries its own ``initial_state``, ``drain``, ``expected``.
251+
invocations: int | list[dict[str, Any]] | None = None
252+
# Proposal 0010 §6 Drain — the ``invoke`` directive wraps the
253+
# ``drain.timeout_seconds`` parameter for single-invocation
254+
# drain-timeout fixtures (022, 023, 025). Multi-invocation fixture
255+
# 024 uses the ``invocations`` array above instead.
256+
invoke: dict[str, Any] | None = None
257+
# Proposal 0010 §6 Drain — top-level invariants applied across all
258+
# invocations of a multi-invocation fixture (e.g.,
259+
# ``second_invocation_drain_independent_of_first`` on fixture 024).
260+
# Single-invocation fixtures put their invariants under
261+
# ``expected.invariants`` (the field already on ExpectedBlock).
262+
invariants: dict[str, Any] | None = None
247263

248264

249265
# ---------------------------------------------------------------------------

tests/conformance/test_conformance.py

Lines changed: 189 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from __future__ import annotations
99

10+
import time
1011
from collections.abc import Mapping
1112
from pathlib import Path
1213
from typing import Any, cast
@@ -244,6 +245,7 @@ async def _run_runtime_case(spec: Mapping[str, Any], fixture_id: str) -> None:
244245
target=o["target"],
245246
behavior=o["behavior"],
246247
phases=phases,
248+
sleep_ms_per_event=o.get("sleep_ms_per_event"),
247249
)
248250
observer_fixtures[ofx.name] = ofx
249251
obs = make_observer_fn(ofx, delivery)
@@ -281,6 +283,112 @@ async def _run_runtime_case(spec: Mapping[str, Any], fixture_id: str) -> None:
281283
assert built.trace == expected_err["execution_order"]
282284
return
283285

286+
# Proposal 0010 §6 Drain — multi-invocation fixture form
287+
# (`invocations:` array; fixture 024). Each entry runs as its own
288+
# `invoke` + `drain` against the same compiled graph + observers, so
289+
# the cross-invocation cleanliness contract can be asserted end-to-
290+
# end. Observers' `invocation_counter` bumps between entries so the
291+
# dict-form `sleep_ms_per_event` can vary per invocation.
292+
if "invocations" in spec:
293+
for inv_idx, inv in enumerate(cast("list[dict[str, Any]]", spec["invocations"])):
294+
inv_initial = built.initial_state(inv.get("initial_state", {}))
295+
drain_block: dict[str, Any] = inv.get("drain") or {}
296+
ts = drain_block.get("timeout_seconds")
297+
inv_timeout: float | None = float(ts) if ts is not None else None
298+
inv_expected: dict[str, Any] = inv.get("expected") or {}
299+
300+
# Reset per-invocation observer state and bump the counter
301+
# so dict-form `sleep_ms_per_event` selects the right value.
302+
for ofx in observer_fixtures.values():
303+
if inv_idx > 0:
304+
ofx.invocation_counter[0] = inv_idx
305+
ofx.events.clear()
306+
if inv_idx > 0:
307+
# Drop the per-invocation delivery trace; subsequent
308+
# invocations assert against a fresh recorder.
309+
delivery.clear()
310+
# `built.trace` is shared across the fixture; clear it
311+
# between invocations so per-invocation execution_order
312+
# assertions don't accumulate.
313+
built.trace.clear()
314+
315+
inv_final = await compiled.invoke(inv_initial, observers=invocation_observers)
316+
inv_drain_start = time.monotonic()
317+
inv_drain_summary = await compiled.drain(timeout=inv_timeout)
318+
inv_drain_elapsed = time.monotonic() - inv_drain_start
319+
320+
if "final_state" in inv_expected:
321+
assert inv_final.model_dump() == inv_expected["final_state"], (
322+
f"invocation {inv_idx} final_state mismatch"
323+
)
324+
if "execution_order" in inv_expected:
325+
assert built.trace == inv_expected["execution_order"], (
326+
f"invocation {inv_idx} execution_order mismatch"
327+
)
328+
329+
ds_expected: dict[str, Any] | None = inv_expected.get("drain_summary")
330+
if ds_expected is not None:
331+
if "timeout_reached" in ds_expected:
332+
assert inv_drain_summary.timeout_reached == ds_expected["timeout_reached"], (
333+
f"invocation {inv_idx} drain_summary.timeout_reached: "
334+
f"actual={inv_drain_summary.timeout_reached}, "
335+
f"expected={ds_expected['timeout_reached']}"
336+
)
337+
if "undelivered_count" in ds_expected:
338+
assert inv_drain_summary.undelivered_count == ds_expected["undelivered_count"], (
339+
f"invocation {inv_idx} drain_summary.undelivered_count: "
340+
f"actual={inv_drain_summary.undelivered_count}, "
341+
f"expected={ds_expected['undelivered_count']}"
342+
)
343+
if "undelivered_count_min" in ds_expected:
344+
assert inv_drain_summary.undelivered_count >= ds_expected["undelivered_count_min"], (
345+
f"invocation {inv_idx} drain_summary.undelivered_count below min: "
346+
f"actual={inv_drain_summary.undelivered_count}, "
347+
f"min={ds_expected['undelivered_count_min']}"
348+
)
349+
350+
if "observer_events" in inv_expected:
351+
obs_events_map = cast("dict[str, list[dict[str, Any]]]", inv_expected["observer_events"])
352+
for name, expected_events in obs_events_map.items():
353+
actual = observer_fixtures[name].events
354+
normalized = [normalize_expected_event(ev) for ev in expected_events]
355+
assert len(actual) == len(normalized), (
356+
f"invocation {inv_idx} observer event count mismatch for {name!r}: "
357+
f"actual={len(actual)}, expected={len(normalized)}"
358+
)
359+
for i, (a, e) in enumerate(zip(actual, normalized, strict=True)):
360+
for key, expected_value in e.items():
361+
assert key in a, (
362+
f"invocation {inv_idx} observer {name!r} event {i} "
363+
f"missing key {key!r}: actual={a}"
364+
)
365+
assert a[key] == expected_value, (
366+
f"invocation {inv_idx} observer {name!r} event {i} "
367+
f"key {key!r} mismatch: actual={a[key]!r}, expected={expected_value!r}"
368+
)
369+
370+
# Per-invocation invariants (e.g.,
371+
# `drain_returned_within_timeout` on the timed first
372+
# invocation in fixture 024).
373+
inv_invariants: dict[str, Any] = inv_expected.get("invariants") or {}
374+
if inv_invariants.get("drain_returned_within_timeout"):
375+
assert inv_timeout is not None
376+
assert inv_drain_elapsed < inv_timeout + 0.4, (
377+
f"invocation {inv_idx} drain returned outside timeout window: "
378+
f"elapsed={inv_drain_elapsed:.3f}s, timeout={inv_timeout}s"
379+
)
380+
381+
# Top-level invariants (e.g.,
382+
# `second_invocation_drain_independent_of_first` on fixture 024)
383+
# apply after all invocations complete.
384+
top_invariants: dict[str, Any] = spec.get("invariants") or {}
385+
if top_invariants.get("second_invocation_drain_independent_of_first"):
386+
# The fact that we reached this point with all per-invocation
387+
# assertions passing IS the proof of cross-invocation
388+
# independence; the assertion is structural.
389+
assert len(compiled._active_workers) == 0
390+
return
391+
284392
expected = spec["expected"]
285393

286394
# Observer-fixture-with-error (014): the run is expected to raise, and
@@ -295,22 +403,98 @@ async def _run_runtime_case(spec: Mapping[str, Any], fixture_id: str) -> None:
295403
if "message" in nested_error:
296404
assert str(excinfo.value.__cause__) == nested_error["message"]
297405
else:
298-
# Happy path (001–006, 011, 012, 013, 015).
406+
# Happy path (001–006, 011, 012, 013, 015, 022–025).
407+
# `invoke.drain.timeout_seconds` is the proposal 0010 drain
408+
# timeout directive; absent for legacy fixtures, present for
409+
# 022/023/025. The captured `drain_summary` is asserted below.
410+
invoke_block: dict[str, Any] = spec.get("invoke") or {}
411+
drain_block_raw = invoke_block.get("drain")
412+
timeout: float | None
413+
if isinstance(drain_block_raw, dict):
414+
drain_block_typed = cast("dict[str, Any]", drain_block_raw)
415+
ts = drain_block_typed.get("timeout_seconds")
416+
timeout = float(ts) if ts is not None else None
417+
else:
418+
timeout = None
299419
final = await compiled.invoke(initial, observers=invocation_observers)
300-
await compiled.drain()
420+
# Bracket the drain call to assert the `drain_returned_within_timeout`
421+
# invariant when the fixture declares it.
422+
drain_start = time.monotonic()
423+
drain_summary = await compiled.drain(timeout=timeout)
424+
drain_elapsed = time.monotonic() - drain_start
301425
if "final_state" in expected:
302426
assert final.model_dump() == expected["final_state"]
303427
if "execution_order" in expected:
304428
assert built.trace == expected["execution_order"]
305429

306-
# Observer event assertions (012–016, 018).
430+
# Proposal 0010 §6 Drain — DrainSummary assertions. The fixture
431+
# MAY assert exact `undelivered_count` or a lower-bound
432+
# `undelivered_count_min` (timing-dependent fixtures use min).
433+
ds_expected: dict[str, Any] | None = expected.get("drain_summary")
434+
if ds_expected is not None:
435+
if "timeout_reached" in ds_expected:
436+
assert drain_summary.timeout_reached == ds_expected["timeout_reached"], (
437+
f"drain_summary.timeout_reached mismatch: "
438+
f"actual={drain_summary.timeout_reached}, expected={ds_expected['timeout_reached']}"
439+
)
440+
if "undelivered_count" in ds_expected:
441+
assert drain_summary.undelivered_count == ds_expected["undelivered_count"], (
442+
f"drain_summary.undelivered_count mismatch: "
443+
f"actual={drain_summary.undelivered_count}, expected={ds_expected['undelivered_count']}"
444+
)
445+
if "undelivered_count_min" in ds_expected:
446+
assert drain_summary.undelivered_count >= ds_expected["undelivered_count_min"], (
447+
f"drain_summary.undelivered_count below min: "
448+
f"actual={drain_summary.undelivered_count}, "
449+
f"min={ds_expected['undelivered_count_min']}"
450+
)
451+
452+
# Proposal 0010 §6 Drain — invariants block. Each invariant flag
453+
# is a fixture-level assertion the harness verifies separately
454+
# from drain_summary.
455+
invariants: dict[str, Any] = expected.get("invariants") or {}
456+
if invariants.get("drain_returned_within_timeout"):
457+
assert timeout is not None, "drain_returned_within_timeout invariant requires a timeout"
458+
# Allow generous slack for cancellation settlement + CI
459+
# scheduler variance — gather(return_exceptions=True) on
460+
# cancelled workers settles within an event-loop tick.
461+
assert drain_elapsed < timeout + 0.4, (
462+
f"drain returned outside timeout window: elapsed={drain_elapsed:.3f}s, timeout={timeout}s"
463+
)
464+
if invariants.get("graph_state_intact_after_timeout"):
465+
# `_active_workers` cleaned after cancelled workers settled.
466+
assert len(compiled._active_workers) == 0, (
467+
f"graph state not clean after timeout: {len(compiled._active_workers)} workers remaining"
468+
)
469+
if invariants.get("drain_waited_for_all_events"):
470+
# No timeout supplied; drain blocked until all observer
471+
# work completed. The summary already asserts undelivered=0;
472+
# this invariant adds a positive lower-bound on duration
473+
# (drain took at least most of the observer's work).
474+
assert drain_summary.undelivered_count == 0
475+
assert drain_summary.timeout_reached is False
476+
477+
# Observer event assertions (012–016, 018, 022–025).
478+
# Per-event comparison projects the recorded event down to the keys
479+
# the fixture specifies. Fixtures that exercise state machinery
480+
# (012–016, 018) include `pre_state` / `post_state`; drain fixtures
481+
# (023, 025) assert only on phase/step/namespace/node_name shape and
482+
# MUST NOT fail because the recorded event happens to carry state.
307483
if "observer_events" in expected:
308484
for name, expected_events in expected["observer_events"].items():
309485
actual = observer_fixtures[name].events
310486
normalized = [normalize_expected_event(ev) for ev in expected_events]
311-
assert actual == normalized, (
312-
f"observer events mismatch for {name!r}: actual={actual}, expected={normalized}"
487+
assert len(actual) == len(normalized), (
488+
f"observer event count mismatch for {name!r}: "
489+
f"actual={len(actual)}, expected={len(normalized)}"
313490
)
491+
for i, (a, e) in enumerate(zip(actual, normalized, strict=True)):
492+
for key, expected_value in e.items():
493+
assert key in a, f"observer {name!r} event {i} missing key {key!r}: actual={a}"
494+
assert a[key] == expected_value, (
495+
f"observer {name!r} event {i} key {key!r} mismatch: "
496+
f"actual={a[key]!r}, expected={expected_value!r}"
497+
)
314498

315499
if "delivery_order" in expected:
316500
expected_delivery = [(d["observer"], d["step"], d["phase"]) for d in expected["delivery_order"]]

0 commit comments

Comments
 (0)