Skip to content

Commit c332657

Browse files
graph: spec v0.6 observer pair model (proposal 0005) (#9)
* graph: spec v0.6 observer pair model (proposal 0005) Replace single-event-per-attempt observer hooks with started/completed pairs. Each node attempt now produces two NodeEvents sharing a step: the started event fires before the wrapped node runs, the completed event fires after the merge succeeds (with post_state) or after the node, reducer, or state validation fails (with error). Adds per-observer phase subscription via a new public SubscribedObserver dataclass. attach_observer accepts an optional phases= kwarg; invoke accepts Observer | SubscribedObserver in its observers list. Empty phase sets raise ValueError at registration time. The delivery worker filters by event.phase against each observer's subscribed phases. NodeEvent gains attempt_index (default 0 until retry middleware lands) and fan_out_index (default None until fan-out runtime lands). Conformance fixtures 012-016 and 018 now pass; 017 still skips pending fan-out runtime (proposal 0005 pipeline-utilities side). * graph: document drain unbounded-wait risk and wait_for workaround Drain blocks until every queued event has been delivered, so a slow or hung observer can hold the calling process indefinitely. Note this in the drain() docstring with the asyncio.wait_for stop-gap pattern. A spec proposal for a first-class deadline parameter is in flight; this docstring nudge holds until it lands. Also bump the docstring's spec version reference from v0.3.0 to v0.6.0 to match the rest of the file. * graph: clearer Observer Protocol docstring, idiomatic body Lead with the structural-Protocol idea so users see they don't need to subclass — any async callable with a matching signature works. Includes a short usage example. Simplify __call__ body to `...` (the PEP 544 idiomatic form for Protocol methods) since the body is never executed. * test: harness preserves empty phases list as frozenset() Switching the truthiness check to `is not None` so an explicit `phases: []` in a fixture produces a `frozenset()` rather than collapsing to `None`. The engine's empty-set check then fires the intended ValueError instead of silently defaulting to ALL_PHASES. No current fixture uses `phases: []` for a regular observer, but the harness should faithfully translate fixture intent so a future fixture with that input doesn't silently mask a registration bug.
1 parent 616dac5 commit c332657

7 files changed

Lines changed: 388 additions & 151 deletions

File tree

src/openarmature/graph/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
)
2828
from .events import NodeEvent
2929
from .nodes import FunctionNode, Node
30-
from .observer import Observer, RemoveHandle
30+
from .observer import Observer, RemoveHandle, SubscribedObserver
3131
from .projection import ExplicitMapping, FieldNameMatching, ProjectionStrategy
3232
from .reducers import Reducer, append, last_write_wins, merge
3333
from .state import State
@@ -64,6 +64,7 @@
6464
"StateValidationError",
6565
"StaticEdge",
6666
"SubgraphNode",
67+
"SubscribedObserver",
6768
"UnreachableNode",
6869
"append",
6970
"last_write_wins",

src/openarmature/graph/compiled.py

Lines changed: 74 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88
Per spec §4 Error semantics: node, edge, reducer, and routing errors carry
99
recoverable state; state validation errors do not.
1010
11-
Per spec v0.3.0 §6 Observer hooks: between merge and edge evaluation, the
12-
engine dispatches a `NodeEvent` for the just-completed node onto the
13-
invocation's delivery queue. On node/reducer/state-validation failure, the
14-
event is dispatched (with `error` populated) before the failure propagates.
15-
Routing errors do NOT produce their own event — they arise after the
16-
preceding node's event has already been dispatched.
11+
Per spec v0.6.0 §6 Observer hooks: each node attempt produces a
12+
started/completed event PAIR. The engine dispatches the started event
13+
before invoking the wrapped node function and the completed event after
14+
the reducer merge succeeds (with `post_state` populated) or after the
15+
node, reducer, or state validation fails (with `error` populated).
16+
Routing errors do NOT produce their own event pair — they arise after
17+
the preceding node's completed event has already been dispatched.
1718
1819
`CompiledGraph[StateT]` and `_merge_partial[StateT]` carry the concrete state
1920
subclass through to `invoke()`'s return type, so consumers don't need
@@ -42,6 +43,8 @@
4243
_DRAIN_SENTINEL,
4344
Observer,
4445
RemoveHandle,
46+
SubscribedObserver,
47+
_coerce_subscribed,
4548
_dispatch,
4649
_InvocationContext,
4750
_QueuedItem,
@@ -113,44 +116,63 @@ class CompiledGraph[StateT: State]:
113116
# Observer plumbing — see attach_observer/drain. Mutable on a frozen
114117
# dataclass: the list reference is fixed but its contents change.
115118
# Parameterized factories so pyright infers the element types.
116-
_attached_observers: list[Observer] = field(default_factory=list[Observer])
119+
_attached_observers: list[SubscribedObserver] = field(default_factory=list[SubscribedObserver])
117120
# `set` (not list) so a per-task `add_done_callback(self._active_workers.discard)`
118121
# auto-removes completed workers — long-running services that never call
119122
# drain() don't accumulate completed Task references indefinitely.
120123
_active_workers: set[asyncio.Task[None]] = field(default_factory=set[asyncio.Task[None]])
121124

122125
# ------------------------------------------------------------------
123-
# Observer registration (spec v0.3.0 §6)
126+
# Observer registration (spec v0.6.0 §6)
124127
# ------------------------------------------------------------------
125128

126-
def attach_observer(self, observer: Observer) -> RemoveHandle:
129+
def attach_observer(
130+
self,
131+
observer: Observer,
132+
*,
133+
phases: Iterable[str] | None = None,
134+
) -> RemoveHandle:
127135
"""Register a graph-attached observer.
128136
129-
Per spec v0.3.0 §6: graph-attached observers fire on every invocation
137+
Per spec v0.6.0 §6: graph-attached observers fire on every invocation
130138
of this graph until removed — including when this graph runs as a
131139
subgraph inside a parent. Returns a `RemoveHandle` whose `.remove()`
132140
method detaches the observer; idempotent.
133141
142+
`phases` selects the phase strings (`"started"`, `"completed"`) the
143+
observer subscribes to; default is both. An empty `phases` set
144+
raises `ValueError` at registration time.
145+
134146
Per spec: changes to the registered set during a graph run do NOT
135147
take effect until the next invocation. The set of observers
136148
delivering events for an in-flight invocation is fixed at the point
137149
the invocation begins.
138150
"""
139-
self._attached_observers.append(observer)
140-
return RemoveHandle(_observers=self._attached_observers, _observer=observer)
151+
subscribed = _coerce_subscribed(observer, phases=phases)
152+
self._attached_observers.append(subscribed)
153+
return RemoveHandle(_observers=self._attached_observers, _observer=subscribed)
141154

142155
async def drain(self) -> None:
143156
"""Await delivery of every observer event produced by prior
144157
invocations of this graph.
145158
146-
Per spec v0.3.0 §6: callers running in short-lived processes (scripts,
159+
Per spec v0.6.0 §6: callers running in short-lived processes (scripts,
147160
serverless functions, CLIs) MUST use drain to avoid losing observer
148161
events that were dispatched but not yet delivered.
149162
150163
Only events dispatched before this call are awaited; events from
151164
invocations started concurrently with drain may or may not be
152165
included. Subgraph events from active invocations are part of the
153166
parent invocation's worker and are covered automatically.
167+
168+
**Unbounded by design.** Drain blocks until every queued event has
169+
been delivered to every subscribed observer. A slow, hung, or
170+
misbehaving observer can therefore hold drain — and the calling
171+
process — indefinitely. If you need a bounded wait, wrap the call
172+
in `asyncio.wait_for` and accept that events still queued when the
173+
deadline elapses will not be delivered::
174+
175+
await asyncio.wait_for(compiled.drain(), timeout=5.0)
154176
"""
155177
if not self._active_workers:
156178
return
@@ -166,23 +188,27 @@ async def drain(self) -> None:
166188
async def invoke(
167189
self,
168190
initial_state: StateT,
169-
observers: Iterable[Observer] | None = None,
191+
observers: Iterable[Observer | SubscribedObserver] | None = None,
170192
) -> StateT:
171193
"""Run the graph from `initial_state` to END and return the final state.
172194
173195
Optional `observers` are invocation-scoped — they fire only for this
174196
run, after all graph-attached observers (including subgraph-attached
175-
ones for events originating in subgraphs) per spec v0.3.0 §6.
197+
ones for events originating in subgraphs) per spec v0.6.0 §6.
198+
199+
Each entry in `observers` may be either a bare `Observer` callable
200+
(subscribes to both phases) or a `SubscribedObserver` wrapping an
201+
observer with an explicit `phases` set.
176202
177-
Per spec v0.3.0 §6: this method returns as soon as the graph
203+
Per spec v0.6.0 §6: this method returns as soon as the graph
178204
execution loop completes, regardless of whether the observer
179205
delivery queue has finished processing every dispatched event. Use
180206
`await compiled.drain()` if you need delivery-completion guarantees.
181207
182208
Raises one of the runtime error categories from spec §4 on failure.
183209
"""
184210

185-
invocation_scoped = tuple(observers) if observers else ()
211+
invocation_scoped = tuple(_coerce_subscribed(o) for o in (observers or ()))
186212
queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue()
187213
context = _InvocationContext(
188214
queue=queue,
@@ -271,62 +297,79 @@ async def _step_function_node(
271297
state: StateT,
272298
context: _InvocationContext,
273299
) -> StateT:
274-
"""Run one function-node step: take a step, run, merge, dispatch.
275-
276-
Dispatches a `NodeEvent` exactly once per call:
277-
- On run failure (NodeException): event with error populated.
278-
- On merge failure (ReducerError or StateValidationError): event with
279-
error populated; the original error propagates unchanged after.
280-
- On success: event with post_state populated, then return.
300+
"""Run one function-node step: take a step, dispatch started, run,
301+
merge, dispatch completed.
302+
303+
Per spec v0.6.0 §6: each attempt produces a started/completed pair.
304+
Both events share the same `step`. The completed event carries
305+
`post_state` on success, or `error` on failure (one of run, reducer,
306+
or state-validation). The completed event is dispatched before the
307+
failure propagates.
281308
"""
282309
step = context.take_step()
283310
namespace = context.namespace_prefix + (current,)
284311
pre_state = state
285312

313+
self._dispatch_started(context, current, namespace, step, pre_state)
314+
286315
try:
287316
partial = await node.run(state)
288317
except Exception as e:
289318
wrapped = NodeException(node_name=current, cause=e, recoverable_state=state)
290-
self._dispatch_failure_event(context, current, namespace, step, pre_state, wrapped)
319+
self._dispatch_completed(context, current, namespace, step, pre_state, error=wrapped)
291320
raise wrapped from e
292321

293322
try:
294323
new_state = _merge_partial(state, partial, self.reducers, current)
295324
except (ReducerError, StateValidationError) as e:
296-
self._dispatch_failure_event(context, current, namespace, step, pre_state, e)
325+
self._dispatch_completed(context, current, namespace, step, pre_state, error=e)
297326
raise
298327

328+
self._dispatch_completed(context, current, namespace, step, pre_state, post_state=new_state)
329+
return new_state
330+
331+
@staticmethod
332+
def _dispatch_started(
333+
context: _InvocationContext,
334+
current: str,
335+
namespace: tuple[str, ...],
336+
step: int,
337+
pre_state: State,
338+
) -> None:
299339
_dispatch(
300340
context,
301341
NodeEvent(
302342
node_name=current,
303343
namespace=namespace,
304344
step=step,
345+
phase="started",
305346
pre_state=pre_state,
306-
post_state=new_state,
347+
post_state=None,
307348
error=None,
308349
parent_states=context.parent_states_prefix,
309350
),
310351
)
311-
return new_state
312352

313353
@staticmethod
314-
def _dispatch_failure_event(
354+
def _dispatch_completed(
315355
context: _InvocationContext,
316356
current: str,
317357
namespace: tuple[str, ...],
318358
step: int,
319359
pre_state: State,
320-
error: RuntimeGraphError,
360+
*,
361+
post_state: State | None = None,
362+
error: RuntimeGraphError | None = None,
321363
) -> None:
322364
_dispatch(
323365
context,
324366
NodeEvent(
325367
node_name=current,
326368
namespace=namespace,
327369
step=step,
370+
phase="completed",
328371
pre_state=pre_state,
329-
post_state=None,
372+
post_state=post_state,
330373
error=error,
331374
parent_states=context.parent_states_prefix,
332375
),

src/openarmature/graph/events.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
"""Node-boundary observer events.
22
3-
Per spec v0.3.0 §6 (proposal 0003): a NodeEvent is delivered to registered
4-
observers once per node execution, carrying enough context to reconstruct
5-
where in the (potentially nested) execution path the node sat and what the
6-
state looked like before/after the node's update merged.
3+
Per spec v0.6.0 §6 (proposal 0005): each node attempt produces a
4+
started/completed event PAIR. The engine dispatches the started event
5+
before invoking the wrapped node function and the completed event after
6+
the reducer merge succeeds (with `post_state` populated) or after the
7+
node, reducer, or state validation fails (with `error` populated).
78
89
Frozen dataclass — observers receive a snapshot, not a live handle.
910
"""
1011

1112
from dataclasses import dataclass
13+
from typing import Literal
1214

1315
from .errors import RuntimeGraphError
1416
from .state import State
@@ -18,8 +20,12 @@
1820
class NodeEvent:
1921
"""A single node-boundary event delivered to observers.
2022
21-
Per spec v0.3.0 §6:
23+
Per spec v0.6.0 §6:
2224
25+
- `phase` is `"started"` (dispatched before the node runs) or
26+
`"completed"` (dispatched after the node returns or raises and the
27+
merge runs/fails). Each node attempt produces exactly one of each
28+
in that order.
2329
- `node_name` is the name under which this node was registered in its
2430
immediate containing graph.
2531
- `namespace` is an ordered sequence of node names from the outermost
@@ -28,28 +34,39 @@ class NodeEvent:
2834
extends.
2935
- `step` is a monotonically-increasing counter starting at 0, scoped
3036
to a single outermost-invocation. Subgraph-internal nodes increment
31-
the same counter.
37+
the same counter. The started/completed pair for one attempt share
38+
the same step.
3239
- `pre_state` is the state the node received, before reducer merge.
40+
Populated on both phases (identical across the pair).
3341
- `post_state` is the state after the node's partial update merged
34-
successfully. Populated only on success.
42+
successfully. Populated only on `completed` events that succeeded.
3543
- `error` is the wrapped runtime error (NodeException, ReducerError,
36-
or StateValidationError) when the node failed. Read `event.error.category`
37-
for the spec category identifier and `event.error.__cause__` for the
38-
original user/framework exception. Populated only on failure.
44+
or StateValidationError) when the node failed. Populated only on
45+
`completed` events that failed.
3946
- `parent_states` carries one state snapshot per containing graph,
4047
outermost first; for a node in the outermost graph it's an empty
4148
tuple. Invariant: `len(parent_states) == len(namespace) - 1`.
49+
- `attempt_index` is the 0-based index of this attempt among any
50+
retries. `0` for nodes not wrapped by retry middleware.
51+
- `fan_out_index` is the 0-based index of this fan-out instance among
52+
its siblings. `None` for nodes not inside a fan-out.
4253
43-
Exactly one of `post_state` or `error` is populated per event.
54+
Invariants:
55+
- On `started` events, `post_state` and `error` MUST both be None.
56+
- On `completed` events, exactly one of `post_state` and `error` is
57+
populated.
4458
"""
4559

4660
node_name: str
4761
namespace: tuple[str, ...]
4862
step: int
63+
phase: Literal["started", "completed"]
4964
pre_state: State
5065
post_state: State | None
5166
error: RuntimeGraphError | None
5267
parent_states: tuple[State, ...]
68+
attempt_index: int = 0
69+
fan_out_index: int | None = None
5370

5471

5572
__all__ = ["NodeEvent"]

0 commit comments

Comments
 (0)