Skip to content

Commit 850a806

Browse files
Add per-invocation observer event drain primitive
CompiledGraph.drain_events_for(invocation_id, *, timeout=5.0) gives a terminal node a way to block until every event dispatched for the in-flight invocation has been delivered to every attached observer, then proceed -- typically followed by a read against a queryable observer accumulator's per-invocation bucket whose state the drain has now caught up to. Without this, the deliver loop may still hold not-yet-dispatched events at the moment the terminal node reads, and the read silently undercounts. The implementation hangs on the existing per-invocation _DrainCounters: drain_events_for snapshots dispatched at call time, registers a (target, Future) pair on a new drain_wakers list, and awaits the Future via asyncio.wait_for. The deliver loop fulfils any waker whose target has been reached after each delivered increment. On timeout the waker is removed from the list and the partial summary is returned -- and crucially, the worker is NOT cancelled. This is the load-bearing difference from the process-wide drain: drain is shutdown semantics and cancels its workers; drain_events_for is in-flight synchronization and leaves them running so the graph keeps serving other invocations. Subgraph descents share the parent's _DrainCounters, so a drain on the outermost invocation_id covers fan-out instance events and parallel-branches branch events for free. Ten unit tests mirror the spec fixtures' case shapes: basic synchronization (028), snapshot semantic / no-deadlock on the calling node's own completed event (029), worker-not-cancelled-on-timeout (030, the load-bearing divergence), invocation-scope isolation (031), fan-out coverage (032), parallel-branches coverage (033), plus zero-timeout non-blocking check, unknown id, negative + NaN boundary rejection. Conformance manifest 0054 flips to implemented since 0.12.0. The six graph-engine fixtures (028-033) stay deferred from the cross-capability parser pending the upcoming conformance-adapter capability spec to ratify the directive vocabulary. Coord context: discuss-per-invocation-event-drain thread, 02-spec-accepted-as-0054 settling the five spec questions (section, name, default timeout, summary shape, resume interaction) and flagging the worker-cancellation divergence explicitly.
1 parent 7a3e0b2 commit 850a806

7 files changed

Lines changed: 589 additions & 31 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
1616

1717
### Added
1818

19+
- **`CompiledGraph.drain_events_for(invocation_id, *, timeout=5.0) -> DrainSummary`** (proposal 0054, spec graph-engine §6 *Per-invocation drain*, v0.46.0). The architectural pair to proposal 0048's §9.4 queryable observer accumulator lifecycle: a terminal node calling `await graph.drain_events_for(state.invocation_id)` blocks until every event dispatched for that invocation has reached every attached observer, typically followed by a read against a queryable observer accumulator whose bucket the drain has now caught up to. Snapshot semantic: the drain awaits the events dispatched as of call time; new emissions after the call are out of scope. Reuses the existing `DrainSummary` shape verbatim — no new `InvocationDrainSummary` variant. **Load-bearing divergence from `drain()`**: a per-invocation drain timeout MUST NOT cancel the delivery worker, in contrast to `drain()`'s shutdown semantics. The graph stays serving other invocations after the timeout fires; the deliver loop keeps processing the queue. Default timeout is `5.0` seconds; `None` waits indefinitely; `0.0` is a non-blocking check. Negative or `NaN` timeout raises `ValueError` at the API boundary. Unknown `invocation_id` (already drained or never started) returns an empty summary, not an error.
1920
- **`get_invocation_metadata()` read-symmetric API** (proposal 0048, observability §3.4, spec v0.40.0). The canonical spec-idiomatic public name for the §3.4 read access pairs with `set_invocation_metadata()` on the write side: same function object as the historical `current_invocation_metadata`, exposed for callers wishing to use the symmetric `get_/set_` naming. Returns the `MappingProxyType` snapshot of the current async context's view (caller baseline + in-node augments), or the empty mapping outside any active invocation. Read-only — callers MUST NOT mutate it. Both names are now exported from `openarmature.observability`; existing `current_invocation_metadata` callers continue to work unchanged.
2021
- **`docs/concepts/observability.md` §9 *Queryable observer pattern*** documents the convention-only observer-attached read methods that proposal 0048 §9 blesses: how to add a `get_*` read method to a custom observer (§9.1), the async-safety contract for concurrent reads under in-flight delivery (§9.2), the three-channel data-access guidance (typed State / untyped invocation metadata / queryable observer accumulator, §9.3, with a side-by-side table), and the lifecycle / explicit `drop(invocation_id)` discipline (§9.4). No new abstract surface on `Observer` per the spec — the pattern is convention-only and exists to bless the existing observer-state read shape used in production code.
2122
- **Production observability example.** `examples/production-observability/` demonstrates the production-grade observability stack end-to-end: `OTelObserver` + `LangfuseObserver` attached to the same graph (proposal 0031), `trace_input_from_state` / `trace_output_from_state` caller hooks on the Langfuse observer (proposal 0043 §8.4.1) deriving domain dicts from State, the built-in `TimingMiddleware` recording per-node duration via an `on_complete` callback, and `invoke(metadata={...})` carrying multi-tenant identifiers (tenantId / requestId / featureFlag) that both observers pick up at once. `InMemoryLangfuseClient` + `InMemorySpanExporter` capture in-process so the demo prints what each backend would have ingested without needing real production credentials.

conformance.toml

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,50 @@ status = "textual-only"
337337
since = "0.12.0"
338338

339339
# Spec v0.46.0 (proposal 0054). Per-invocation observer event drain
340-
# (``drain_events_for(invocation_id, *, timeout) -> DrainSummary``).
341-
# Bundled into v0.12.0 per the 2026-06-03 decision in the
342-
# proposal-0048-implementation coord thread — 0054 directly resolves
343-
# the §9.4 accumulator-lifecycle synchronization race that 0048's
344-
# queryable observer pattern would otherwise expose. Lands in
345-
# PR 2b (after this PR's 0048 implementation).
340+
# (``CompiledGraph.drain_events_for(invocation_id, *, timeout) ->
341+
# DrainSummary``). Shipped in v0.12.0 — the architectural pair to
342+
# 0048's §9.4 queryable observer accumulator lifecycle. Engine
343+
# implementation: per-invocation waker Futures on the existing
344+
# ``_DrainCounters``; ``deliver_loop`` fulfils waiters whose target
345+
# delivered-count has been reached. Reuses the existing
346+
# ``DrainSummary`` shape verbatim per the
347+
# discuss-per-invocation-event-drain coord thread's Q4 direction.
348+
# Default timeout is 5.0 seconds (recommended-value nudge per the
349+
# coord-thread Q3 direction; spec leaves the default to the
350+
# implementation).
351+
#
352+
# Key divergence from process-wide ``drain()``: per-invocation drain
353+
# MUST NOT cancel the deliver worker on timeout (the graph remains
354+
# active and other invocations may still be in flight); ``drain()``
355+
# cancels because it's a shutdown primitive.
356+
#
357+
# Conformance fixtures (graph-engine/028-033) stay deferred from the
358+
# cross-capability parser pending the upcoming spec conformance-
359+
# adapter capability — same Path A reasoning as the 0048 fixtures.
360+
# Behavior is pinned by unit tests:
361+
# - basic synchronization (mirrors fixture 028):
362+
# ``tests/unit/test_drain.py::
363+
# test_drain_events_for_basic_synchronization``;
364+
# - snapshot semantic / no-deadlock-on-own-completed-event
365+
# (mirrors fixture 029):
366+
# ``::test_drain_events_for_snapshot_semantic_does_not_wait_for_own_completed_event``;
367+
# - worker-NOT-cancelled-on-timeout (mirrors fixture 030, the
368+
# load-bearing divergence-from-drain contract):
369+
# ``::test_drain_events_for_timeout_does_not_cancel_worker``;
370+
# - invocation-scope isolation (mirrors fixture 031):
371+
# ``::test_drain_events_for_invocation_scope_isolation``;
372+
# - fan-out coverage (mirrors fixture 032; subgraph descents share
373+
# parent counters):
374+
# ``::test_drain_events_for_covers_fan_out_instance_events``;
375+
# - parallel-branches coverage (mirrors fixture 033):
376+
# ``::test_drain_events_for_covers_parallel_branches_events``;
377+
# - zero-timeout non-blocking check:
378+
# ``::test_drain_events_for_zero_timeout_is_non_blocking_check``;
379+
# - unknown invocation_id returns clean summary:
380+
# ``::test_drain_events_for_unknown_invocation_returns_clean_summary``;
381+
# - negative + NaN timeout rejected at boundary:
382+
# ``::test_drain_events_for_rejects_negative_timeout``,
383+
# ``::test_drain_events_for_rejects_nan_timeout``.
346384
[proposals."0054"]
347-
status = "not-yet"
385+
status = "implemented"
386+
since = "0.12.0"

docs/concepts/observability.md

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -583,18 +583,14 @@ async def persist(state: PipelineState) -> Mapping[str, Any]:
583583

584584
`drain_events_for` is symmetric with the existing process-wide
585585
`graph.drain()` but scoped to one invocation. Returns the same
586-
`DrainSummary` shape, with the same timeout discipline.
587-
588-
!!! info "drain_events_for ships in v0.12.0 alongside the read API"
589-
`CompiledGraph.drain_events_for` is the spec §6 / proposal 0054
590-
pair to the §9.4 accumulator lifecycle described above. The two
591-
proposals are bundled into the v0.12.0 release cycle as
592-
architecturally paired: without the per-invocation drain, the
593-
accumulator pattern would race against the deliver loop on the
594-
last-event read. If you are reading this from a pre-v0.12.0
595-
install, the primitive is not yet present; the docs document the
596-
pattern's complete shape so the v0.12.0 upgrade is a straight
597-
drop-in.
586+
`DrainSummary` shape with the same timeout discipline, but with one
587+
load-bearing divergence: a per-invocation drain timeout MUST NOT
588+
cancel the delivery worker. `graph.drain()` cancels because it is a
589+
shutdown primitive; per-invocation drain is an in-flight
590+
synchronization primitive, so the graph stays available to serve
591+
other invocations after the timeout fires, and the deliver loop
592+
keeps processing the queue. The default timeout is `5.0` seconds;
593+
pass `None` to wait indefinitely, or `0.0` for a non-blocking check.
598594

599595
## OpenTelemetry mapping (opt-in)
600596

src/openarmature/graph/compiled.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,98 @@ async def drain(self, timeout: float | None = None) -> DrainSummary:
773773

774774
return DrainSummary(undelivered_count=undelivered, timeout_reached=timeout_reached)
775775

776+
# Spec graph-engine §6 *Per-invocation drain* (proposal 0054).
777+
# Symmetric with the process-wide ``drain`` method on the same
778+
# class but scoped to one in-flight invocation, with one
779+
# spec-mandated divergence: the per-invocation primitive MUST
780+
# NOT cancel the deliver worker on timeout (drain is shutdown
781+
# semantics; this is in-flight synchronization). The snapshot
782+
# semantic — events dispatched after the call begins do not
783+
# extend the target — is what keeps an in-node call (e.g., a
784+
# terminal node draining its own invocation before reading a
785+
# queryable observer accumulator) from deadlocking on its own
786+
# ``completed`` event.
787+
async def drain_events_for(
788+
self,
789+
invocation_id: str,
790+
*,
791+
timeout: float | None = 5.0,
792+
) -> DrainSummary:
793+
"""Await delivery of every observer event tagged with
794+
``invocation_id`` that was dispatched before this call returns,
795+
optionally bounded by ``timeout``.
796+
797+
Use this from a terminal node body to synchronize on the
798+
observer event stream before reading derived observer state
799+
(a queryable accumulator's per-invocation bucket, a latency
800+
rollup, a token-usage record). The drain blocks until every
801+
event dispatched up to the moment of the call has reached
802+
every attached observer, then returns.
803+
804+
Snapshot semantic: the drain awaits the events dispatched as
805+
of call time. Events emitted after the call begins (including
806+
the calling node's own ``started`` / ``completed`` pair) are
807+
out of scope. This is what allows an in-node call to avoid
808+
deadlocking on its own completed event.
809+
810+
``timeout`` is a non-negative duration in seconds (default
811+
``5.0``). ``None`` waits indefinitely. ``timeout=0.0`` is a
812+
non-blocking check: returns immediately whether the snapshot
813+
target was met. Raises :class:`ValueError` on negative or
814+
``NaN`` input.
815+
816+
On timeout the deliver worker is left running. The compiled
817+
graph stays available to serve other invocations after a
818+
per-invocation drain times out; the deliver loop continues
819+
processing the queue, including the events the timed-out
820+
caller failed to await. This is the load-bearing difference
821+
from :meth:`drain`, which cancels its workers.
822+
823+
Returns a :class:`DrainSummary` with ``undelivered_count`` and
824+
``timeout_reached``. On the clean path both are zero / false;
825+
on timeout ``undelivered_count`` is the snapshot target minus
826+
the deliver loop's current ``delivered`` count for this
827+
invocation. Unknown ``invocation_id`` (no active worker, or
828+
the invocation has already drained and the worker has exited)
829+
returns an empty summary — not an error.
830+
831+
Interaction with :meth:`drain`: if process-wide ``drain`` is
832+
called while a per-invocation drain is pending, ``drain``'s
833+
shutdown semantics take precedence. The deliver worker is
834+
cancelled, its remaining events are not delivered, and the
835+
per-invocation waker's target may never be reached. The
836+
per-invocation call then blocks until its own ``timeout``
837+
fires and returns ``timeout_reached=True``. Mixing the two
838+
primitives in the same shutdown path is unusual; use
839+
``drain`` for lifespan / shutdown coordination and
840+
``drain_events_for`` for in-flight synchronization.
841+
"""
842+
if timeout is not None and not (timeout >= 0):
843+
raise ValueError(f"drain_events_for timeout must be non-negative, got {timeout!r}")
844+
845+
target_context: _InvocationContext | None = None
846+
for context in self._active_workers.values():
847+
if context.invocation_id == invocation_id:
848+
target_context = context
849+
break
850+
if target_context is None:
851+
return DrainSummary(undelivered_count=0, timeout_reached=False)
852+
853+
counters = target_context.drain_counters
854+
snapshot_target = counters.dispatched
855+
if counters.delivered >= snapshot_target:
856+
return DrainSummary(undelivered_count=0, timeout_reached=False)
857+
858+
waker: asyncio.Future[None] = asyncio.get_running_loop().create_future()
859+
counters.drain_wakers.append((snapshot_target, waker))
860+
try:
861+
await asyncio.wait_for(waker, timeout=timeout)
862+
except TimeoutError:
863+
counters.drain_wakers = [(t, f) for t, f in counters.drain_wakers if f is not waker]
864+
undelivered = max(0, snapshot_target - counters.delivered)
865+
return DrainSummary(undelivered_count=undelivered, timeout_reached=True)
866+
return DrainSummary(undelivered_count=0, timeout_reached=False)
867+
776868
# ------------------------------------------------------------------
777869
# Public invocation
778870
# ------------------------------------------------------------------

src/openarmature/graph/observer.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,17 @@ class _QueuedItem:
285285
class _DrainCounters:
286286
dispatched: int = 0
287287
delivered: int = 0
288+
# Per spec graph-engine §6 *Per-invocation drain* (proposal 0054):
289+
# ``drain_events_for(invocation_id, *, timeout)`` callers register
290+
# ``(target_delivered_count, Future)`` pairs here; the deliver
291+
# loop fulfils any whose target has been reached after each
292+
# ``delivered`` increment. The list is touched only from the
293+
# single event-loop task running ``deliver_loop`` plus the
294+
# caller of ``drain_events_for`` — no cross-thread access — so a
295+
# plain list is sufficient.
296+
drain_wakers: list[tuple[int, asyncio.Future[None]]] = field(
297+
default_factory=list[tuple[int, asyncio.Future[None]]]
298+
)
288299

289300

290301
# Spec: realizes graph-engine §6 Drain summary return shape (proposal
@@ -877,6 +888,23 @@ async def deliver_loop(
877888
# filtered out for every observer is still considered
878889
# delivered (we did all the work there was to do for it).
879890
counters.delivered += 1
891+
# Per spec §6 *Per-invocation drain* (proposal 0054): wake any
892+
# ``drain_events_for`` waiter whose ``target_delivered_count``
893+
# has been reached. Mutate the list in place; the only other
894+
# toucher is ``drain_events_for`` itself, running in the same
895+
# event-loop task family. The ``not fut.done()`` guard absorbs
896+
# the case where the waiter's own ``asyncio.wait_for`` timed
897+
# out and cancelled the Future before the deliver loop got
898+
# here.
899+
if counters.drain_wakers:
900+
still_pending: list[tuple[int, asyncio.Future[None]]] = []
901+
for target, fut in counters.drain_wakers:
902+
if counters.delivered >= target:
903+
if not fut.done():
904+
fut.set_result(None)
905+
continue
906+
still_pending.append((target, fut))
907+
counters.drain_wakers = still_pending
880908

881909

882910
__all__ = [

tests/conformance/test_fixture_parsing.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -369,28 +369,38 @@ def _id(case: tuple[str, Path]) -> str:
369369
"Proposal 0052 implementation attribution; lands in PR 3 of v0.12.0"
370370
),
371371
# ----- v0.12.0 cycle spec-pin bump (v0.45.0 -> v0.46.0) -------------
372-
# Proposal 0054 (per-invocation observer event drain, v0.46.0) —
373-
# six graph-engine fixtures introduce the accumulator-observer
374-
# behavior + ``invoke_drain_events_for`` node directive. Bundled
375-
# into v0.12.0 alongside 0048 (the §9.4 lifecycle pairing); lands
376-
# in PR 2b of this cycle.
372+
# Proposal 0054 (per-invocation observer event drain, v0.46.0):
373+
# six graph-engine fixtures introduce new directive shapes the
374+
# cross-capability parser does not model (``observers[].behavior``,
375+
# ``nodes.<name>.invoke_drain_events_for``, the accumulator-observer
376+
# contract, the per-node ``node_drain_summaries`` /
377+
# ``node_accumulator_snapshots`` assertion blocks). The python
378+
# implementation already ships
379+
# ``CompiledGraph.drain_events_for(invocation_id, *, timeout)`` in
380+
# v0.12.0 (conformance manifest 0054 = implemented); behavior is
381+
# pinned by ``tests/unit/test_drain.py`` (basic synchronization,
382+
# worker-NOT-cancelled-on-timeout, invocation-scope isolation,
383+
# zero-timeout non-blocking check, unknown id, negative + NaN
384+
# boundary rejection). Fixture-shape activation is queued for a
385+
# future PR slotted after the upcoming spec conformance-adapter
386+
# capability ratifies the directive vocabulary.
377387
"graph-engine/028-drain-events-for-basic-synchronization": (
378-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
388+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
379389
),
380390
"graph-engine/029-drain-events-for-snapshot-semantic": (
381-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
391+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
382392
),
383393
"graph-engine/030-drain-events-for-timeout": (
384-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
394+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
385395
),
386396
"graph-engine/031-drain-events-for-invocation-scope": (
387-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
397+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
388398
),
389399
"graph-engine/032-drain-events-for-fan-out-coverage": (
390-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
400+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
391401
),
392402
"graph-engine/033-drain-events-for-parallel-branches-coverage": (
393-
"Proposal 0054 per-invocation drain; lands in PR 2b of v0.12.0"
403+
"Proposal 0054 fixture-shape models pending; contract pinned by unit tests"
394404
),
395405
}
396406

0 commit comments

Comments
 (0)