Skip to content

Commit 106498a

Browse files
Add per-invocation observer event drain primitive (#131)
* Add per-invocation observer event drain primitive CompiledGraph.drain_events_for(invocation_id, *, timeout=5.0) gives a terminal node a way to block until every event dispatched for the in-flight invocation has been delivered to every attached observer, then proceed -- typically followed by a read against a queryable observer accumulator's per-invocation bucket whose state the drain has now caught up to. Without this, the deliver loop may still hold not-yet-dispatched events at the moment the terminal node reads, and the read silently undercounts. The implementation hangs on the existing per-invocation _DrainCounters: drain_events_for snapshots dispatched at call time, registers a (target, Future) pair on a new drain_wakers list, and awaits the Future via asyncio.wait_for. The deliver loop fulfils any waker whose target has been reached after each delivered increment. On timeout the waker is removed from the list and the partial summary is returned -- and crucially, the worker is NOT cancelled. This is the load-bearing difference from the process-wide drain: drain is shutdown semantics and cancels its workers; drain_events_for is in-flight synchronization and leaves them running so the graph keeps serving other invocations. Subgraph descents share the parent's _DrainCounters, so a drain on the outermost invocation_id covers fan-out instance events and parallel-branches branch events for free. Ten unit tests mirror the spec fixtures' case shapes: basic synchronization (028), snapshot semantic / no-deadlock on the calling node's own completed event (029), worker-not-cancelled-on-timeout (030, the load-bearing divergence), invocation-scope isolation (031), fan-out coverage (032), parallel-branches coverage (033), plus zero-timeout non-blocking check, unknown id, negative + NaN boundary rejection. Conformance manifest 0054 flips to implemented since 0.12.0. The six graph-engine fixtures (028-033) stay deferred from the cross-capability parser pending the upcoming conformance-adapter capability spec to ratify the directive vocabulary. Coord context: discuss-per-invocation-event-drain thread, 02-spec-accepted-as-0054 settling the five spec questions (section, name, default timeout, summary shape, resume interaction) and flagging the worker-cancellation divergence explicitly. * Tighten drain_events_for docstring + tests Docstring fixes: the opening sentence said the drain covers events dispatched "before this call returns", but the snapshot is taken at call entry — events dispatched between entry and exit are not in scope. Rewords to "as of this call's entry". The snapshot- semantic paragraph also claimed the calling node's started AND completed events were out of scope, but the engine dispatches started immediately before the node body runs, so started is already in the snapshot and the drain awaits its delivery normally. Only completed is guaranteed out of scope. Test fixes: - test_drain_events_for_timeout_does_not_cancel_worker now runs the follow-up invocation on the SAME compiled graph as the timed-out one, and the decisive contract check is that all NodeEvents from the originally-pending queue land in the observer's delivery list AFTER the timed-out drain returned. The previous version compiled a fresh graph for the follow-up, so the test would have passed even if the original worker had been cancelled. - test_drain_events_for_invocation_scope_isolation now actually performs two serial invocations (the previous version invoked once and drained the same id twice, testing stale-id idempotency rather than per-invocation isolation). Each drain's delivery log entries are partitioned by invocation_id; the assertion is strict (no cross-contamination, no entries outside either partition).
1 parent 7a3e0b2 commit 106498a

7 files changed

Lines changed: 624 additions & 31 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
1616

1717
### Added
1818

19+
- **`CompiledGraph.drain_events_for(invocation_id, *, timeout=5.0) -> DrainSummary`** (proposal 0054, spec graph-engine §6 *Per-invocation drain*, v0.46.0). The architectural pair to proposal 0048's §9.4 queryable observer accumulator lifecycle: a terminal node calling `await graph.drain_events_for(state.invocation_id)` blocks until every event dispatched for that invocation has reached every attached observer, typically followed by a read against a queryable observer accumulator whose bucket the drain has now caught up to. Snapshot semantic: the drain awaits the events dispatched as of call time; new emissions after the call are out of scope. Reuses the existing `DrainSummary` shape verbatim — no new `InvocationDrainSummary` variant. **Load-bearing divergence from `drain()`**: a per-invocation drain timeout MUST NOT cancel the delivery worker, in contrast to `drain()`'s shutdown semantics. The graph stays serving other invocations after the timeout fires; the deliver loop keeps processing the queue. Default timeout is `5.0` seconds; `None` waits indefinitely; `0.0` is a non-blocking check. Negative or `NaN` timeout raises `ValueError` at the API boundary. Unknown `invocation_id` (already drained or never started) returns an empty summary, not an error.
1920
- **`get_invocation_metadata()` read-symmetric API** (proposal 0048, observability §3.4, spec v0.40.0). The canonical spec-idiomatic public name for the §3.4 read access pairs with `set_invocation_metadata()` on the write side: same function object as the historical `current_invocation_metadata`, exposed for callers wishing to use the symmetric `get_/set_` naming. Returns the `MappingProxyType` snapshot of the current async context's view (caller baseline + in-node augments), or the empty mapping outside any active invocation. Read-only — callers MUST NOT mutate it. Both names are now exported from `openarmature.observability`; existing `current_invocation_metadata` callers continue to work unchanged.
2021
- **`docs/concepts/observability.md` §9 *Queryable observer pattern*** documents the convention-only observer-attached read methods that proposal 0048 §9 blesses: how to add a `get_*` read method to a custom observer (§9.1), the async-safety contract for concurrent reads under in-flight delivery (§9.2), the three-channel data-access guidance (typed State / untyped invocation metadata / queryable observer accumulator, §9.3, with a side-by-side table), and the lifecycle / explicit `drop(invocation_id)` discipline (§9.4). No new abstract surface on `Observer` per the spec — the pattern is convention-only and exists to bless the existing observer-state read shape used in production code.
2122
- **Production observability example.** `examples/production-observability/` demonstrates the production-grade observability stack end-to-end: `OTelObserver` + `LangfuseObserver` attached to the same graph (proposal 0031), `trace_input_from_state` / `trace_output_from_state` caller hooks on the Langfuse observer (proposal 0043 §8.4.1) deriving domain dicts from State, the built-in `TimingMiddleware` recording per-node duration via an `on_complete` callback, and `invoke(metadata={...})` carrying multi-tenant identifiers (tenantId / requestId / featureFlag) that both observers pick up at once. `InMemoryLangfuseClient` + `InMemorySpanExporter` capture in-process so the demo prints what each backend would have ingested without needing real production credentials.

conformance.toml

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,50 @@ status = "textual-only"
337337
since = "0.12.0"
338338

339339
# Spec v0.46.0 (proposal 0054). Per-invocation observer event drain
340-
# (``drain_events_for(invocation_id, *, timeout) -> DrainSummary``).
341-
# Bundled into v0.12.0 per the 2026-06-03 decision in the
342-
# proposal-0048-implementation coord thread — 0054 directly resolves
343-
# the §9.4 accumulator-lifecycle synchronization race that 0048's
344-
# queryable observer pattern would otherwise expose. Lands in
345-
# PR 2b (after this PR's 0048 implementation).
340+
# (``CompiledGraph.drain_events_for(invocation_id, *, timeout) ->
341+
# DrainSummary``). Shipped in v0.12.0 — the architectural pair to
342+
# 0048's §9.4 queryable observer accumulator lifecycle. Engine
343+
# implementation: per-invocation waker Futures on the existing
344+
# ``_DrainCounters``; ``deliver_loop`` fulfils waiters whose target
345+
# delivered-count has been reached. Reuses the existing
346+
# ``DrainSummary`` shape verbatim per the
347+
# discuss-per-invocation-event-drain coord thread's Q4 direction.
348+
# Default timeout is 5.0 seconds (recommended-value nudge per the
349+
# coord-thread Q3 direction; spec leaves the default to the
350+
# implementation).
351+
#
352+
# Key divergence from process-wide ``drain()``: per-invocation drain
353+
# MUST NOT cancel the deliver worker on timeout (the graph remains
354+
# active and other invocations may still be in flight); ``drain()``
355+
# cancels because it's a shutdown primitive.
356+
#
357+
# Conformance fixtures (graph-engine/028-033) stay deferred from the
358+
# cross-capability parser pending the upcoming spec conformance-
359+
# adapter capability — same Path A reasoning as the 0048 fixtures.
360+
# Behavior is pinned by unit tests:
361+
# - basic synchronization (mirrors fixture 028):
362+
# ``tests/unit/test_drain.py::
363+
# test_drain_events_for_basic_synchronization``;
364+
# - snapshot semantic / no-deadlock-on-own-completed-event
365+
# (mirrors fixture 029):
366+
# ``::test_drain_events_for_snapshot_semantic_does_not_wait_for_own_completed_event``;
367+
# - worker-NOT-cancelled-on-timeout (mirrors fixture 030, the
368+
# load-bearing divergence-from-drain contract):
369+
# ``::test_drain_events_for_timeout_does_not_cancel_worker``;
370+
# - invocation-scope isolation (mirrors fixture 031):
371+
# ``::test_drain_events_for_invocation_scope_isolation``;
372+
# - fan-out coverage (mirrors fixture 032; subgraph descents share
373+
# parent counters):
374+
# ``::test_drain_events_for_covers_fan_out_instance_events``;
375+
# - parallel-branches coverage (mirrors fixture 033):
376+
# ``::test_drain_events_for_covers_parallel_branches_events``;
377+
# - zero-timeout non-blocking check:
378+
# ``::test_drain_events_for_zero_timeout_is_non_blocking_check``;
379+
# - unknown invocation_id returns clean summary:
380+
# ``::test_drain_events_for_unknown_invocation_returns_clean_summary``;
381+
# - negative + NaN timeout rejected at boundary:
382+
# ``::test_drain_events_for_rejects_negative_timeout``,
383+
# ``::test_drain_events_for_rejects_nan_timeout``.
346384
[proposals."0054"]
347-
status = "not-yet"
385+
status = "implemented"
386+
since = "0.12.0"

docs/concepts/observability.md

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -583,18 +583,14 @@ async def persist(state: PipelineState) -> Mapping[str, Any]:
583583

584584
`drain_events_for` is symmetric with the existing process-wide
585585
`graph.drain()` but scoped to one invocation. Returns the same
586-
`DrainSummary` shape, with the same timeout discipline.
587-
588-
!!! info "drain_events_for ships in v0.12.0 alongside the read API"
589-
`CompiledGraph.drain_events_for` is the spec §6 / proposal 0054
590-
pair to the §9.4 accumulator lifecycle described above. The two
591-
proposals are bundled into the v0.12.0 release cycle as
592-
architecturally paired: without the per-invocation drain, the
593-
accumulator pattern would race against the deliver loop on the
594-
last-event read. If you are reading this from a pre-v0.12.0
595-
install, the primitive is not yet present; the docs document the
596-
pattern's complete shape so the v0.12.0 upgrade is a straight
597-
drop-in.
586+
`DrainSummary` shape with the same timeout discipline, but with one
587+
load-bearing divergence: a per-invocation drain timeout MUST NOT
588+
cancel the delivery worker. `graph.drain()` cancels because it is a
589+
shutdown primitive; per-invocation drain is an in-flight
590+
synchronization primitive, so the graph stays available to serve
591+
other invocations after the timeout fires, and the deliver loop
592+
keeps processing the queue. The default timeout is `5.0` seconds;
593+
pass `None` to wait indefinitely, or `0.0` for a non-blocking check.
598594

599595
## OpenTelemetry mapping (opt-in)
600596

src/openarmature/graph/compiled.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,101 @@ async def drain(self, timeout: float | None = None) -> DrainSummary:
773773

774774
return DrainSummary(undelivered_count=undelivered, timeout_reached=timeout_reached)
775775

776+
# Spec graph-engine §6 *Per-invocation drain* (proposal 0054).
777+
# Symmetric with the process-wide ``drain`` method on the same
778+
# class but scoped to one in-flight invocation, with one
779+
# spec-mandated divergence: the per-invocation primitive MUST
780+
# NOT cancel the deliver worker on timeout (drain is shutdown
781+
# semantics; this is in-flight synchronization). The snapshot
782+
# semantic — events dispatched after the call begins do not
783+
# extend the target — is what keeps an in-node call (e.g., a
784+
# terminal node draining its own invocation before reading a
785+
# queryable observer accumulator) from deadlocking on its own
786+
# ``completed`` event.
787+
async def drain_events_for(
788+
self,
789+
invocation_id: str,
790+
*,
791+
timeout: float | None = 5.0,
792+
) -> DrainSummary:
793+
"""Await delivery of every observer event tagged with
794+
``invocation_id`` that was dispatched as of this call's entry,
795+
optionally bounded by ``timeout``.
796+
797+
Use this from a terminal node body to synchronize on the
798+
observer event stream before reading derived observer state
799+
(a queryable accumulator's per-invocation bucket, a latency
800+
rollup, a token-usage record). The drain blocks until every
801+
event dispatched up to the moment of the call has reached
802+
every attached observer, then returns.
803+
804+
Snapshot semantic: the drain awaits the events dispatched as
805+
of call time. Events emitted after the call begins (notably
806+
the calling node's own ``completed`` event, which fires only
807+
after the node body returns) are out of scope. This is what
808+
allows an in-node call to avoid deadlocking on its own
809+
completed event. The calling node's ``started`` event, by
810+
contrast, fires immediately BEFORE the body runs and IS in
811+
the snapshot — the drain awaits its delivery normally.
812+
813+
``timeout`` is a non-negative duration in seconds (default
814+
``5.0``). ``None`` waits indefinitely. ``timeout=0.0`` is a
815+
non-blocking check: returns immediately whether the snapshot
816+
target was met. Raises :class:`ValueError` on negative or
817+
``NaN`` input.
818+
819+
On timeout the deliver worker is left running. The compiled
820+
graph stays available to serve other invocations after a
821+
per-invocation drain times out; the deliver loop continues
822+
processing the queue, including the events the timed-out
823+
caller failed to await. This is the load-bearing difference
824+
from :meth:`drain`, which cancels its workers.
825+
826+
Returns a :class:`DrainSummary` with ``undelivered_count`` and
827+
``timeout_reached``. On the clean path both are zero / false;
828+
on timeout ``undelivered_count`` is the snapshot target minus
829+
the deliver loop's current ``delivered`` count for this
830+
invocation. Unknown ``invocation_id`` (no active worker, or
831+
the invocation has already drained and the worker has exited)
832+
returns an empty summary — not an error.
833+
834+
Interaction with :meth:`drain`: if process-wide ``drain`` is
835+
called while a per-invocation drain is pending, ``drain``'s
836+
shutdown semantics take precedence. The deliver worker is
837+
cancelled, its remaining events are not delivered, and the
838+
per-invocation waker's target may never be reached. The
839+
per-invocation call then blocks until its own ``timeout``
840+
fires and returns ``timeout_reached=True``. Mixing the two
841+
primitives in the same shutdown path is unusual; use
842+
``drain`` for lifespan / shutdown coordination and
843+
``drain_events_for`` for in-flight synchronization.
844+
"""
845+
if timeout is not None and not (timeout >= 0):
846+
raise ValueError(f"drain_events_for timeout must be non-negative, got {timeout!r}")
847+
848+
target_context: _InvocationContext | None = None
849+
for context in self._active_workers.values():
850+
if context.invocation_id == invocation_id:
851+
target_context = context
852+
break
853+
if target_context is None:
854+
return DrainSummary(undelivered_count=0, timeout_reached=False)
855+
856+
counters = target_context.drain_counters
857+
snapshot_target = counters.dispatched
858+
if counters.delivered >= snapshot_target:
859+
return DrainSummary(undelivered_count=0, timeout_reached=False)
860+
861+
waker: asyncio.Future[None] = asyncio.get_running_loop().create_future()
862+
counters.drain_wakers.append((snapshot_target, waker))
863+
try:
864+
await asyncio.wait_for(waker, timeout=timeout)
865+
except TimeoutError:
866+
counters.drain_wakers = [(t, f) for t, f in counters.drain_wakers if f is not waker]
867+
undelivered = max(0, snapshot_target - counters.delivered)
868+
return DrainSummary(undelivered_count=undelivered, timeout_reached=True)
869+
return DrainSummary(undelivered_count=0, timeout_reached=False)
870+
776871
# ------------------------------------------------------------------
777872
# Public invocation
778873
# ------------------------------------------------------------------

src/openarmature/graph/observer.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,17 @@ class _QueuedItem:
285285
class _DrainCounters:
286286
dispatched: int = 0
287287
delivered: int = 0
288+
# Per spec graph-engine §6 *Per-invocation drain* (proposal 0054):
289+
# ``drain_events_for(invocation_id, *, timeout)`` callers register
290+
# ``(target_delivered_count, Future)`` pairs here; the deliver
291+
# loop fulfils any whose target has been reached after each
292+
# ``delivered`` increment. The list is touched only from the
293+
# single event-loop task running ``deliver_loop`` plus the
294+
# caller of ``drain_events_for`` — no cross-thread access — so a
295+
# plain list is sufficient.
296+
drain_wakers: list[tuple[int, asyncio.Future[None]]] = field(
297+
default_factory=list[tuple[int, asyncio.Future[None]]]
298+
)
288299

289300

290301
# Spec: realizes graph-engine §6 Drain summary return shape (proposal
@@ -877,6 +888,23 @@ async def deliver_loop(
877888
# filtered out for every observer is still considered
878889
# delivered (we did all the work there was to do for it).
879890
counters.delivered += 1
891+
# Per spec §6 *Per-invocation drain* (proposal 0054): wake any
892+
# ``drain_events_for`` waiter whose ``target_delivered_count``
893+
# has been reached. Mutate the list in place; the only other
894+
# toucher is ``drain_events_for`` itself, running in the same
895+
# event-loop task family. The ``not fut.done()`` guard absorbs
896+
# the case where the waiter's own ``asyncio.wait_for`` timed
897+
# out and cancelled the Future before the deliver loop got
898+
# here.
899+
if counters.drain_wakers:
900+
still_pending: list[tuple[int, asyncio.Future[None]]] = []
901+
for target, fut in counters.drain_wakers:
902+
if counters.delivered >= target:
903+
if not fut.done():
904+
fut.set_result(None)
905+
continue
906+
still_pending.append((target, fut))
907+
counters.drain_wakers = still_pending
880908

881909

882910
__all__ = [

tests/conformance/test_fixture_parsing.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -369,28 +369,38 @@ def _id(case: tuple[str, Path]) -> str:
369369
"Proposal 0052 implementation attribution; lands in PR 3 of v0.12.0"
370370
),
371371
# ----- v0.12.0 cycle spec-pin bump (v0.45.0 -> v0.46.0) -------------
372-
# Proposal 0054 (per-invocation observer event drain, v0.46.0) —
373-
# six graph-engine fixtures introduce the accumulator-observer
374-
# behavior + ``invoke_drain_events_for`` node directive. Bundled
375-
# into v0.12.0 alongside 0048 (the §9.4 lifecycle pairing); lands
376-
# in PR 2b of this cycle.
372+
# Proposal 0054 (per-invocation observer event drain, v0.46.0):
373+
# six graph-engine fixtures introduce new directive shapes the
374+
# cross-capability parser does not model (``observers[].behavior``,
375+
# ``nodes.<name>.invoke_drain_events_for``, the accumulator-observer
376+
# contract, the per-node ``node_drain_summaries`` /
377+
# ``node_accumulator_snapshots`` assertion blocks). The python
378+
# implementation already ships
379+
# ``CompiledGraph.drain_events_for(invocation_id, *, timeout)`` in
380+
# v0.12.0 (conformance manifest 0054 = implemented); behavior is
381+
# pinned by ``tests/unit/test_drain.py`` (basic synchronization,
382+
# worker-NOT-cancelled-on-timeout, invocation-scope isolation,
383+
# zero-timeout non-blocking check, unknown id, negative + NaN
384+
# boundary rejection). Fixture-shape activation is queued for a
385+
# future PR slotted after the upcoming spec conformance-adapter
386+
# capability ratifies the directive vocabulary.
377387
"graph-engine/028-drain-events-for-basic-synchronization": (
378-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
388+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
379389
),
380390
"graph-engine/029-drain-events-for-snapshot-semantic": (
381-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
391+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
382392
),
383393
"graph-engine/030-drain-events-for-timeout": (
384-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
394+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
385395
),
386396
"graph-engine/031-drain-events-for-invocation-scope": (
387-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
397+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
388398
),
389399
"graph-engine/032-drain-events-for-fan-out-coverage": (
390-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
400+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
391401
),
392402
"graph-engine/033-drain-events-for-parallel-branches-coverage": (
393-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
403+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
394404
),
395405
}
396406

0 commit comments

Comments
 (0)