Commit 75740a9

graph: spec v0.3 observer hooks (proposal 0003) (#5)
* graph: spec v0.3 observer hooks (proposal 0003)

Implement spec v0.3 / proposal 0003: node-boundary observer hooks. A `NodeEvent` is dispatched once per node execution onto a per-invocation delivery queue that runs concurrently with the graph's execution loop.

Public surface:
- `NodeEvent` (frozen dataclass): node_name, namespace tuple, step, pre/post state, error category+instance, parent_states tuple.
- `Observer` Protocol: async callable receiving a NodeEvent; parameter is positional-only so conformance isn't tied to a parameter name.
- `CompiledGraph.attach_observer(fn) -> RemoveHandle` for graph-attached observers; `invoke(state, observers=...)` for invocation-scoped.
- `CompiledGraph.drain()` awaits delivery of every event dispatched by prior invocations of this graph.

Delivery semantics per spec §6:
- Strictly serial within an invocation: no two observers process the same event concurrently; no observer sees event N+1 until everyone has finished N. Order is graph-attached (outermost → innermost), then invocation-scoped, both in registration order.
- Async-from-graph: invoke() returns when the graph reaches END regardless of queue state. Use drain() for short-lived processes.
- Observer exceptions are caught and reported via warnings.warn; they don't break siblings, subsequent events, or the graph run.
- Subgraph-internal events bubble up: the subgraph wrapper itself does NOT generate an event (per fixture 013); only its inner nodes do. Step counter spans the subgraph boundary; namespace and parent_states extend.

Bumps openarmature to 0.4.0 and the spec submodule to v0.3.1.

* graph: address PR #5 review

CodeQL noise (10 threads): replace bare `...` Protocol/stub bodies with explicit forms. `Observer.__call__` gets a docstring + `raise NotImplementedError` (so pyright accepts the declared return type, same pattern as Protocol bodies fixed in #4); test stub observers get `pass`. Add an explanatory comment to `RemoveHandle.remove`'s `try/except ValueError: pass` documenting the idempotency intent the docstring already promises.

Logic (2 threads):
- `_active_workers` switched from list to set and each per-invocation worker now registers add_done_callback(self._active_workers.discard). Long-running services that never call drain() no longer accumulate completed Task references indefinitely. drain() simplifies to a one-line asyncio.gather over a snapshot of the set.
- SubgraphNode branch in _invoke now wraps non-RuntimeGraphError exceptions from node.run(state, context=context) as NodeException tagged with the wrapper's name. Projection errors (project_in / project_out) and subgraph state-class init errors (e.g. Pydantic ValidationError) were previously propagating raw, bypassing the spec §4 error contract. Already-wrapped errors from inside the subgraph's _invoke pass through unchanged so the inner node's identity stays attached.

Adds test_subgraph_projection_error_wrapped_as_node_exception covering the case.
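
The delivery semantics listed in the commit message can be sketched roughly as follows. This is a minimal illustration and not the code in `openarmature.graph.observer`, which this page does not show: `Event`, `ObserverFn`, `_SENTINEL`, and the two-argument `deliver_loop` signature are assumptions made for the example (the real `deliver_loop` takes only the queue).

```python
import asyncio
import warnings
from collections.abc import Awaitable, Callable

# Hypothetical stand-ins: the real event type is `NodeEvent`, and the real
# sentinel is `_DRAIN_SENTINEL`.
Event = str
ObserverFn = Callable[[Event], Awaitable[None]]
_SENTINEL = None


async def deliver_loop(queue: asyncio.Queue, observers: tuple[ObserverFn, ...]) -> None:
    """Deliver queued events one at a time, observers in registration order."""
    while True:
        event = await queue.get()
        if event is _SENTINEL:
            return  # everything queued before the sentinel has been delivered
        for observer in observers:
            try:
                # Awaiting each observer in turn keeps delivery strictly serial:
                # nobody sees event N+1 until everyone has finished event N.
                await observer(event)
            except Exception as exc:
                # A failing observer must not break siblings or later events.
                warnings.warn(f"observer raised {exc!r}", RuntimeWarning, stacklevel=2)


async def demo() -> list[str]:
    seen: list[str] = []

    async def flaky(event: Event) -> None:
        raise ValueError("boom")

    async def recorder(event: Event) -> None:
        seen.append(event)

    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(deliver_loop(queue, (flaky, recorder)))
    queue.put_nowait("node_a")
    queue.put_nowait("node_b")
    queue.put_nowait(_SENTINEL)  # what invoke() does in its `finally` block
    await worker  # the equivalent of awaiting drain()
    return seen
```

Running `asyncio.run(demo())` emits one warning per event from `flaky`, while `recorder` still receives both events in order.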
1 parent d53fefc commit 75740a9

15 files changed

Lines changed: 1026 additions & 80 deletions

README.md

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@

 Python reference implementation of [OpenArmature](https://github.com/LunarCommand/openarmature-spec) — a workflow framework for LLM pipelines and tool-calling agents.

-**Status:** alpha. The graph engine module is implemented against spec v0.1.1; the other modules in the charter are not yet built.
+**Status:** alpha. The graph engine module is implemented against spec v0.3.1; the other modules in the charter are not yet built.

 ## Install

pyproject.toml

Lines changed: 9 additions & 2 deletions
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

 [project]
 name = "openarmature"
-version = "0.3.0"
+version = "0.4.0"
 description = "Workflow framework for LLM pipelines and tool-calling agents."
 readme = "README.md"
 requires-python = ">=3.12"
@@ -20,7 +20,7 @@ Repository = "https://github.com/LunarCommand/openarmature-python"
 Specification = "https://github.com/LunarCommand/openarmature-spec"

 [tool.openarmature]
-spec_version = "0.2.0"
+spec_version = "0.3.1"

 [dependency-groups]
 dev = [
@@ -41,10 +41,17 @@ include = ["src", "tests"]
 pythonVersion = "3.12"
 typeCheckingMode = "strict"
 reportMissingTypeStubs = false
+# Underscore-prefixed names are conventionally "package-private" — used
+# across sibling modules within `openarmature.graph` but not part of the
+# public API (which is enforced by `__all__`). Pyright's strict default
+# treats them as module-private; turn it off so legitimate cross-module
+# package-private imports don't need per-line ignores.
+reportPrivateUsage = false

 [tool.ruff]
 line-length = 110
 target-version = "py312"
+extend-exclude = ["openarmature-spec"]

 [tool.ruff.lint]
 select = ["E", "F", "I", "B", "UP"]

src/openarmature/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
 """OpenArmature — workflow framework for LLM pipelines and tool-calling agents."""

-__version__ = "0.3.0"
-__spec_version__ = "0.2.0"
+__version__ = "0.4.0"
+__spec_version__ = "0.3.1"

src/openarmature/graph/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -25,7 +25,9 @@
     StateValidationError,
     UnreachableNode,
 )
+from .events import NodeEvent
 from .nodes import FunctionNode, Node
+from .observer import Observer, RemoveHandle
 from .projection import ExplicitMapping, FieldNameMatching, ProjectionStrategy
 from .reducers import Reducer, append, last_write_wins, merge
 from .state import State
@@ -48,11 +50,14 @@
     "MappingReferencesUndeclaredField",
     "MultipleOutgoingEdges",
     "Node",
+    "NodeEvent",
     "NodeException",
     "NoDeclaredEntry",
+    "Observer",
     "ProjectionStrategy",
     "Reducer",
     "ReducerError",
+    "RemoveHandle",
     "RoutingError",
     "RuntimeGraphError",
     "State",

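The newly exported `Observer` and `RemoveHandle` are defined in `observer.py`, which is not shown here. Based only on the commit message (positional-only parameter, `raise NotImplementedError` Protocol body, idempotent `remove()` via `try/except ValueError: pass`) and the `RemoveHandle(_observers=..., _observer=...)` call in `compiled.py`, they might look something like the sketch below. The frozen dataclass, the `Any` event type, and the `log_event` helper are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Any, Protocol


class Observer(Protocol):
    # The `/` makes the parameter positional-only, so a conforming callable
    # may name its parameter anything it likes.
    async def __call__(self, event: Any, /) -> None:
        """Receive one event."""
        raise NotImplementedError


@dataclass(frozen=True)  # assumption: the real class may be declared differently
class RemoveHandle:
    _observers: list[Observer]
    _observer: Observer

    def remove(self) -> None:
        try:
            self._observers.remove(self._observer)
        except ValueError:
            # Already removed: a second remove() is deliberately a no-op.
            pass


async def log_event(evt: Any) -> None:  # hypothetical observer; note the different parameter name
    print(evt)


registered: list[Observer] = [log_event]
handle = RemoveHandle(_observers=registered, _observer=log_event)
handle.remove()
handle.remove()  # idempotent: no exception on the second call
```

Because the handle holds a reference to the same list the graph appends to, removal mutates the graph's registered set directly.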
src/openarmature/graph/compiled.py

Lines changed: 221 additions & 15 deletions
@@ -8,13 +8,21 @@
 Per spec §4 Error semantics: node, edge, reducer, and routing errors carry
 recoverable state; state validation errors do not.

+Per spec v0.3.0 §6 Observer hooks: between merge and edge evaluation, the
+engine dispatches a `NodeEvent` for the just-completed node onto the
+invocation's delivery queue. On node/reducer/state-validation failure, the
+event is dispatched (with `error` populated) before the failure propagates.
+Routing errors do NOT produce their own event — they arise after the
+preceding node's event has already been dispatched.
+
 `CompiledGraph[StateT]` and `_merge_partial[StateT]` carry the concrete state
 subclass through to `invoke()`'s return type, so consumers don't need
 `cast(MyState, ...)` at the call site.
 """

-from collections.abc import Mapping
-from dataclasses import dataclass
+import asyncio
+from collections.abc import Iterable, Mapping
+from dataclasses import dataclass, field
 from typing import Any

 from pydantic import ValidationError
@@ -25,11 +33,23 @@
     NodeException,
     ReducerError,
     RoutingError,
+    RuntimeGraphError,
     StateValidationError,
 )
+from .events import NodeEvent
 from .nodes import Node
+from .observer import (
+    _DRAIN_SENTINEL,
+    Observer,
+    RemoveHandle,
+    _dispatch,
+    _InvocationContext,
+    _QueuedItem,
+    deliver_loop,
+)
 from .reducers import Reducer
 from .state import State
+from .subgraph import SubgraphNode


 def _merge_partial[StateT: State](
@@ -77,38 +97,156 @@ def _merge_partial[StateT: State](

 @dataclass(frozen=True)
 class CompiledGraph[StateT: State]:
-    """An immutable, executable graph produced by `GraphBuilder.compile()`."""
+    """An immutable, executable graph produced by `GraphBuilder.compile()`.
+
+    The compile-time topology (state class, entry, nodes, edges, reducers) is
+    immutable. Two mutable lists ride alongside for observer plumbing —
+    `_attached_observers` and `_active_workers` — neither of which affect the
+    compiled topology and both of which are scoped to the same instance.
+    """

     state_cls: type[StateT]
     entry: str
     nodes: Mapping[str, Node[StateT]]
     edges: Mapping[str, StaticEdge | ConditionalEdge[StateT]]
     reducers: Mapping[str, Reducer]
+    # Observer plumbing — see attach_observer/drain. Mutable on a frozen
+    # dataclass: the list reference is fixed but its contents change.
+    # Parameterized factories so pyright infers the element types.
+    _attached_observers: list[Observer] = field(default_factory=list[Observer])
+    # `set` (not list) so a per-task `add_done_callback(self._active_workers.discard)`
+    # auto-removes completed workers — long-running services that never call
+    # drain() don't accumulate completed Task references indefinitely.
+    _active_workers: set[asyncio.Task[None]] = field(default_factory=set[asyncio.Task[None]])
+
+    # ------------------------------------------------------------------
+    # Observer registration (spec v0.3.0 §6)
+    # ------------------------------------------------------------------
+
+    def attach_observer(self, observer: Observer) -> RemoveHandle:
+        """Register a graph-attached observer.
+
+        Per spec v0.3.0 §6: graph-attached observers fire on every invocation
+        of this graph until removed — including when this graph runs as a
+        subgraph inside a parent. Returns a `RemoveHandle` whose `.remove()`
+        method detaches the observer; idempotent.
+
+        Per spec: changes to the registered set during a graph run do NOT
+        take effect until the next invocation. The set of observers
+        delivering events for an in-flight invocation is fixed at the point
+        the invocation begins.
+        """
+        self._attached_observers.append(observer)
+        return RemoveHandle(_observers=self._attached_observers, _observer=observer)

-    async def invoke(self, initial_state: StateT) -> StateT:
+    async def drain(self) -> None:
+        """Await delivery of every observer event produced by prior
+        invocations of this graph.
+
+        Per spec v0.3.0 §6: callers running in short-lived processes (scripts,
+        serverless functions, CLIs) MUST use drain to avoid losing observer
+        events that were dispatched but not yet delivered.
+
+        Only events dispatched before this call are awaited; events from
+        invocations started concurrently with drain may or may not be
+        included. Subgraph events from active invocations are part of the
+        parent invocation's worker and are covered automatically.
+        """
+        if not self._active_workers:
+            return
+        # Snapshot the set: each worker's done-callback removes itself
+        # from `_active_workers`, so iterating it directly while gather
+        # awaits would mutate during iteration.
+        await asyncio.gather(*list(self._active_workers), return_exceptions=True)
+
+    # ------------------------------------------------------------------
+    # Public invocation
+    # ------------------------------------------------------------------
+
+    async def invoke(
+        self,
+        initial_state: StateT,
+        observers: Iterable[Observer] | None = None,
+    ) -> StateT:
         """Run the graph from `initial_state` to END and return the final state.

+        Optional `observers` are invocation-scoped — they fire only for this
+        run, after all graph-attached observers (including subgraph-attached
+        ones for events originating in subgraphs) per spec v0.3.0 §6.
+
+        Per spec v0.3.0 §6: this method returns as soon as the graph
+        execution loop completes, regardless of whether the observer
+        delivery queue has finished processing every dispatched event. Use
+        `await compiled.drain()` if you need delivery-completion guarantees.
+
         Raises one of the runtime error categories from spec §4 on failure.
         """

+        invocation_scoped = tuple(observers) if observers else ()
+        queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue()
+        context = _InvocationContext(
+            queue=queue,
+            graph_attached=tuple(self._attached_observers),
+            invocation_scoped=invocation_scoped,
+        )
+        worker = asyncio.create_task(deliver_loop(queue))
+        self._active_workers.add(worker)
+        # Auto-prune: when the worker completes (after the sentinel is
+        # processed), remove it from the active set so long-running
+        # services don't leak Task references between drain() calls.
+        worker.add_done_callback(self._active_workers.discard)
+        try:
+            return await self._invoke(initial_state, context)
+        finally:
+            # Sentinel terminates the worker after it processes events
+            # already on the queue (including any error event we just
+            # dispatched on the failure path). Drain semantics live on
+            # `.drain()` — we do NOT await the worker here, per spec.
+            queue.put_nowait(_DRAIN_SENTINEL)
+
+    # ------------------------------------------------------------------
+    # Internal invocation (used by SubgraphNode for nested execution)
+    # ------------------------------------------------------------------
+
+    async def _invoke(
+        self,
+        initial_state: StateT,
+        context: _InvocationContext,
+    ) -> StateT:
+        """Execution loop that dispatches events through the supplied context.
+
+        Public `invoke()` builds a fresh root context. Subgraph-as-node
+        execution calls `_invoke` directly with a context derived from the
+        parent's, so the queue, step counter, and observer chain thread
+        through the boundary.
+        """
+
         state = initial_state
         current = self.entry

         while True:
             node = self.nodes[current]

-            # Run the node. Wrap user exceptions as NodeException with
-            # recoverable_state = state at point of failure (pre-update).
-            try:
-                partial = await node.run(state)
-            except Exception as e:
-                raise NodeException(node_name=current, cause=e, recoverable_state=state) from e
-
-            # Merge partial into state via reducers (may raise ReducerError or
-            # StateValidationError; both already carry the right context).
-            state = _merge_partial(state, partial, self.reducers, current)
+            if isinstance(node, SubgraphNode):
+                # Subgraph wrappers are transparent to the observer protocol
+                # (per fixture 013): no event is dispatched for the wrapper
+                # itself, the step counter does not advance for it, and any
+                # `RuntimeGraphError` bubbling up from the subgraph's
+                # _invoke is already wrapped with the inner node's identity
+                # — pass it through. Other exceptions (projection errors,
+                # subgraph state-class init errors) escape the spec §4
+                # categories, so we wrap them as NodeException tagged with
+                # the wrapper's name.
+                try:
+                    partial = await node.run(state, context=context)
+                except RuntimeGraphError:
+                    raise
+                except Exception as e:
+                    raise NodeException(node_name=current, cause=e, recoverable_state=state) from e
+                state = _merge_partial(state, partial, self.reducers, current)
+            else:
+                state = await self._step_function_node(node, current, state, context)

-            # Evaluate the outgoing edge against the post-update state.
             edge = self.edges[current]
             if isinstance(edge, StaticEdge):
                 target: str | EndSentinel = edge.target
@@ -125,3 +263,71 @@ async def invoke(self, initial_state: StateT) -> StateT:
                 raise RoutingError(source_node=current, returned=target, recoverable_state=state)

             current = target
+
+    async def _step_function_node(
+        self,
+        node: Node[StateT],
+        current: str,
+        state: StateT,
+        context: _InvocationContext,
+    ) -> StateT:
+        """Run one function-node step: take a step, run, merge, dispatch.
+
+        Dispatches a `NodeEvent` exactly once per call:
+        - On run failure (NodeException): event with error populated.
+        - On merge failure (ReducerError or StateValidationError): event with
+          error populated; the original error propagates unchanged after.
+        - On success: event with post_state populated, then return.
+        """
+        step = context.take_step()
+        namespace = context.namespace_prefix + (current,)
+        pre_state = state
+
+        try:
+            partial = await node.run(state)
+        except Exception as e:
+            wrapped = NodeException(node_name=current, cause=e, recoverable_state=state)
+            self._dispatch_failure_event(context, current, namespace, step, pre_state, wrapped)
+            raise wrapped from e
+
+        try:
+            new_state = _merge_partial(state, partial, self.reducers, current)
+        except (ReducerError, StateValidationError) as e:
+            self._dispatch_failure_event(context, current, namespace, step, pre_state, e)
+            raise
+
+        _dispatch(
+            context,
+            NodeEvent(
+                node_name=current,
+                namespace=namespace,
+                step=step,
+                pre_state=pre_state,
+                post_state=new_state,
+                error=None,
+                parent_states=context.parent_states_prefix,
+            ),
+        )
+        return new_state
+
+    @staticmethod
+    def _dispatch_failure_event(
+        context: _InvocationContext,
+        current: str,
+        namespace: tuple[str, ...],
+        step: int,
+        pre_state: State,
+        error: RuntimeGraphError,
+    ) -> None:
+        _dispatch(
+            context,
+            NodeEvent(
+                node_name=current,
+                namespace=namespace,
+                step=step,
+                pre_state=pre_state,
+                post_state=None,
+                error=error,
+                parent_states=context.parent_states_prefix,
+            ),
+        )