Skip to content

Commit 3d7de3d

Browse files
graph: spec v0.6 observer pair model (proposal 0005)
Replace single-event-per-attempt observer hooks with started/completed pairs. Each node attempt now produces two NodeEvents sharing a step: the started event fires before the wrapped node runs, the completed event fires after the merge succeeds (with post_state) or after the node, reducer, or state validation fails (with error). Adds per-observer phase subscription via a new public SubscribedObserver dataclass. attach_observer accepts an optional phases= kwarg; invoke accepts Observer | SubscribedObserver in its observers list. Empty phase sets raise ValueError at registration time. The delivery worker filters by event.phase against each observer's subscribed phases. NodeEvent gains attempt_index (default 0 until retry middleware lands) and fan_out_index (default None until fan-out runtime lands). Conformance fixtures 012-016 and 018 now pass; 017 still skips pending fan-out runtime (proposal 0005 pipeline-utilities side).
1 parent 616dac5 commit 3d7de3d

7 files changed

Lines changed: 359 additions & 142 deletions

File tree

src/openarmature/graph/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
)
2828
from .events import NodeEvent
2929
from .nodes import FunctionNode, Node
30-
from .observer import Observer, RemoveHandle
30+
from .observer import Observer, RemoveHandle, SubscribedObserver
3131
from .projection import ExplicitMapping, FieldNameMatching, ProjectionStrategy
3232
from .reducers import Reducer, append, last_write_wins, merge
3333
from .state import State
@@ -64,6 +64,7 @@
6464
"StateValidationError",
6565
"StaticEdge",
6666
"SubgraphNode",
67+
"SubscribedObserver",
6768
"UnreachableNode",
6869
"append",
6970
"last_write_wins",

src/openarmature/graph/compiled.py

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
Per spec §4 Error semantics: node, edge, reducer, and routing errors carry
99
recoverable state; state validation errors do not.
1010
11-
Per spec v0.3.0 §6 Observer hooks: between merge and edge evaluation, the
12-
engine dispatches a `NodeEvent` for the just-completed node onto the
13-
invocation's delivery queue. On node/reducer/state-validation failure, the
14-
event is dispatched (with `error` populated) before the failure propagates.
15-
Routing errors do NOT produce their own event — they arise after the
16-
preceding node's event has already been dispatched.
11+
Per spec v0.6.0 §6 Observer hooks: each node attempt produces a
12+
started/completed event PAIR. The engine dispatches the started event
13+
before invoking the wrapped node function and the completed event after
14+
the reducer merge succeeds (with `post_state` populated) or after the
15+
node, reducer, or state validation fails (with `error` populated).
16+
Routing errors do NOT produce their own event pair — they arise after
17+
the preceding node's completed event has already been dispatched.
1718
1819
`CompiledGraph[StateT]` and `_merge_partial[StateT]` carry the concrete state
1920
subclass through to `invoke()`'s return type, so consumers don't need
@@ -42,6 +43,8 @@
4243
_DRAIN_SENTINEL,
4344
Observer,
4445
RemoveHandle,
46+
SubscribedObserver,
47+
_coerce_subscribed,
4548
_dispatch,
4649
_InvocationContext,
4750
_QueuedItem,
@@ -113,31 +116,41 @@ class CompiledGraph[StateT: State]:
113116
# Observer plumbing — see attach_observer/drain. Mutable on a frozen
114117
# dataclass: the list reference is fixed but its contents change.
115118
# Parameterized factories so pyright infers the element types.
116-
_attached_observers: list[Observer] = field(default_factory=list[Observer])
119+
_attached_observers: list[SubscribedObserver] = field(default_factory=list[SubscribedObserver])
117120
# `set` (not list) so a per-task `add_done_callback(self._active_workers.discard)`
118121
# auto-removes completed workers — long-running services that never call
119122
# drain() don't accumulate completed Task references indefinitely.
120123
_active_workers: set[asyncio.Task[None]] = field(default_factory=set[asyncio.Task[None]])
121124

122125
# ------------------------------------------------------------------
123-
# Observer registration (spec v0.3.0 §6)
126+
# Observer registration (spec v0.6.0 §6)
124127
# ------------------------------------------------------------------
125128

126-
def attach_observer(self, observer: Observer) -> RemoveHandle:
129+
def attach_observer(
130+
self,
131+
observer: Observer,
132+
*,
133+
phases: Iterable[str] | None = None,
134+
) -> RemoveHandle:
127135
"""Register a graph-attached observer.
128136
129-
Per spec v0.3.0 §6: graph-attached observers fire on every invocation
137+
Per spec v0.6.0 §6: graph-attached observers fire on every invocation
130138
of this graph until removed — including when this graph runs as a
131139
subgraph inside a parent. Returns a `RemoveHandle` whose `.remove()`
132140
method detaches the observer; idempotent.
133141
142+
`phases` selects the phase strings (`"started"`, `"completed"`) the
143+
observer subscribes to; default is both. An empty `phases` set
144+
raises `ValueError` at registration time.
145+
134146
Per spec: changes to the registered set during a graph run do NOT
135147
take effect until the next invocation. The set of observers
136148
delivering events for an in-flight invocation is fixed at the point
137149
the invocation begins.
138150
"""
139-
self._attached_observers.append(observer)
140-
return RemoveHandle(_observers=self._attached_observers, _observer=observer)
151+
subscribed = _coerce_subscribed(observer, phases=phases)
152+
self._attached_observers.append(subscribed)
153+
return RemoveHandle(_observers=self._attached_observers, _observer=subscribed)
141154

142155
async def drain(self) -> None:
143156
"""Await delivery of every observer event produced by prior
@@ -166,23 +179,27 @@ async def drain(self) -> None:
166179
async def invoke(
167180
self,
168181
initial_state: StateT,
169-
observers: Iterable[Observer] | None = None,
182+
observers: Iterable[Observer | SubscribedObserver] | None = None,
170183
) -> StateT:
171184
"""Run the graph from `initial_state` to END and return the final state.
172185
173186
Optional `observers` are invocation-scoped — they fire only for this
174187
run, after all graph-attached observers (including subgraph-attached
175-
ones for events originating in subgraphs) per spec v0.3.0 §6.
188+
ones for events originating in subgraphs) per spec v0.6.0 §6.
189+
190+
Each entry in `observers` may be either a bare `Observer` callable
191+
(subscribes to both phases) or a `SubscribedObserver` wrapping an
192+
observer with an explicit `phases` set.
176193
177-
Per spec v0.3.0 §6: this method returns as soon as the graph
194+
Per spec v0.6.0 §6: this method returns as soon as the graph
178195
execution loop completes, regardless of whether the observer
179196
delivery queue has finished processing every dispatched event. Use
180197
`await compiled.drain()` if you need delivery-completion guarantees.
181198
182199
Raises one of the runtime error categories from spec §4 on failure.
183200
"""
184201

185-
invocation_scoped = tuple(observers) if observers else ()
202+
invocation_scoped = tuple(_coerce_subscribed(o) for o in (observers or ()))
186203
queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue()
187204
context = _InvocationContext(
188205
queue=queue,
@@ -271,62 +288,79 @@ async def _step_function_node(
271288
state: StateT,
272289
context: _InvocationContext,
273290
) -> StateT:
274-
"""Run one function-node step: take a step, run, merge, dispatch.
275-
276-
Dispatches a `NodeEvent` exactly once per call:
277-
- On run failure (NodeException): event with error populated.
278-
- On merge failure (ReducerError or StateValidationError): event with
279-
error populated; the original error propagates unchanged after.
280-
- On success: event with post_state populated, then return.
291+
"""Run one function-node step: take a step, dispatch started, run,
292+
merge, dispatch completed.
293+
294+
Per spec v0.6.0 §6: each attempt produces a started/completed pair.
295+
Both events share the same `step`. The completed event carries
296+
`post_state` on success, or `error` on failure (one of run, reducer,
297+
or state-validation). The completed event is dispatched before the
298+
failure propagates.
281299
"""
282300
step = context.take_step()
283301
namespace = context.namespace_prefix + (current,)
284302
pre_state = state
285303

304+
self._dispatch_started(context, current, namespace, step, pre_state)
305+
286306
try:
287307
partial = await node.run(state)
288308
except Exception as e:
289309
wrapped = NodeException(node_name=current, cause=e, recoverable_state=state)
290-
self._dispatch_failure_event(context, current, namespace, step, pre_state, wrapped)
310+
self._dispatch_completed(context, current, namespace, step, pre_state, error=wrapped)
291311
raise wrapped from e
292312

293313
try:
294314
new_state = _merge_partial(state, partial, self.reducers, current)
295315
except (ReducerError, StateValidationError) as e:
296-
self._dispatch_failure_event(context, current, namespace, step, pre_state, e)
316+
self._dispatch_completed(context, current, namespace, step, pre_state, error=e)
297317
raise
298318

319+
self._dispatch_completed(context, current, namespace, step, pre_state, post_state=new_state)
320+
return new_state
321+
322+
@staticmethod
323+
def _dispatch_started(
324+
context: _InvocationContext,
325+
current: str,
326+
namespace: tuple[str, ...],
327+
step: int,
328+
pre_state: State,
329+
) -> None:
299330
_dispatch(
300331
context,
301332
NodeEvent(
302333
node_name=current,
303334
namespace=namespace,
304335
step=step,
336+
phase="started",
305337
pre_state=pre_state,
306-
post_state=new_state,
338+
post_state=None,
307339
error=None,
308340
parent_states=context.parent_states_prefix,
309341
),
310342
)
311-
return new_state
312343

313344
@staticmethod
314-
def _dispatch_failure_event(
345+
def _dispatch_completed(
315346
context: _InvocationContext,
316347
current: str,
317348
namespace: tuple[str, ...],
318349
step: int,
319350
pre_state: State,
320-
error: RuntimeGraphError,
351+
*,
352+
post_state: State | None = None,
353+
error: RuntimeGraphError | None = None,
321354
) -> None:
322355
_dispatch(
323356
context,
324357
NodeEvent(
325358
node_name=current,
326359
namespace=namespace,
327360
step=step,
361+
phase="completed",
328362
pre_state=pre_state,
329-
post_state=None,
363+
post_state=post_state,
330364
error=error,
331365
parent_states=context.parent_states_prefix,
332366
),

src/openarmature/graph/events.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
"""Node-boundary observer events.
22
3-
Per spec v0.3.0 §6 (proposal 0003): a NodeEvent is delivered to registered
4-
observers once per node execution, carrying enough context to reconstruct
5-
where in the (potentially nested) execution path the node sat and what the
6-
state looked like before/after the node's update merged.
3+
Per spec v0.6.0 §6 (proposal 0005): each node attempt produces a
4+
started/completed event PAIR. The engine dispatches the started event
5+
before invoking the wrapped node function and the completed event after
6+
the reducer merge succeeds (with `post_state` populated) or after the
7+
node, reducer, or state validation fails (with `error` populated).
78
89
Frozen dataclass — observers receive a snapshot, not a live handle.
910
"""
1011

1112
from dataclasses import dataclass
13+
from typing import Literal
1214

1315
from .errors import RuntimeGraphError
1416
from .state import State
@@ -18,8 +20,12 @@
1820
class NodeEvent:
1921
"""A single node-boundary event delivered to observers.
2022
21-
Per spec v0.3.0 §6:
23+
Per spec v0.6.0 §6:
2224
25+
- `phase` is `"started"` (dispatched before the node runs) or
26+
`"completed"` (dispatched after the node returns or raises and the
27+
merge runs/fails). Each node attempt produces exactly one of each
28+
in that order.
2329
- `node_name` is the name under which this node was registered in its
2430
immediate containing graph.
2531
- `namespace` is an ordered sequence of node names from the outermost
@@ -28,28 +34,39 @@ class NodeEvent:
2834
extends.
2935
- `step` is a monotonically-increasing counter starting at 0, scoped
3036
to a single outermost-invocation. Subgraph-internal nodes increment
31-
the same counter.
37+
the same counter. The started/completed pair for one attempt share
38+
the same step.
3239
- `pre_state` is the state the node received, before reducer merge.
40+
Populated on both phases (identical across the pair).
3341
- `post_state` is the state after the node's partial update merged
34-
successfully. Populated only on success.
42+
successfully. Populated only on `completed` events that succeeded.
3543
- `error` is the wrapped runtime error (NodeException, ReducerError,
36-
or StateValidationError) when the node failed. Read `event.error.category`
37-
for the spec category identifier and `event.error.__cause__` for the
38-
original user/framework exception. Populated only on failure.
44+
or StateValidationError) when the node failed. Populated only on
45+
`completed` events that failed.
3946
- `parent_states` carries one state snapshot per containing graph,
4047
outermost first; for a node in the outermost graph it's an empty
4148
tuple. Invariant: `len(parent_states) == len(namespace) - 1`.
49+
- `attempt_index` is the 0-based index of this attempt among any
50+
retries. `0` for nodes not wrapped by retry middleware.
51+
- `fan_out_index` is the 0-based index of this fan-out instance among
52+
its siblings. `None` for nodes not inside a fan-out.
4253
43-
Exactly one of `post_state` or `error` is populated per event.
54+
Invariants:
55+
- On `started` events, `post_state` and `error` MUST both be None.
56+
- On `completed` events, exactly one of `post_state` and `error` is
57+
populated.
4458
"""
4559

4660
node_name: str
4761
namespace: tuple[str, ...]
4862
step: int
63+
phase: Literal["started", "completed"]
4964
pre_state: State
5065
post_state: State | None
5166
error: RuntimeGraphError | None
5267
parent_states: tuple[State, ...]
68+
attempt_index: int = 0
69+
fan_out_index: int | None = None
5370

5471

5572
__all__ = ["NodeEvent"]

0 commit comments

Comments
 (0)