Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/openarmature/graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from .events import NodeEvent
from .nodes import FunctionNode, Node
from .observer import Observer, RemoveHandle
from .observer import Observer, RemoveHandle, SubscribedObserver
from .projection import ExplicitMapping, FieldNameMatching, ProjectionStrategy
from .reducers import Reducer, append, last_write_wins, merge
from .state import State
Expand Down Expand Up @@ -64,6 +64,7 @@
"StateValidationError",
"StaticEdge",
"SubgraphNode",
"SubscribedObserver",
"UnreachableNode",
"append",
"last_write_wins",
Expand Down
105 changes: 74 additions & 31 deletions src/openarmature/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
Per spec §4 Error semantics: node, edge, reducer, and routing errors carry
recoverable state; state validation errors do not.

Per spec v0.3.0 §6 Observer hooks: between merge and edge evaluation, the
engine dispatches a `NodeEvent` for the just-completed node onto the
invocation's delivery queue. On node/reducer/state-validation failure, the
event is dispatched (with `error` populated) before the failure propagates.
Routing errors do NOT produce their own event — they arise after the
preceding node's event has already been dispatched.
Per spec v0.6.0 §6 Observer hooks: each node attempt produces a
started/completed event PAIR. The engine dispatches the started event
before invoking the wrapped node function and the completed event after
the reducer merge succeeds (with `post_state` populated) or after the
node, reducer, or state validation fails (with `error` populated).
Routing errors do NOT produce their own event pair — they arise after
the preceding node's completed event has already been dispatched.

`CompiledGraph[StateT]` and `_merge_partial[StateT]` carry the concrete state
subclass through to `invoke()`'s return type, so consumers don't need
Expand Down Expand Up @@ -42,6 +43,8 @@
_DRAIN_SENTINEL,
Observer,
RemoveHandle,
SubscribedObserver,
_coerce_subscribed,
_dispatch,
_InvocationContext,
_QueuedItem,
Expand Down Expand Up @@ -113,44 +116,63 @@ class CompiledGraph[StateT: State]:
# Observer plumbing — see attach_observer/drain. Mutable on a frozen
# dataclass: the list reference is fixed but its contents change.
# Parameterized factories so pyright infers the element types.
_attached_observers: list[Observer] = field(default_factory=list[Observer])
_attached_observers: list[SubscribedObserver] = field(default_factory=list[SubscribedObserver])
# `set` (not list) so a per-task `add_done_callback(self._active_workers.discard)`
# auto-removes completed workers — long-running services that never call
# drain() don't accumulate completed Task references indefinitely.
_active_workers: set[asyncio.Task[None]] = field(default_factory=set[asyncio.Task[None]])

# ------------------------------------------------------------------
# Observer registration (spec v0.3.0 §6)
# Observer registration (spec v0.6.0 §6)
# ------------------------------------------------------------------

def attach_observer(self, observer: Observer) -> RemoveHandle:
def attach_observer(
self,
observer: Observer,
Comment thread
chris-colinsky marked this conversation as resolved.
Comment thread
chris-colinsky marked this conversation as resolved.
*,
phases: Iterable[str] | None = None,
) -> RemoveHandle:
"""Register a graph-attached observer.

Per spec v0.3.0 §6: graph-attached observers fire on every invocation
Per spec v0.6.0 §6: graph-attached observers fire on every invocation
of this graph until removed — including when this graph runs as a
subgraph inside a parent. Returns a `RemoveHandle` whose `.remove()`
method detaches the observer; idempotent.

`phases` selects the phase strings (`"started"`, `"completed"`) the
observer subscribes to; default is both. An empty `phases` set
raises `ValueError` at registration time.

Per spec: changes to the registered set during a graph run do NOT
take effect until the next invocation. The set of observers
delivering events for an in-flight invocation is fixed at the point
the invocation begins.
"""
self._attached_observers.append(observer)
return RemoveHandle(_observers=self._attached_observers, _observer=observer)
subscribed = _coerce_subscribed(observer, phases=phases)
self._attached_observers.append(subscribed)
return RemoveHandle(_observers=self._attached_observers, _observer=subscribed)

async def drain(self) -> None:
"""Await delivery of every observer event produced by prior
invocations of this graph.

Per spec v0.3.0 §6: callers running in short-lived processes (scripts,
Per spec v0.6.0 §6: callers running in short-lived processes (scripts,
serverless functions, CLIs) MUST use drain to avoid losing observer
events that were dispatched but not yet delivered.

Only events dispatched before this call are awaited; events from
invocations started concurrently with drain may or may not be
included. Subgraph events from active invocations are part of the
parent invocation's worker and are covered automatically.

**Unbounded by design.** Drain blocks until every queued event has
been delivered to every subscribed observer. A slow, hung, or
misbehaving observer can therefore hold drain — and the calling
process — indefinitely. If you need a bounded wait, wrap the call
in `asyncio.wait_for` and accept that events still queued when the
deadline elapses will not be delivered::

await asyncio.wait_for(compiled.drain(), timeout=5.0)
"""
if not self._active_workers:
return
Expand All @@ -166,23 +188,27 @@ async def drain(self) -> None:
async def invoke(
self,
initial_state: StateT,
observers: Iterable[Observer] | None = None,
observers: Iterable[Observer | SubscribedObserver] | None = None,
) -> StateT:
"""Run the graph from `initial_state` to END and return the final state.

Optional `observers` are invocation-scoped — they fire only for this
run, after all graph-attached observers (including subgraph-attached
ones for events originating in subgraphs) per spec v0.3.0 §6.
ones for events originating in subgraphs) per spec v0.6.0 §6.

Each entry in `observers` may be either a bare `Observer` callable
(subscribes to both phases) or a `SubscribedObserver` wrapping an
observer with an explicit `phases` set.

Per spec v0.3.0 §6: this method returns as soon as the graph
Per spec v0.6.0 §6: this method returns as soon as the graph
execution loop completes, regardless of whether the observer
delivery queue has finished processing every dispatched event. Use
`await compiled.drain()` if you need delivery-completion guarantees.

Raises one of the runtime error categories from spec §4 on failure.
"""

invocation_scoped = tuple(observers) if observers else ()
invocation_scoped = tuple(_coerce_subscribed(o) for o in (observers or ()))
queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue()
context = _InvocationContext(
queue=queue,
Expand Down Expand Up @@ -271,62 +297,79 @@ async def _step_function_node(
state: StateT,
context: _InvocationContext,
) -> StateT:
"""Run one function-node step: take a step, run, merge, dispatch.

Dispatches a `NodeEvent` exactly once per call:
- On run failure (NodeException): event with error populated.
- On merge failure (ReducerError or StateValidationError): event with
error populated; the original error propagates unchanged after.
- On success: event with post_state populated, then return.
"""Run one function-node step: take a step, dispatch started, run,
merge, dispatch completed.

Per spec v0.6.0 §6: each attempt produces a started/completed pair.
Both events share the same `step`. The completed event carries
`post_state` on success, or `error` on failure (one of run, reducer,
or state-validation). The completed event is dispatched before the
failure propagates.
"""
step = context.take_step()
namespace = context.namespace_prefix + (current,)
pre_state = state

self._dispatch_started(context, current, namespace, step, pre_state)

try:
partial = await node.run(state)
except Exception as e:
wrapped = NodeException(node_name=current, cause=e, recoverable_state=state)
self._dispatch_failure_event(context, current, namespace, step, pre_state, wrapped)
self._dispatch_completed(context, current, namespace, step, pre_state, error=wrapped)
raise wrapped from e

try:
new_state = _merge_partial(state, partial, self.reducers, current)
except (ReducerError, StateValidationError) as e:
self._dispatch_failure_event(context, current, namespace, step, pre_state, e)
self._dispatch_completed(context, current, namespace, step, pre_state, error=e)
raise

self._dispatch_completed(context, current, namespace, step, pre_state, post_state=new_state)
return new_state

@staticmethod
def _dispatch_started(
context: _InvocationContext,
current: str,
namespace: tuple[str, ...],
step: int,
pre_state: State,
) -> None:
_dispatch(
context,
NodeEvent(
node_name=current,
namespace=namespace,
step=step,
phase="started",
pre_state=pre_state,
post_state=new_state,
post_state=None,
error=None,
parent_states=context.parent_states_prefix,
),
)
return new_state

@staticmethod
def _dispatch_failure_event(
def _dispatch_completed(
context: _InvocationContext,
current: str,
namespace: tuple[str, ...],
step: int,
pre_state: State,
error: RuntimeGraphError,
*,
post_state: State | None = None,
error: RuntimeGraphError | None = None,
) -> None:
_dispatch(
context,
NodeEvent(
node_name=current,
namespace=namespace,
step=step,
phase="completed",
pre_state=pre_state,
post_state=None,
post_state=post_state,
error=error,
parent_states=context.parent_states_prefix,
),
Expand Down
39 changes: 28 additions & 11 deletions src/openarmature/graph/events.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
"""Node-boundary observer events.

Per spec v0.3.0 §6 (proposal 0003): a NodeEvent is delivered to registered
observers once per node execution, carrying enough context to reconstruct
where in the (potentially nested) execution path the node sat and what the
state looked like before/after the node's update merged.
Per spec v0.6.0 §6 (proposal 0005): each node attempt produces a
started/completed event PAIR. The engine dispatches the started event
before invoking the wrapped node function and the completed event after
the reducer merge succeeds (with `post_state` populated) or after the
node, reducer, or state validation fails (with `error` populated).

Frozen dataclass — observers receive a snapshot, not a live handle.
"""

from dataclasses import dataclass
from typing import Literal

from .errors import RuntimeGraphError
from .state import State
Expand All @@ -18,8 +20,12 @@
class NodeEvent:
"""A single node-boundary event delivered to observers.

Per spec v0.3.0 §6:
Per spec v0.6.0 §6:

- `phase` is `"started"` (dispatched before the node runs) or
`"completed"` (dispatched after the node returns or raises and the
merge runs/fails). Each node attempt produces exactly one of each
in that order.
- `node_name` is the name under which this node was registered in its
immediate containing graph.
- `namespace` is an ordered sequence of node names from the outermost
Expand All @@ -28,28 +34,39 @@ class NodeEvent:
extends.
- `step` is a monotonically-increasing counter starting at 0, scoped
to a single outermost-invocation. Subgraph-internal nodes increment
the same counter.
the same counter. The started/completed pair for one attempt share
the same step.
- `pre_state` is the state the node received, before reducer merge.
Populated on both phases (identical across the pair).
- `post_state` is the state after the node's partial update merged
successfully. Populated only on success.
successfully. Populated only on `completed` events that succeeded.
- `error` is the wrapped runtime error (NodeException, ReducerError,
or StateValidationError) when the node failed. Read `event.error.category`
for the spec category identifier and `event.error.__cause__` for the
original user/framework exception. Populated only on failure.
or StateValidationError) when the node failed. Populated only on
`completed` events that failed.
- `parent_states` carries one state snapshot per containing graph,
outermost first; for a node in the outermost graph it's an empty
tuple. Invariant: `len(parent_states) == len(namespace) - 1`.
- `attempt_index` is the 0-based index of this attempt among any
retries. `0` for nodes not wrapped by retry middleware.
- `fan_out_index` is the 0-based index of this fan-out instance among
its siblings. `None` for nodes not inside a fan-out.

Exactly one of `post_state` or `error` is populated per event.
Invariants:
- On `started` events, `post_state` and `error` MUST both be None.
- On `completed` events, exactly one of `post_state` and `error` is
populated.
"""

node_name: str
namespace: tuple[str, ...]
step: int
phase: Literal["started", "completed"]
pre_state: State
post_state: State | None
error: RuntimeGraphError | None
parent_states: tuple[State, ...]
attempt_index: int = 0
fan_out_index: int | None = None


__all__ = ["NodeEvent"]
Loading
Loading