From 3d7de3d68054e4f1eb7fff9345ff493e993a6336 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 5 May 2026 00:03:03 -0700 Subject: [PATCH 1/4] graph: spec v0.6 observer pair model (proposal 0005) Replace single-event-per-attempt observer hooks with started/completed pairs. Each node attempt now produces two NodeEvents sharing a step: the started event fires before the wrapped node runs, the completed event fires after the merge succeeds (with post_state) or after the node, reducer, or state validation fails (with error). Adds per-observer phase subscription via a new public SubscribedObserver dataclass. attach_observer accepts an optional phases= kwarg; invoke accepts Observer | SubscribedObserver in its observers list. Empty phase sets raise ValueError at registration time. The delivery worker filters by event.phase against each observer's subscribed phases. NodeEvent gains attempt_index (default 0 until retry middleware lands) and fan_out_index (default None until fan-out runtime lands). Conformance fixtures 012-016 and 018 now pass; 017 still skips pending fan-out runtime (proposal 0005 pipeline-utilities side). --- src/openarmature/graph/__init__.py | 3 +- src/openarmature/graph/compiled.py | 94 ++++++++++++------ src/openarmature/graph/events.py | 39 +++++--- src/openarmature/graph/observer.py | 114 ++++++++++++++++----- tests/conformance/adapter.py | 30 ++++-- tests/conformance/test_conformance.py | 84 ++++++++-------- tests/unit/test_observer.py | 137 ++++++++++++++++++++------ 7 files changed, 359 insertions(+), 142 deletions(-) diff --git a/src/openarmature/graph/__init__.py b/src/openarmature/graph/__init__.py index b051c40..5e98284 100644 --- a/src/openarmature/graph/__init__.py +++ b/src/openarmature/graph/__init__.py @@ -27,7 +27,7 @@ ) from .events import NodeEvent from .nodes import FunctionNode, Node -from .observer import Observer, RemoveHandle +from .observer import Observer, RemoveHandle, SubscribedObserver from .projection import ExplicitMapping, FieldNameMatching, ProjectionStrategy from .reducers import Reducer, append, last_write_wins, merge from .state import State @@ -64,6 +64,7 @@ "StateValidationError", "StaticEdge", "SubgraphNode", + "SubscribedObserver", "UnreachableNode", "append", "last_write_wins", diff --git a/src/openarmature/graph/compiled.py b/src/openarmature/graph/compiled.py index cd3a028..b201f64 100644 --- a/src/openarmature/graph/compiled.py +++ b/src/openarmature/graph/compiled.py @@ -8,12 +8,13 @@ Per spec §4 Error semantics: node, edge, reducer, and routing errors carry recoverable state; state validation errors do not. -Per spec v0.3.0 §6 Observer hooks: between merge and edge evaluation, the -engine dispatches a `NodeEvent` for the just-completed node onto the -invocation's delivery queue. On node/reducer/state-validation failure, the -event is dispatched (with `error` populated) before the failure propagates. -Routing errors do NOT produce their own event — they arise after the -preceding node's event has already been dispatched. +Per spec v0.6.0 §6 Observer hooks: each node attempt produces a +started/completed event PAIR. The engine dispatches the started event +before invoking the wrapped node function and the completed event after +the reducer merge succeeds (with `post_state` populated) or after the +node, reducer, or state validation fails (with `error` populated). +Routing errors do NOT produce their own event pair — they arise after +the preceding node's completed event has already been dispatched. `CompiledGraph[StateT]` and `_merge_partial[StateT]` carry the concrete state subclass through to `invoke()`'s return type, so consumers don't need @@ -42,6 +43,8 @@ _DRAIN_SENTINEL, Observer, RemoveHandle, + SubscribedObserver, + _coerce_subscribed, _dispatch, _InvocationContext, _QueuedItem, @@ -113,31 +116,41 @@ class CompiledGraph[StateT: State]: # Observer plumbing — see attach_observer/drain. Mutable on a frozen # dataclass: the list reference is fixed but its contents change. # Parameterized factories so pyright infers the element types. - _attached_observers: list[Observer] = field(default_factory=list[Observer]) + _attached_observers: list[SubscribedObserver] = field(default_factory=list[SubscribedObserver]) # `set` (not list) so a per-task `add_done_callback(self._active_workers.discard)` # auto-removes completed workers — long-running services that never call # drain() don't accumulate completed Task references indefinitely. _active_workers: set[asyncio.Task[None]] = field(default_factory=set[asyncio.Task[None]]) # ------------------------------------------------------------------ - # Observer registration (spec v0.3.0 §6) + # Observer registration (spec v0.6.0 §6) # ------------------------------------------------------------------ - def attach_observer(self, observer: Observer) -> RemoveHandle: + def attach_observer( + self, + observer: Observer, + *, + phases: Iterable[str] | None = None, + ) -> RemoveHandle: """Register a graph-attached observer. - Per spec v0.3.0 §6: graph-attached observers fire on every invocation + Per spec v0.6.0 §6: graph-attached observers fire on every invocation of this graph until removed — including when this graph runs as a subgraph inside a parent. Returns a `RemoveHandle` whose `.remove()` method detaches the observer; idempotent. + `phases` selects the phase strings (`"started"`, `"completed"`) the + observer subscribes to; default is both. An empty `phases` set + raises `ValueError` at registration time. + Per spec: changes to the registered set during a graph run do NOT take effect until the next invocation. The set of observers delivering events for an in-flight invocation is fixed at the point the invocation begins. """ - self._attached_observers.append(observer) - return RemoveHandle(_observers=self._attached_observers, _observer=observer) + subscribed = _coerce_subscribed(observer, phases=phases) + self._attached_observers.append(subscribed) + return RemoveHandle(_observers=self._attached_observers, _observer=subscribed) async def drain(self) -> None: """Await delivery of every observer event produced by prior @@ -166,15 +179,19 @@ async def drain(self) -> None: async def invoke( self, initial_state: StateT, - observers: Iterable[Observer] | None = None, + observers: Iterable[Observer | SubscribedObserver] | None = None, ) -> StateT: """Run the graph from `initial_state` to END and return the final state. Optional `observers` are invocation-scoped — they fire only for this run, after all graph-attached observers (including subgraph-attached - ones for events originating in subgraphs) per spec v0.3.0 §6. + ones for events originating in subgraphs) per spec v0.6.0 §6. + + Each entry in `observers` may be either a bare `Observer` callable + (subscribes to both phases) or a `SubscribedObserver` wrapping an + observer with an explicit `phases` set. - Per spec v0.3.0 §6: this method returns as soon as the graph + Per spec v0.6.0 §6: this method returns as soon as the graph execution loop completes, regardless of whether the observer delivery queue has finished processing every dispatched event. Use `await compiled.drain()` if you need delivery-completion guarantees. @@ -182,7 +199,7 @@ async def invoke( Raises one of the runtime error categories from spec §4 on failure. """ - invocation_scoped = tuple(observers) if observers else () + invocation_scoped = tuple(_coerce_subscribed(o) for o in (observers or ())) queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() context = _InvocationContext( queue=queue, @@ -271,53 +288,69 @@ async def _step_function_node( state: StateT, context: _InvocationContext, ) -> StateT: - """Run one function-node step: take a step, run, merge, dispatch. - - Dispatches a `NodeEvent` exactly once per call: - - On run failure (NodeException): event with error populated. - - On merge failure (ReducerError or StateValidationError): event with - error populated; the original error propagates unchanged after. - - On success: event with post_state populated, then return. + """Run one function-node step: take a step, dispatch started, run, + merge, dispatch completed. + + Per spec v0.6.0 §6: each attempt produces a started/completed pair. + Both events share the same `step`. The completed event carries + `post_state` on success, or `error` on failure (one of run, reducer, + or state-validation). The completed event is dispatched before the + failure propagates. """ step = context.take_step() namespace = context.namespace_prefix + (current,) pre_state = state + self._dispatch_started(context, current, namespace, step, pre_state) + try: partial = await node.run(state) except Exception as e: wrapped = NodeException(node_name=current, cause=e, recoverable_state=state) - self._dispatch_failure_event(context, current, namespace, step, pre_state, wrapped) + self._dispatch_completed(context, current, namespace, step, pre_state, error=wrapped) raise wrapped from e try: new_state = _merge_partial(state, partial, self.reducers, current) except (ReducerError, StateValidationError) as e: - self._dispatch_failure_event(context, current, namespace, step, pre_state, e) + self._dispatch_completed(context, current, namespace, step, pre_state, error=e) raise + self._dispatch_completed(context, current, namespace, step, pre_state, post_state=new_state) + return new_state + + @staticmethod + def _dispatch_started( + context: _InvocationContext, + current: str, + namespace: tuple[str, ...], + step: int, + pre_state: State, + ) -> None: _dispatch( context, NodeEvent( node_name=current, namespace=namespace, step=step, + phase="started", pre_state=pre_state, - post_state=new_state, + post_state=None, error=None, parent_states=context.parent_states_prefix, ), ) - return new_state @staticmethod - def _dispatch_failure_event( + def _dispatch_completed( context: _InvocationContext, current: str, namespace: tuple[str, ...], step: int, pre_state: State, - error: RuntimeGraphError, + *, + post_state: State | None = None, + error: RuntimeGraphError | None = None, ) -> None: _dispatch( context, @@ -325,8 +358,9 @@ def _dispatch_failure_event( node_name=current, namespace=namespace, step=step, + phase="completed", pre_state=pre_state, - post_state=None, + post_state=post_state, error=error, parent_states=context.parent_states_prefix, ), diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index 3064af8..7389ec2 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -1,14 +1,16 @@ """Node-boundary observer events. -Per spec v0.3.0 §6 (proposal 0003): a NodeEvent is delivered to registered -observers once per node execution, carrying enough context to reconstruct -where in the (potentially nested) execution path the node sat and what the -state looked like before/after the node's update merged. +Per spec v0.6.0 §6 (proposal 0005): each node attempt produces a +started/completed event PAIR. The engine dispatches the started event +before invoking the wrapped node function and the completed event after +the reducer merge succeeds (with `post_state` populated) or after the +node, reducer, or state validation fails (with `error` populated). Frozen dataclass — observers receive a snapshot, not a live handle. """ from dataclasses import dataclass +from typing import Literal from .errors import RuntimeGraphError from .state import State @@ -18,8 +20,12 @@ class NodeEvent: """A single node-boundary event delivered to observers. - Per spec v0.3.0 §6: + Per spec v0.6.0 §6: + - `phase` is `"started"` (dispatched before the node runs) or + `"completed"` (dispatched after the node returns or raises and the + merge runs/fails). Each node attempt produces exactly one of each + in that order. - `node_name` is the name under which this node was registered in its immediate containing graph. - `namespace` is an ordered sequence of node names from the outermost @@ -28,28 +34,39 @@ class NodeEvent: extends. - `step` is a monotonically-increasing counter starting at 0, scoped to a single outermost-invocation. Subgraph-internal nodes increment - the same counter. + the same counter. The started/completed pair for one attempt share + the same step. - `pre_state` is the state the node received, before reducer merge. + Populated on both phases (identical across the pair). - `post_state` is the state after the node's partial update merged - successfully. Populated only on success. + successfully. Populated only on `completed` events that succeeded. - `error` is the wrapped runtime error (NodeException, ReducerError, - or StateValidationError) when the node failed. Read `event.error.category` - for the spec category identifier and `event.error.__cause__` for the - original user/framework exception. Populated only on failure. + or StateValidationError) when the node failed. Populated only on + `completed` events that failed. - `parent_states` carries one state snapshot per containing graph, outermost first; for a node in the outermost graph it's an empty tuple. Invariant: `len(parent_states) == len(namespace) - 1`. + - `attempt_index` is the 0-based index of this attempt among any + retries. `0` for nodes not wrapped by retry middleware. + - `fan_out_index` is the 0-based index of this fan-out instance among + its siblings. `None` for nodes not inside a fan-out. - Exactly one of `post_state` or `error` is populated per event. + Invariants: + - On `started` events, `post_state` and `error` MUST both be None. + - On `completed` events, exactly one of `post_state` and `error` is + populated. """ node_name: str namespace: tuple[str, ...] step: int + phase: Literal["started", "completed"] pre_state: State post_state: State | None error: RuntimeGraphError | None parent_states: tuple[State, ...] + attempt_index: int = 0 + fan_out_index: int | None = None __all__ = ["NodeEvent"] diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index 9d48757..ce8e6cc 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -1,30 +1,35 @@ -"""Observer hooks: protocol, delivery queue, per-invocation context. +"""Observer hooks: protocol, subscription, delivery queue, per-invocation context. -Per spec v0.3.0 §6 (proposal 0003): a registered observer receives a -`NodeEvent` once per node execution, asynchronously with respect to the -graph's execution loop. The graph never awaits observer processing. +Per spec v0.6.0 §6 (proposal 0005): each node attempt produces a started/ +completed event pair, and observers register with an optional `phases` +set so they can subscribe to one phase or both. The graph never awaits +observer processing. This module defines: - `Observer`: the callable shape an observer satisfies. -- `RemoveHandle`: returned by `CompiledGraph.attach_observer` so the caller - can detach later without reference-equality games. +- `SubscribedObserver`: pairs an `Observer` with the phase set it + subscribes to. Public — users construct one directly when passing + phase-filtered observers to `invoke(observers=...)`. +- `RemoveHandle`: returned by `CompiledGraph.attach_observer` so the + caller can detach later. - `_InvocationContext`: the cross-graph state threaded through one - outermost-invocation, including any nested subgraphs. Carries the queue, - observer chain (graph-attached, outermost → innermost) and the + outermost-invocation, including any nested subgraphs. Carries the + queue, observer chain (graph-attached, outermost → innermost) and the invocation-scoped observers, plus a shared step counter, namespace prefix, and parent-state stack. - `_QueuedItem`: an event paired with its delivery observer list. - `_dispatch`: enqueues an event for the worker to deliver. - `deliver_loop`: the worker coroutine. Reads items from the queue and - calls each observer in order, isolating exceptions via - `warnings.warn` per spec. + calls each observer in order, filtering by subscribed phase and + isolating exceptions via `warnings.warn` per spec. """ from __future__ import annotations import asyncio import warnings +from collections.abc import Iterable from dataclasses import dataclass, field from typing import Protocol @@ -33,9 +38,9 @@ class Observer(Protocol): - """An async callable invoked once per node execution. + """An async callable invoked once per node-boundary event. - Per spec v0.3.0 §6: observers MUST be asynchronous so the delivery + Per spec v0.6.0 §6: observers MUST be asynchronous so the delivery queue can await each one to coordinate completion. Observers MUST NOT alter state, routing, or any other aspect of the graph run. @@ -49,18 +54,73 @@ async def __call__(self, event: NodeEvent, /) -> None: raise NotImplementedError +# Per spec v0.6.0 §6: the two valid phase strings. Used as the default +# subscription set when a caller doesn't restrict by phase. +ALL_PHASES: frozenset[str] = frozenset({"started", "completed"}) + + +@dataclass(frozen=True) +class SubscribedObserver: + """An observer paired with its phase subscription set. + + Per spec v0.6.0 §6: observers register with an optional `phases` + parameter naming the phase strings they want to receive. The default + (`ALL_PHASES`) means "deliver every event." Empty phase sets are + forbidden — passing one raises `ValueError` at registration time per + the spec's "implementations SHOULD raise" guidance, hardened to MUST + here so misconfiguration surfaces immediately. + + Construct one of these directly when handing phase-filtered observers + to `CompiledGraph.invoke(observers=...)`. For the single-observer + `attach_observer` path, pass `phases=` as a keyword argument and the + engine wraps it for you. + """ + + observer: Observer + phases: frozenset[str] = ALL_PHASES + + def __post_init__(self) -> None: + if not self.phases: + raise ValueError("phases must be non-empty; spec §6 forbids empty phase subscriptions") + invalid = self.phases - ALL_PHASES + if invalid: + raise ValueError(f"unknown phase(s): {sorted(invalid)}; allowed: 'started', 'completed'") + + +def _coerce_subscribed( + observer: Observer | SubscribedObserver, + *, + phases: Iterable[str] | None = None, +) -> SubscribedObserver: + """Normalize a registration argument into a `SubscribedObserver`. + + - A bare `Observer` callable becomes a `SubscribedObserver` with + either the supplied `phases` or `ALL_PHASES` (default). + - An existing `SubscribedObserver` passes through unchanged; supplying + a `phases` kwarg in that case is a misuse and raises. + """ + if isinstance(observer, SubscribedObserver): + if phases is not None: + raise ValueError("cannot override phases on a SubscribedObserver; construct a new one") + return observer + return SubscribedObserver( + observer=observer, + phases=frozenset(phases) if phases is not None else ALL_PHASES, + ) + + @dataclass(frozen=True) class RemoveHandle: """Returned by `CompiledGraph.attach_observer`. Call `.remove()` to detach the observer. Idempotent — calling `.remove()` after the observer is already detached is a no-op. - Per spec v0.3.0 §6: changes to the registered observer set during a + Per spec v0.6.0 §6: changes to the registered observer set during a graph run do NOT take effect until the next invocation. """ - _observers: list[Observer] - _observer: Observer + _observers: list[SubscribedObserver] + _observer: SubscribedObserver def remove(self) -> None: try: @@ -81,7 +141,7 @@ class _QueuedItem: """ event: NodeEvent - observers: tuple[Observer, ...] + observers: tuple[SubscribedObserver, ...] # A sentinel value the engine puts on the queue to signal the worker to @@ -103,16 +163,16 @@ class _InvocationContext: queue: asyncio.Queue[_QueuedItem | None] # Graph-attached observers in delivery order: outermost graph first, # nested subgraph attached observers appended as we descend. - graph_attached: tuple[Observer, ...] + graph_attached: tuple[SubscribedObserver, ...] # Set once at the outermost invoke; carried unchanged into subgraphs. - invocation_scoped: tuple[Observer, ...] + invocation_scoped: tuple[SubscribedObserver, ...] # Shared mutable single-element list — a simple way to share an int by # reference across recursive subgraph contexts without leaking a class. step_counter: list[int] = field(default_factory=lambda: [0]) namespace_prefix: tuple[str, ...] = () parent_states_prefix: tuple[State, ...] = () - def full_observers(self) -> tuple[Observer, ...]: + def full_observers(self) -> tuple[SubscribedObserver, ...]: """Return the ordered observer list to deliver for events from this depth. Per spec §6: graph-attached (outermost → innermost), then invocation-scoped (passed to the outermost invoke).""" @@ -122,7 +182,7 @@ def descend_into_subgraph( self, subgraph_node_name: str, parent_state: State, - sub_attached: tuple[Observer, ...], + sub_attached: tuple[SubscribedObserver, ...], ) -> _InvocationContext: """Build the context for a subgraph-as-node call. @@ -164,10 +224,13 @@ def _dispatch(context: _InvocationContext, event: NodeEvent) -> None: async def deliver_loop(queue: asyncio.Queue[_QueuedItem | None]) -> None: """Background worker: read queued events, deliver to observers serially. - Per spec v0.3.0 §6: + Per spec v0.6.0 §6: - No two observers receive the same event concurrently (we await each). - No observer receives event N+1 until everyone has finished N (the loop processes one item fully before pulling the next). + - Observers whose `phases` set excludes the event's phase do NOT + receive it. Phase filter applies at delivery, not dispatch — the + engine still produces both events for every attempt. - Observer exceptions don't propagate, don't break siblings, don't block subsequent events. Reported via `warnings.warn`. @@ -177,9 +240,11 @@ async def deliver_loop(queue: asyncio.Queue[_QueuedItem | None]) -> None: item = await queue.get() if item is None: return - for observer in item.observers: + for subscribed in item.observers: + if item.event.phase not in subscribed.phases: + continue try: - await observer(item.event) + await subscribed.observer(item.event) except Exception as e: warnings.warn( f"observer raised {type(e).__name__}: {e}", @@ -188,14 +253,17 @@ async def deliver_loop(queue: asyncio.Queue[_QueuedItem | None]) -> None: __all__ = [ + "ALL_PHASES", "Observer", "RemoveHandle", + "SubscribedObserver", # Engine-internal but listed so pyright sees them as exported (they're # imported by `compiled.py` and `subgraph.py`). The underscore prefix # is the user-facing "don't import these" signal. "_DRAIN_SENTINEL", "_InvocationContext", "_QueuedItem", + "_coerce_subscribed", "_dispatch", "deliver_loop", ] diff --git a/tests/conformance/adapter.py b/tests/conformance/adapter.py index 67759fb..4f3840b 100644 --- a/tests/conformance/adapter.py +++ b/tests/conformance/adapter.py @@ -267,12 +267,17 @@ class ObserverFixture: observer callable produced by `make_observer_fn` records every event it receives into `events` and (if behavior == "raise") raises after recording. + + `phases` is the optional subscription set parsed from the fixture's + YAML. None means "no `phases:` key was present" — the harness leaves + the engine to default to both phases. """ name: str attach: str # "graph" | "invocation" target: str # "outer" | behavior: str # "record" | "raise" + phases: frozenset[str] | None = None events: list[dict[str, Any]] = field(default_factory=list[dict[str, Any]]) @@ -280,33 +285,38 @@ def _record_event(event: NodeEvent) -> dict[str, Any]: """Convert a NodeEvent into a dict matching the YAML expected shape.""" rec: dict[str, Any] = { "step": event.step, + "phase": event.phase, "node_name": event.node_name, "namespace": list(event.namespace), "pre_state": event.pre_state.model_dump(), "parent_states": [ps.model_dump() for ps in event.parent_states], + "attempt_index": event.attempt_index, } if event.post_state is not None: rec["post_state"] = event.post_state.model_dump() if event.error is not None: rec["error"] = event.error.category + if event.fan_out_index is not None: + rec["fan_out_index"] = event.fan_out_index return rec def make_observer_fn( fixture: ObserverFixture, - delivery: list[tuple[str, int]], + delivery: list[tuple[str, int, str]], ) -> Observer: """Build the async observer callable for an `ObserverFixture`. - Records every event into `fixture.events` and appends `(name, step)` to - the shared `delivery` list (the order observers are called in across the - whole invocation, used to assert `delivery_order`). Raising observers - record + append before raising, so the engine's error isolation can be - verified by checking that subsequent observers/events still get through. + Records every event into `fixture.events` and appends + `(name, step, phase)` to the shared `delivery` list (the order + observers are called in across the whole invocation, used to assert + `delivery_order`). Raising observers record + append before raising, + so the engine's error isolation can be verified by checking that + subsequent observers/events still get through. """ async def observer(event: NodeEvent) -> None: - delivery.append((fixture.name, event.step)) + delivery.append((fixture.name, event.step, event.phase)) fixture.events.append(_record_event(event)) if fixture.behavior == "raise": raise RuntimeError(f"{fixture.name} raised on event at step {event.step}") @@ -316,7 +326,11 @@ async def observer(event: NodeEvent) -> None: def normalize_expected_event(ev: Mapping[str, Any]) -> dict[str, Any]: """Fill in defaults for keys the YAML omits, so equality with the - recorded event dict works as-is.""" + recorded event dict works as-is. Fixtures don't repeat the + `attempt_index: 0` and `parent_states: []` defaults for every event; + the engine emits both unconditionally, so backfill them here. + """ e = dict(ev) e.setdefault("parent_states", []) + e.setdefault("attempt_index", 0) return e diff --git a/tests/conformance/test_conformance.py b/tests/conformance/test_conformance.py index aebfa24..97e67c1 100644 --- a/tests/conformance/test_conformance.py +++ b/tests/conformance/test_conformance.py @@ -18,6 +18,7 @@ NodeException, RoutingError, RuntimeGraphError, + SubscribedObserver, ) from openarmature.graph.observer import Observer @@ -56,26 +57,6 @@ def _fixture_id(path: Path) -> str: ] -def _needs_pair_model(spec: dict[str, Any]) -> bool: - """True if the fixture's expected observer events use the v0.6.0 pair - model (`phase: started/completed`). The current engine emits the - pre-v0.6.0 single-event model; Phase 1 retrofits the pair model. - """ - expected = spec.get("expected") - if not isinstance(expected, dict): - return False - events_by_name = cast("dict[str, Any]", expected).get("observer_events") - if not isinstance(events_by_name, dict): - return False - for events in cast("dict[str, Any]", events_by_name).values(): - if not isinstance(events, list): - continue - for event in cast("list[Any]", events): - if isinstance(event, dict) and "phase" in event: - return True - return False - - # Node directives the legacy adapter doesn't (yet) translate. Phase 1+ will # either expand the adapter or replace it with the typed harness. _UNSUPPORTED_NODE_DIRECTIVES = frozenset( @@ -126,17 +107,9 @@ def scan(graph: Any) -> str | None: async def test_runtime_fixture(fixture_path: Path) -> None: spec = _load(fixture_path) - # Phase 0 — skip fixtures whose expected observer events use the v0.6.0 - # started/completed pair model. Phase 1 (engine retrofit) lands the - # pair model and turns these back on. - if _needs_pair_model(spec): - pytest.skip( - f"{fixture_path.stem}: needs phase 1 (engine pair-model retrofit) " - "— expected observer events carry `phase` field" - ) - # Phase 0 — skip fixtures whose nodes use directives the legacy adapter - # doesn't translate (fan_out, flaky variants, calls_llm, etc.). Each - # directive is gated to the phase that lands its runtime support. + # Skip fixtures whose nodes use directives the legacy adapter doesn't + # translate (fan_out, flaky variants, calls_llm, etc.). Each directive + # is gated to the phase that lands its runtime support. if (hit := _unsupported_directive(spec)) is not None: pytest.skip(f"{fixture_path.stem}: unsupported node directive {hit}") @@ -152,21 +125,33 @@ async def test_runtime_fixture(fixture_path: Path) -> None: compiled = built.builder.compile() initial = built.initial_state(spec.get("initial_state", {})) - # Wire observers per the fixture's `observers:` block (012–015). Each - # observer is recorded by name so we can assert event-by-event after - # invoke + drain. + # Wire observers per the fixture's `observers:` block (012–016, 018). + # Each observer is recorded by name so we can assert event-by-event + # after invoke + drain. `phases:` (018) restricts which started/ + # completed events the observer subscribes to. observer_fixtures: dict[str, ObserverFixture] = {} - delivery: list[tuple[str, int]] = [] - invocation_observers: list[Observer] = [] + delivery: list[tuple[str, int, str]] = [] + invocation_observers: list[Observer | SubscribedObserver] = [] for o in spec.get("observers", []): - ofx = ObserverFixture(name=o["name"], attach=o["attach"], target=o["target"], behavior=o["behavior"]) + phases_list = o.get("phases") + phases = frozenset(phases_list) if phases_list else None + ofx = ObserverFixture( + name=o["name"], + attach=o["attach"], + target=o["target"], + behavior=o["behavior"], + phases=phases, + ) observer_fixtures[ofx.name] = ofx obs = make_observer_fn(ofx, delivery) if ofx.attach == "graph": target_graph = compiled if ofx.target == "outer" else subgraphs[ofx.target] - target_graph.attach_observer(obs) + target_graph.attach_observer(obs, phases=phases) else: - invocation_observers.append(obs) + if phases is not None: + invocation_observers.append(SubscribedObserver(observer=obs, phases=phases)) + else: + invocation_observers.append(obs) # Top-level expected_error: legacy runtime-error fixtures (008, 009). if "expected_error" in spec: @@ -215,7 +200,7 @@ async def test_runtime_fixture(fixture_path: Path) -> None: if "execution_order" in expected: assert built.trace == expected["execution_order"] - # Observer event assertions (012–015 only). + # Observer event assertions (012–016, 018). if "observer_events" in expected: for name, expected_events in expected["observer_events"].items(): actual = observer_fixtures[name].events @@ -225,11 +210,28 @@ async def test_runtime_fixture(fixture_path: Path) -> None: ) if "delivery_order" in expected: - expected_delivery = [(d["observer"], d["step"]) for d in expected["delivery_order"]] + expected_delivery = [(d["observer"], d["step"], d["phase"]) for d in expected["delivery_order"]] assert delivery == expected_delivery, ( f"delivery_order mismatch: actual={delivery}, expected={expected_delivery}" ) + # 018 — registering an observer with an empty `phases` set raises at + # registration time per spec §6. + if expected.get("empty_phases_raises_at_registration"): + with pytest.raises(ValueError): + compiled.attach_observer( + make_observer_fn( + ObserverFixture( + name="probe", + attach="graph", + target="outer", + behavior="record", + ), + [], + ), + phases=frozenset(), + ) + # --------------------------------------------------------------------------- # 007 compile-errors: one parametrized case per entry in the `cases:` table. diff --git a/tests/unit/test_observer.py b/tests/unit/test_observer.py index 875d9a5..f441487 100644 --- a/tests/unit/test_observer.py +++ b/tests/unit/test_observer.py @@ -1,15 +1,17 @@ """Unit tests for the observer delivery queue mechanics. -Per spec v0.3.0 §6: delivery is strictly serial, ordered, and isolates -observer exceptions. These tests exercise the queue/worker pair in -isolation — no graph engine — so behavior bugs surface here rather than -inside fixture failures. +Per spec v0.6.0 §6: delivery is strictly serial, ordered, isolates +observer exceptions, and filters by per-observer phase subscription. +These tests exercise the queue/worker pair in isolation — no graph +engine — so behavior bugs surface here rather than inside fixture +failures. """ import asyncio import warnings +from typing import Literal -from openarmature.graph import Observer, State +from openarmature.graph import Observer, State, SubscribedObserver from openarmature.graph.events import NodeEvent from openarmature.graph.observer import ( _DRAIN_SENTINEL, @@ -25,18 +27,29 @@ class DummyState(State): v: str = "" -def _make_event(name: str, step: int = 0) -> NodeEvent: +def _make_event( + name: str, + step: int = 0, + phase: Literal["started", "completed"] = "completed", +) -> NodeEvent: return NodeEvent( node_name=name, namespace=(name,), step=step, + phase=phase, pre_state=DummyState(), - post_state=DummyState(v=f"after-{name}"), + post_state=DummyState(v=f"after-{name}") if phase == "completed" else None, error=None, parent_states=(), ) +def _wrap(observer: Observer) -> SubscribedObserver: + """Wrap a bare observer for the default both-phases subscription — + most queue-mechanics tests don't care about phase filtering.""" + return SubscribedObserver(observer=observer) + + async def _drain(queue: asyncio.Queue[_QueuedItem | None], worker: asyncio.Task[None]) -> None: queue.put_nowait(_DRAIN_SENTINEL) await worker @@ -53,8 +66,9 @@ async def observer(event: NodeEvent) -> None: queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() worker = asyncio.create_task(deliver_loop(queue)) + subscribed = (_wrap(observer),) for name in ("a", "b", "c"): - queue.put_nowait(_QueuedItem(event=_make_event(name), observers=(observer,))) + queue.put_nowait(_QueuedItem(event=_make_event(name), observers=subscribed)) await _drain(queue, worker) assert received == ["a", "b", "c"] @@ -71,8 +85,9 @@ async def obs2(event: NodeEvent) -> None: queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() worker = asyncio.create_task(deliver_loop(queue)) - queue.put_nowait(_QueuedItem(event=_make_event("a"), observers=(obs1, obs2))) - queue.put_nowait(_QueuedItem(event=_make_event("b"), observers=(obs1, obs2))) + subscribed = (_wrap(obs1), _wrap(obs2)) + queue.put_nowait(_QueuedItem(event=_make_event("a"), observers=subscribed)) + queue.put_nowait(_QueuedItem(event=_make_event("b"), observers=subscribed)) await _drain(queue, worker) # All observers for event A finish before any observer sees event B — @@ -90,7 +105,7 @@ async def boom(_event: NodeEvent) -> None: queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() worker = asyncio.create_task(deliver_loop(queue)) - queue.put_nowait(_QueuedItem(event=_make_event("a"), observers=(boom,))) + queue.put_nowait(_QueuedItem(event=_make_event("a"), observers=(_wrap(boom),))) with warnings.catch_warnings(record=True) as caught: warnings.simplefilter("always") @@ -111,7 +126,7 @@ async def obs2(event: NodeEvent) -> None: queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() worker = asyncio.create_task(deliver_loop(queue)) - queue.put_nowait(_QueuedItem(event=_make_event("a"), observers=(obs1, obs2))) + queue.put_nowait(_QueuedItem(event=_make_event("a"), observers=(_wrap(obs1), _wrap(obs2)))) with warnings.catch_warnings(record=True): warnings.simplefilter("always") @@ -131,8 +146,9 @@ async def silent(event: NodeEvent) -> None: queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() worker = asyncio.create_task(deliver_loop(queue)) + subscribed = (_wrap(always_raises), _wrap(silent)) for name in ("a", "b", "c"): - queue.put_nowait(_QueuedItem(event=_make_event(name), observers=(always_raises, silent))) + queue.put_nowait(_QueuedItem(event=_make_event(name), observers=subscribed)) with warnings.catch_warnings(record=True): warnings.simplefilter("always") @@ -141,6 +157,49 @@ async def silent(event: NodeEvent) -> None: assert received == ["a", "b", "c"] +# ===== Phase filtering (spec v0.6.0 §6) ===== + + +async def test_phase_filter_skips_unsubscribed_phase() -> None: + received: list[tuple[str, str]] = [] + + async def obs(event: NodeEvent) -> None: + received.append((event.node_name, event.phase)) + + queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() + worker = asyncio.create_task(deliver_loop(queue)) + completed_only = (SubscribedObserver(observer=obs, phases=frozenset({"completed"})),) + queue.put_nowait(_QueuedItem(event=_make_event("a", phase="started"), observers=completed_only)) + queue.put_nowait(_QueuedItem(event=_make_event("a", phase="completed"), observers=completed_only)) + await _drain(queue, worker) + + # The observer subscribed to `completed` only — the started event + # was delivered to the queue but filtered at the worker. + assert received == [("a", "completed")] + + +async def test_subscribed_observer_rejects_empty_phases() -> None: + async def obs(_event: NodeEvent) -> None: + pass + + try: + SubscribedObserver(observer=obs, phases=frozenset()) + except ValueError: + return + raise AssertionError("expected ValueError on empty phases") + + +async def test_subscribed_observer_rejects_unknown_phase() -> None: + async def obs(_event: NodeEvent) -> None: + pass + + try: + SubscribedObserver(observer=obs, phases=frozenset({"started", "bogus"})) + except ValueError: + return + raise AssertionError("expected ValueError on unknown phase") + + # ===== Sentinel + termination ===== @@ -152,8 +211,9 @@ async def observer(event: NodeEvent) -> None: queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() worker = asyncio.create_task(deliver_loop(queue)) - queue.put_nowait(_QueuedItem(event=_make_event("a"), observers=(observer,))) - queue.put_nowait(_QueuedItem(event=_make_event("b"), observers=(observer,))) + subscribed = (_wrap(observer),) + queue.put_nowait(_QueuedItem(event=_make_event("a"), observers=subscribed)) + queue.put_nowait(_QueuedItem(event=_make_event("b"), observers=subscribed)) queue.put_nowait(_DRAIN_SENTINEL) await asyncio.wait_for(worker, timeout=1.0) @@ -180,15 +240,22 @@ async def graph_obs(_event: NodeEvent) -> None: async def invocation_obs(_event: NodeEvent) -> None: pass + graph_subscribed = _wrap(graph_obs) + invocation_subscribed = _wrap(invocation_obs) + queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() - ctx = _InvocationContext(queue=queue, graph_attached=(graph_obs,), invocation_scoped=(invocation_obs,)) + ctx = _InvocationContext( + queue=queue, + graph_attached=(graph_subscribed,), + invocation_scoped=(invocation_subscribed,), + ) _dispatch(ctx, _make_event("a")) item = queue.get_nowait() assert item is not None # graph_attached comes first, then invocation_scoped per spec. - assert item.observers == (graph_obs, invocation_obs) + assert item.observers == (graph_subscribed, invocation_subscribed) # ===== _InvocationContext.descend_into_subgraph ===== @@ -204,16 +271,28 @@ async def sub_obs(_event: NodeEvent) -> None: async def invocation_obs(_event: NodeEvent) -> None: pass + outer_subscribed = _wrap(outer_obs) + sub_subscribed = _wrap(sub_obs) + invocation_subscribed = _wrap(invocation_obs) + queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue() - outer = _InvocationContext(queue=queue, graph_attached=(outer_obs,), invocation_scoped=(invocation_obs,)) + outer = _InvocationContext( + queue=queue, + graph_attached=(outer_subscribed,), + invocation_scoped=(invocation_subscribed,), + ) parent = DummyState(v="parent-snapshot") - sub = outer.descend_into_subgraph(subgraph_node_name="sub", parent_state=parent, sub_attached=(sub_obs,)) + sub = outer.descend_into_subgraph( + subgraph_node_name="sub", + parent_state=parent, + sub_attached=(sub_subscribed,), + ) assert sub.queue is queue assert sub.step_counter is outer.step_counter - assert sub.graph_attached == (outer_obs, sub_obs) - assert sub.invocation_scoped == (invocation_obs,) + assert sub.graph_attached == (outer_subscribed, sub_subscribed) + assert sub.invocation_scoped == (invocation_subscribed,) assert sub.namespace_prefix == ("sub",) assert sub.parent_states_prefix == (parent,) @@ -238,21 +317,23 @@ def test_remove_handle_detaches_observer() -> None: async def obs(_event: NodeEvent) -> None: pass - observers: list[Observer] = [obs] - handle = RemoveHandle(_observers=observers, _observer=obs) + subscribed = _wrap(obs) + observers: list[SubscribedObserver] = [subscribed] + handle = RemoveHandle(_observers=observers, _observer=subscribed) - assert obs in observers + assert subscribed in observers handle.remove() - assert obs not in observers + assert subscribed not in observers def test_remove_handle_is_idempotent() -> None: async def obs(_event: NodeEvent) -> None: pass - observers: list[Observer] = [obs] - handle = RemoveHandle(_observers=observers, _observer=obs) + subscribed = _wrap(obs) + observers: list[SubscribedObserver] = [subscribed] + handle = RemoveHandle(_observers=observers, _observer=subscribed) handle.remove() handle.remove() # second call is a no-op, doesn't raise - assert obs not in observers + assert subscribed not in observers From db22865ab4cd626cbd8ad6ed03b72880348f9c29 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 5 May 2026 12:54:10 -0700 Subject: [PATCH 2/4] graph: document drain unbounded-wait risk and wait_for workaround Drain blocks until every queued event has been delivered, so a slow or hung observer can hold the calling process indefinitely. Note this in the drain() docstring with the asyncio.wait_for stop-gap pattern. A spec proposal for a first-class deadline parameter is in flight; this docstring nudge holds until it lands. Also bump the docstring's spec version reference from v0.3.0 to v0.6.0 to match the rest of the file. --- src/openarmature/graph/compiled.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/openarmature/graph/compiled.py b/src/openarmature/graph/compiled.py index b201f64..3a4df9b 100644 --- a/src/openarmature/graph/compiled.py +++ b/src/openarmature/graph/compiled.py @@ -156,7 +156,7 @@ async def drain(self) -> None: """Await delivery of every observer event produced by prior invocations of this graph. - Per spec v0.3.0 §6: callers running in short-lived processes (scripts, + Per spec v0.6.0 §6: callers running in short-lived processes (scripts, serverless functions, CLIs) MUST use drain to avoid losing observer events that were dispatched but not yet delivered. @@ -164,6 +164,15 @@ async def drain(self) -> None: invocations started concurrently with drain may or may not be included. Subgraph events from active invocations are part of the parent invocation's worker and are covered automatically. + + **Unbounded by design.** Drain blocks until every queued event has + been delivered to every subscribed observer. A slow, hung, or + misbehaving observer can therefore hold drain — and the calling + process — indefinitely. If you need a bounded wait, wrap the call + in `asyncio.wait_for` and accept that events still queued when the + deadline elapses will not be delivered:: + + await asyncio.wait_for(compiled.drain(), timeout=5.0) """ if not self._active_workers: return From 4bd3b462b650c84a2d9003d2c6b0d6888e6a3e47 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 5 May 2026 13:16:10 -0700 Subject: [PATCH 3/4] graph: clearer Observer Protocol docstring, idiomatic body MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lead with the structural-Protocol idea so users see they don't need to subclass — any async callable with a matching signature works. Includes a short usage example. Simplify __call__ body to `...` (the PEP 544 idiomatic form for Protocol methods) since the body is never executed. --- src/openarmature/graph/observer.py | 31 ++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index ce8e6cc..e7e96e1 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -38,20 +38,31 @@ class Observer(Protocol): - """An async callable invoked once per node-boundary event. + """The shape of a callable that receives node-boundary events. - Per spec v0.6.0 §6: observers MUST be asynchronous so the delivery - queue can await each one to coordinate completion. Observers MUST - NOT alter state, routing, or any other aspect of the graph run. + `Observer` is a structural Protocol — any async callable matching the + signature qualifies, no subclass required. Plain functions, bound + methods, and class instances with `__call__` all work:: - The parameter is positional-only (`event, /`) so structural conformance - isn't tied to a specific parameter name — implementations can use - `event`, `_event`, or any other name. + async def log_observer(event: NodeEvent) -> None: + print(event.node_name, event.phase) + + compiled.attach_observer(log_observer) + + Per spec v0.6.0 §6: + + - Observers MUST be async so the delivery queue can await each one + and coordinate ordering. The graph itself never awaits observers. + - Observers MUST NOT alter state, routing, or any other aspect of + the graph run — read-only side effects (logging, metrics, span + emission) only. + + The event parameter is positional-only (`event, /`) so structural + conformance doesn't pin you to that name — any of `event`, `_event`, + `e`, etc. matches. """ - async def __call__(self, event: NodeEvent, /) -> None: - """Receive a single node-boundary event.""" - raise NotImplementedError + async def __call__(self, event: NodeEvent, /) -> None: ... # Per spec v0.6.0 §6: the two valid phase strings. Used as the default From 92fb283a3ea9d1a83a44f4d0f56b0d869644eb2f Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 5 May 2026 13:39:52 -0700 Subject: [PATCH 4/4] test: harness preserves empty phases list as frozenset() Switching the truthiness check to `is not None` so an explicit `phases: []` in a fixture produces a `frozenset()` rather than collapsing to `None`. The engine's empty-set check then fires the intended ValueError instead of silently defaulting to ALL_PHASES. No current fixture uses `phases: []` for a regular observer, but the harness should faithfully translate fixture intent so a future fixture with that input doesn't silently mask a registration bug. --- tests/conformance/test_conformance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/conformance/test_conformance.py b/tests/conformance/test_conformance.py index 97e67c1..909be0f 100644 --- a/tests/conformance/test_conformance.py +++ b/tests/conformance/test_conformance.py @@ -134,7 +134,7 @@ async def test_runtime_fixture(fixture_path: Path) -> None: invocation_observers: list[Observer | SubscribedObserver] = [] for o in spec.get("observers", []): phases_list = o.get("phases") - phases = frozenset(phases_list) if phases_list else None + phases = frozenset(phases_list) if phases_list is not None else None ofx = ObserverFixture( name=o["name"], attach=o["attach"],