"""Observer hooks: protocol, subscription, delivery queue, per-invocation context.

Each node attempt produces a started/completed event pair, and
observers register with an optional ``phases`` set so they can
subscribe to one phase or both. The graph never awaits
observer processing.

This module defines:

- `Observer`: the callable shape an observer satisfies.
- `SubscribedObserver`: pairs an `Observer` with the phase set it
  subscribes to. Public; users construct one directly when passing
  phase-filtered observers to `invoke(observers=...)`.
- `RemoveHandle`: returned by `CompiledGraph.attach_observer` so the
  caller can detach later.
- `_InvocationContext`: the cross-graph state threaded through one
  outermost-invocation, including any nested subgraphs. Carries the
  queue, observer chain (graph-attached, outermost → innermost) and the
  invocation-scoped observers, plus a shared step counter, namespace
  prefix, and parent-state stack.
- `_QueuedItem`: an event paired with its delivery observer list.
- `_dispatch`: enqueues an event for the worker to deliver.
- `deliver_loop`: the worker coroutine. Reads items from the queue and
  calls each observer in order, filtering by subscribed phase and
  isolating exceptions via `warnings.warn` per spec.
"""
from __future__ import annotations
import asyncio
import inspect
import warnings
from collections.abc import Iterable
from dataclasses import dataclass, field
from typing import Any, Literal, Protocol
from .events import NodeEvent
from .state import State
class Observer(Protocol):
    """The shape of a callable that receives node-boundary events.

    `Observer` is a structural Protocol; any async callable matching the
    signature qualifies, no subclass required. Plain functions, bound
    methods, and class instances with `__call__` all work::

        async def log_observer(event: NodeEvent) -> None:
            print(event.node_name, event.phase)

        compiled.attach_observer(log_observer)

    Contract:

    - Observers MUST be async so the delivery queue can await each
      one and coordinate ordering. The graph itself never awaits
      observers.
    - Observers MUST NOT alter state, routing, or any other aspect
      of the graph run. Read-only side effects (logging, metrics,
      span emission) only.

    The event parameter is positional-only (`event, /`) so structural
    conformance doesn't pin you to that name; any of `event`, `_event`,
    `e`, etc. matches.

    Optional ``prepare_sync`` extension
    -----------------------------------

    An observer MAY additionally define a synchronous method::

        def prepare_sync(self, event: NodeEvent, /) -> None: ...

    that the engine calls IN THE ENGINE TASK, BEFORE queueing the
    event for the async ``__call__``. This exists for observers that
    need to set up state (e.g., open a span and stash a handle in a
    ContextVar) that the engine itself must read synchronously
    before running the node body (otherwise logs emitted on the
    first line of the body wouldn't see the right span).

    ``prepare_sync`` is **opt-in via ``hasattr``**; no subclass or
    Protocol method required. Observers that don't define it skip
    the synchronous prep entirely. For observers that do define it,
    the hook runs only for ``"started"``-phase events, and errors are
    warned rather than propagated (same isolation contract as the
    async path).
    """

    async def __call__(self, event: NodeEvent, /) -> None: ...
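# Illustrative sketch (not part of this module): the structural contract
# described above, using a hypothetical ``FakeEvent`` stand-in for
# ``NodeEvent`` and a local copy of the protocol so the snippet runs on its
# own. The ordering shown (sync prep, then the awaited call) is a simplified
# assumption about how the engine sequences the two hooks.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class FakeEvent:
    """Hypothetical stand-in for NodeEvent; only the two fields used below."""

    node_name: str
    phase: str


class Observer(Protocol):
    async def __call__(self, event: FakeEvent, /) -> None: ...


class RecordingObserver:
    """Class-instance observer that also defines the optional sync hook."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, str]] = []

    def prepare_sync(self, event: FakeEvent, /) -> None:
        # Must be a plain ``def``; it runs before the async path.
        self.calls.append(("prepare_sync", event.node_name, event.phase))

    async def __call__(self, event: FakeEvent, /) -> None:
        self.calls.append(("call", event.node_name, event.phase))


async def function_observer(e: FakeEvent) -> None:
    # Any parameter name conforms because the protocol's is positional-only.
    print(e.node_name, e.phase)


async def demo() -> list[tuple[str, str, str]]:
    recorder = RecordingObserver()
    observers: list[Observer] = [recorder, function_observer]
    event = FakeEvent(node_name="fetch", phase="started")
    for obs in observers:
        # Optional hook, discovered by attribute lookup rather than subclassing.
        prepare = getattr(obs, "prepare_sync", None)
        if prepare is not None and event.phase == "started":
            prepare(event)
    for obs in observers:
        await obs(event)
    return recorder.calls


calls = asyncio.run(demo())
```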
# Default subscription: what a bare ``Observer`` callable receives
# without an explicit ``phases`` argument. Per spec v0.6.0 §6 these
# were originally the only two phase strings, so the name
# ``ALL_PHASES`` is historical. The set stays ``{"started",
# "completed"}`` so legacy observers don't unexpectedly receive
# checkpoint events. Subscribing to ``"checkpoint_saved"`` or
# ``"checkpoint_migrated"`` is opt-in.
ALL_PHASES: frozenset[str] = frozenset({"started", "completed"})
# All phase values the engine produces (per spec graph-engine §6 +
# pipeline-utilities §10.8 + proposal 0014 §6 cross-ref). Used by
# the registration-time validator to reject typos like
# ``phases={"complete"}``.
#
# The two synthetic phases (``checkpoint_saved`` and
# ``checkpoint_migrated``) repurpose the ``NodeEvent`` shape for
# non-node events — see the ``NodeEvent`` docstring for conventions.
# Both are opt-in via explicit ``phases={...}``; the default
# subscription ``ALL_PHASES`` above is ``{"started", "completed"}``
# only, so legacy observers never receive them.
KNOWN_PHASES: frozenset[str] = frozenset({"started", "completed", "checkpoint_saved", "checkpoint_migrated"})
@dataclass(frozen=True)
class SubscribedObserver:
    """An observer paired with its phase subscription set.

    Observers register with an optional ``phases`` parameter naming
    the phase strings they want to receive. The default is
    ``ALL_PHASES``. The name dates from when there were only two
    phases; it now means "the default subscription"
    (``{"started", "completed"}``). The synthetic
    ``"checkpoint_saved"`` and ``"checkpoint_migrated"`` phases are
    opt-in: subscribe explicitly, for example via
    ``phases={"checkpoint_saved"}`` (or include them in a custom set).
    ``KNOWN_PHASES`` is the full "every phase the engine can produce"
    set used by the registration-time validator.

    Empty phase sets are forbidden; passing one raises
    ``ValueError`` at registration time so misconfiguration surfaces
    immediately.

    Construct one of these directly when handing phase-filtered
    observers to ``CompiledGraph.invoke(observers=...)``. For the
    single-observer ``attach_observer`` path, pass ``phases=`` as a
    keyword argument and the engine wraps it for you.
    """

    observer: Observer
    phases: frozenset[str] = ALL_PHASES

    def __post_init__(self) -> None:
        if not self.phases:
            raise ValueError("phases must be non-empty")
        invalid = self.phases - KNOWN_PHASES
        if invalid:
            raise ValueError(f"unknown phase(s): {sorted(invalid)}; allowed: {sorted(KNOWN_PHASES)}")
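# Illustrative sketch (not part of this module): how the validation above
# behaves for a default, an opt-in, a typo, and an empty phase set.
# ``SubscribedSketch`` and ``try_build`` are hypothetical stand-ins so the
# snippet runs without the package; the error text is shortened.

```python
from dataclasses import dataclass

ALL_PHASES = frozenset({"started", "completed"})
KNOWN_PHASES = frozenset(
    {"started", "completed", "checkpoint_saved", "checkpoint_migrated"}
)


@dataclass(frozen=True)
class SubscribedSketch:
    """Hypothetical stand-in mirroring the two validation checks."""

    observer: object
    phases: frozenset[str] = ALL_PHASES

    def __post_init__(self) -> None:
        if not self.phases:
            raise ValueError("phases must be non-empty")
        invalid = self.phases - KNOWN_PHASES
        if invalid:
            raise ValueError(f"unknown phase(s): {sorted(invalid)}")


def try_build(phases) -> str:
    """Return "ok" or the validation error message."""
    try:
        SubscribedSketch(observer=print, phases=frozenset(phases))
    except ValueError as exc:
        return str(exc)
    return "ok"


results = {
    "default": SubscribedSketch(observer=print).phases == ALL_PHASES,
    "opt_in": try_build({"completed", "checkpoint_saved"}),
    "typo": try_build({"complete"}),
    "empty": try_build(set()),
}
```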
def _coerce_subscribed(
    observer: Observer | SubscribedObserver,
    *,
    phases: Iterable[str] | None = None,
) -> SubscribedObserver:
    """Normalize a registration argument into a `SubscribedObserver`.

    - A bare `Observer` callable becomes a `SubscribedObserver` with
      either the supplied `phases` or `ALL_PHASES` (the default
      subscription, `{"started", "completed"}`; subscribing to
      `"checkpoint_saved"` is opt-in via an explicit ``phases``).
    - An existing `SubscribedObserver` passes through unchanged; supplying
      a `phases` kwarg in that case is a misuse and raises.
    """
    if isinstance(observer, SubscribedObserver):
        if phases is not None:
            raise ValueError("cannot override phases on a SubscribedObserver; construct a new one")
        return observer
    return SubscribedObserver(
        observer=observer,
        phases=frozenset(phases) if phases is not None else ALL_PHASES,
    )
@dataclass(frozen=True)
class RemoveHandle:
    """Returned by ``CompiledGraph.attach_observer``. Call
    ``.remove()`` to detach the observer. Idempotent: calling
    ``.remove()`` after the observer is already detached is a no-op.

    Changes to the registered observer set during a graph run do NOT
    take effect until the next invocation.
    """

    _observers: list[SubscribedObserver]
    _observer: SubscribedObserver

    def remove(self) -> None:
        """Detach the observer from its compiled graph. Idempotent: a
        second call is a no-op rather than an error. The change takes
        effect on the next ``invoke()``; in-flight invocations keep
        the observer set they started with."""
        try:
            self._observers.remove(self._observer)
        except ValueError:
            # Idempotency: the observer is already detached. Per the
            # docstring, a second .remove() call is a no-op rather than
            # an error.
            pass
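# Illustrative sketch (not part of this module): idempotent removal, plus the
# "in-flight invocations keep their observer set" rule. ``HandleSketch`` is a
# hypothetical stand-in, and the tuple snapshot is an assumption about how an
# invocation could freeze its observers at start.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HandleSketch:
    """Hypothetical stand-in for RemoveHandle."""

    _observers: list
    _observer: object

    def remove(self) -> None:
        try:
            self._observers.remove(self._observer)
        except ValueError:
            pass  # already detached: a second call is a no-op


registered = ["obs_a", "obs_b"]
handle = HandleSketch(registered, "obs_a")

# Assumed: an invocation copies the registered list into a tuple at start,
# so later detaches cannot change what the running invocation delivers to.
in_flight = tuple(registered)

handle.remove()
handle.remove()  # no error the second time
```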
@dataclass(frozen=True)
class _QueuedItem:
    """An event paired with the exact ordered observer list that should
    receive it. The list is computed at dispatch time so events from
    different depths in nested subgraphs carry the correct observer chain
    without the worker needing to know the graph topology.
    """

    event: NodeEvent
    observers: tuple[SubscribedObserver, ...]
# A sentinel value the engine puts on the queue to signal the worker to
# return after draining the events ahead of it. None is unambiguous:
# every real queue entry is a `_QueuedItem`, never None.
_DRAIN_SENTINEL = None
# Spec: realizes graph-engine §6 Drain undelivered-count bookkeeping
# (proposal 0010). Per-invocation mutable counters; `_dispatch` bumps
# `dispatched` after a successful `queue.put_nowait`; `deliver_loop`
# bumps `delivered` after the per-event observer for-loop completes.
# `undelivered = dispatched - delivered` at any point in time — and
# specifically at `CompiledGraph.drain()` cancellation time when the
# timeout has elapsed and pending workers' counters get summed into
# the returned `DrainSummary`.
@dataclass
class _DrainCounters:
    dispatched: int = 0
    delivered: int = 0
# Spec: realizes graph-engine §6 Drain summary return shape (proposal
# 0010). The two declared fields are the spec-mandated minimum;
# implementations MAY add richer detail in future PRs (per-observer
# counts, sampled event metadata) without breaking the v0.19.0 shape.
@dataclass(frozen=True)
class DrainSummary:
    """Outcome of a `CompiledGraph.drain()` call.

    Returned from `drain()` regardless of whether a `timeout` was
    supplied. When no timeout was supplied, or the timeout did not
    fire, `undelivered_count == 0` and `timeout_reached is False`.
    When the timeout fired, `undelivered_count` reports the number of
    events that were dispatched to the delivery worker but not fully
    delivered to every subscribed observer before cancellation, and
    `timeout_reached is True`.

    The spec-mandated minimum is these two fields. Implementations MAY
    extend the shape with diagnostic detail (per-observer counts,
    sampled event metadata) in subsequent versions; v0.19.0 ships the
    minimum.
    """

    undelivered_count: int
    timeout_reached: bool
# Spec: realizes pipeline-utilities §10.11 per-instance progress
# tracking in the engine. These are the MUTABLE internal-state
# counterparts to the FROZEN public ``FanOutProgress`` /
# ``FanOutInstanceProgress`` shapes the saved CheckpointRecord exposes.
# ``_maybe_save_checkpoint`` projects this mutable state into the
# frozen public shape when building a record.
@dataclass
class _FanOutInstanceState:
"""Mutable per-instance state inside a fan-out, updated by the
engine as the instance progresses. ``state`` transitions
not_started -> in_flight -> completed.
- ``result`` holds the per-instance contribution to the fan-out
accumulator, set when ``state == "completed"``. Per spec
§10.11 this is "the value contributed to the ``target_field``
bucket" (success path) or "the error entry contributed to the
``errors_field`` bucket" (collect-mode failure). The harness
projects this into the frozen ``FanOutInstanceProgress.result``
verbatim.
- ``result_is_error`` distinguishes success contributions
(``False``) from collect-mode error contributions (``True``).
Internal flag — not exposed on the public
``FanOutInstanceProgress`` shape because the spec presents
``result`` as a single typed entry per the parent state schema.
``FanOutNode.run_with_context`` consults this on resume to
route the rolled-forward contribution through the
``errors_field`` bucket rather than ``target_field``.
- ``extra_outputs`` holds the per-instance values for the fan-out's
``extra_outputs`` mapping (parent-field -> sub-field) so that
per-instance resume preserves the FULL per-instance contribution
(not just the ``target_field`` slice). Internal — not exposed on
the public ``FanOutInstanceProgress`` shape because the spec
describes ``result`` as a single accumulator entry.
- ``completed_inner_positions`` accumulates ``NodePosition`` entries
from inner nodes that complete inside this instance's subgraph
execution. Captures the instance's progress for observational
purposes when an in_flight save snapshot fires; not used as a
resume re-entry point (the instance re-enters at its subgraph's
declared entry node per §10.7).
"""
state: Literal["completed", "in_flight", "not_started"] = "not_started"
result: Any = None
result_is_error: bool = False
extra_outputs: dict[str, Any] = field(default_factory=dict[str, Any])
completed_inner_positions: list[Any] = field(default_factory=list[Any]) # list[NodePosition]
@dataclass
class _FanOutExecutionState:
"""Mutable per-fan-out execution state. One entry per in-flight
fan-out node in the invocation; lives on
``_InvocationContext.fan_out_progress_state`` keyed by
``(namespace, fan_out_node_name)``. The namespace component
disambiguates same-named fan-outs in different subgraph descents.
"""
fan_out_node_name: str
namespace: tuple[str, ...]
instance_count: int
instances: list[_FanOutInstanceState]
@dataclass
class _InvocationContext:
"""Per-invocation state threaded through the engine and into subgraphs.
Mutable: the step counter increments. The observer chain extends as
subgraphs are entered. New child contexts are produced via
`descend_into_subgraph` and share the same queue + step counter; the
namespace and parent-state stacks are extended by-value.
"""
queue: asyncio.Queue[_QueuedItem | None]
# Graph-attached observers in delivery order: outermost graph first,
# nested subgraph attached observers appended as we descend.
graph_attached: tuple[SubscribedObserver, ...]
# Set once at the outermost invoke; carried unchanged into subgraphs.
invocation_scoped: tuple[SubscribedObserver, ...]
# Shared mutable single-element list — a simple way to share an int by
# reference across recursive subgraph contexts without leaking a class.
step_counter: list[int] = field(default_factory=lambda: [0])
namespace_prefix: tuple[str, ...] = ()
parent_states_prefix: tuple[State, ...] = ()
# Per observability §5.3 + the coord-thread `clarify-subgraph-name-
# semantics` resolution. Parallel to ``namespace_prefix`` — index
# ``i`` is the compiled-subgraph identity for the wrapper at
# ``namespace_prefix[i]``, or ``None`` for wrappers constructed
# without an identity. Used by observers to emit
# ``metadata.subgraph_name`` (Langfuse) and
# ``openarmature.subgraph.name`` (OTel) on the wrapper observation
# / span at each depth. The chain shape lets nested subgraphs
# carry distinct identities at distinct depths even though
# v0.10.0's conformance fixtures only exercise single-level
# nesting.
subgraph_identities: tuple[str | None, ...] = ()
# Per pipeline-utilities §9 + graph-engine §6: nodes inside a
# fan-out instance fire events tagged with the instance's 0-based
# index. Set when descending into a fan-out instance, inherited
# unchanged through any further subgraph descents inside that
# instance, and absent (None) for nodes outside any fan-out.
fan_out_index: int | None = None
# ----------------------------------------------------------------
# Checkpointing fields (spec pipeline-utilities §10)
#
# ``invocation_id`` and ``correlation_id`` are minted once at the
# outermost ``invoke`` call (or restored from a saved record on
# resume) and propagated unchanged through every descent. The
# checkpointer reference is set when a backend is registered. It
# propagates unchanged into subgraphs and fan-out instances (§10.3,
# revised by proposal 0009) and is intentionally **None inside
# parallel-branches branches**, so per-branch internal saves are
# gated off (§11.9 / §10.7 atomic-restart). The mutable
# ``completed_positions`` list is shared across descents so the
# save call sites can append the just-completed position before
# the engine's next step. ``resume_skip_set`` is a frozen set of
# namespace tuples whose corresponding nodes have already
# completed in a prior run and MUST be skipped on this resumed
# invocation.
# ----------------------------------------------------------------
invocation_id: str = ""
correlation_id: str = ""
checkpointer: Any = None # Checkpointer | None; typed Any to avoid an
# import cycle between graph and checkpoint packages.
completed_positions: list[Any] = field(default_factory=list[Any]) # list[NodePosition]
resume_skip_set: frozenset[tuple[str, ...]] = field(default_factory=lambda: frozenset[tuple[str, ...]]())
# The invocation_id we LOADED FROM on a resumed run — distinct from
# ``invocation_id`` (the freshly-minted id for this resumed run per
# §10.4 step 4). ``None`` outside the resume path. Threaded through
# so inner-descent state-validation failures can populate
# CheckpointRecordInvalid with the source record's id.
resume_invocation: str | None = None
# Resume-with-saved-inner-state plumbing: when the loaded record's
# latest save fired from inside a subgraph (parent_states populated),
# the engine restores the OUTER state from parent_states[0] but ALSO
# needs the saved inner state(s) when re-descending into the
# in-flight subgraph(s). This map is keyed by descent depth — depth
# 1 = first subgraph level, depth 2 = nested two deep, etc. The
# subgraph descent path consumes (pops) its matching depth before
# falling back to the normal projection. After consumption, fresh
# descents at the same depth project as usual. Shared mutable dict
# propagates across descents.
pending_resume_states: dict[int, Any] = field(default_factory=dict[int, Any])
# Per spec §10.11: mutable per-fan-out progress tracking. Keyed by
# ``(namespace, fan_out_node_name)`` — disambiguates same-named
# fan-outs in different subgraph descents. ``FanOutNode`` populates
# entries before descending into instances; updates state as
# instances progress; the entry stays in the dict for the duration
# of the fan-out so concurrent saves see consistent sibling state.
# ``_maybe_save_checkpoint`` projects this into the frozen
# ``FanOutProgress`` shape on the saved CheckpointRecord.
fan_out_progress_state: dict[tuple[tuple[str, ...], str], _FanOutExecutionState] = field(
default_factory=dict[tuple[tuple[str, ...], str], _FanOutExecutionState]
)
# Per spec §6 Drain (proposal 0010): shared mutable counters.
# ``_dispatch`` bumps ``dispatched``, the delivery worker bumps
# ``delivered``, and ``CompiledGraph.drain()`` reads both at
# drain-cancel time to report undelivered events in the returned
# ``DrainSummary``. Subgraphs share the parent's counters because
# subgraphs share the parent's queue + worker, so the parent
# context's counts naturally cover subgraph events.
drain_counters: _DrainCounters = field(default_factory=_DrainCounters)
# Per spec §10.2 (proposal 0028): the canonical source for
# ``CheckpointRecord.schema_version``. Set once at the outermost
# ``invoke`` to the compiled graph's declared state class
# (``CompiledGraph.state_cls``); propagated unchanged through every
# descent (subgraphs, fan-out instances, parallel branches). All
# save sites within an invocation MUST read ``schema_version`` from
# this class — NOT from ``type(state)`` at save time — so the
# value is consistent across the outer dispatch save, fan-out
# instance internal saves, and the fan-out node's own completion
# save. The distinction matters only when a user passes a State
# subclass that shadows ``schema_version``; the declared class is
# the only consistent choice for §10.12 migration lookups.
# ``Any`` rather than ``type[State]`` to avoid an import cycle
# between graph and observer; callers narrow at the read site.
state_cls: Any = None
    def full_observers(self) -> tuple[SubscribedObserver, ...]:
        """Return the ordered observer list to deliver for events from
        this depth. Per spec §6: graph-attached (outermost → innermost),
        then invocation-scoped (passed to the outermost invoke)."""
        return self.graph_attached + self.invocation_scoped
def descend_into_subgraph(
self,
subgraph_node_name: str,
parent_state: State,
sub_attached: tuple[SubscribedObserver, ...],
*,
subgraph_identity: str | None = None,
) -> _InvocationContext:
"""Build the context for a subgraph-as-node call.
The returned context shares the queue and step counter (so step
numbering is monotonic across the boundary) but has an extended
namespace prefix, parent-state stack, and graph-attached observer
chain. Invocation-scoped observers carry through unchanged.
``fan_out_index`` is inherited so a subgraph descent inside a
fan-out instance still tags inner events with the index.
Checkpointing fields propagate unchanged: subgraph-internal
nodes save to the same backend with the same invocation_id
(per spec §10.3; one save per inner-node completion).
"""
return _InvocationContext(
queue=self.queue,
graph_attached=self.graph_attached + sub_attached,
invocation_scoped=self.invocation_scoped,
step_counter=self.step_counter,
namespace_prefix=self.namespace_prefix + (subgraph_node_name,),
parent_states_prefix=self.parent_states_prefix + (parent_state,),
subgraph_identities=self.subgraph_identities + (subgraph_identity,),
fan_out_index=self.fan_out_index,
invocation_id=self.invocation_id,
correlation_id=self.correlation_id,
checkpointer=self.checkpointer,
completed_positions=self.completed_positions,
resume_skip_set=self.resume_skip_set,
pending_resume_states=self.pending_resume_states,
resume_invocation=self.resume_invocation,
fan_out_progress_state=self.fan_out_progress_state,
drain_counters=self.drain_counters,
state_cls=self.state_cls,
)
def descend_into_fan_out_instance(
self,
fan_out_node_name: str,
parent_state: State,
sub_attached: tuple[SubscribedObserver, ...],
fan_out_index: int,
*,
subgraph_identity: str | None = None,
) -> _InvocationContext:
"""Build the context for one fan-out instance's subgraph invocation.
Same shape as ``descend_into_subgraph`` but stamps the fan-out
index onto the new context so every inner-node event carries it.
Per spec §9 the index is the instance's 0-based position.
Per pipeline-utilities §10.3 (revised by proposal 0009): fan-out
instance internal nodes DO produce checkpoint saves. The
checkpointer reference propagates unchanged so an inner node's
``completed`` event triggers a save; the engine's save path
projects the shared ``fan_out_progress_state`` into the record's
per-instance progress field. ``resume_skip_set`` is dropped:
inner-position skipping is governed by the per-instance
``completed_inner_positions`` field on the loaded record's
``fan_out_progress`` entry, not by the outer skip-set (which
would conflate inner and outer positions otherwise).
"""
return _InvocationContext(
queue=self.queue,
graph_attached=self.graph_attached + sub_attached,
invocation_scoped=self.invocation_scoped,
step_counter=self.step_counter,
namespace_prefix=self.namespace_prefix + (fan_out_node_name,),
parent_states_prefix=self.parent_states_prefix + (parent_state,),
subgraph_identities=self.subgraph_identities + (subgraph_identity,),
fan_out_index=fan_out_index,
invocation_id=self.invocation_id,
correlation_id=self.correlation_id,
checkpointer=self.checkpointer,
completed_positions=self.completed_positions,
resume_skip_set=frozenset(),
pending_resume_states={},
resume_invocation=self.resume_invocation,
# Propagate the shared per-fan-out tracking dict so an
# inner-instance node can update its own entry and so the
# outer save sees consistent sibling state.
fan_out_progress_state=self.fan_out_progress_state,
drain_counters=self.drain_counters,
state_cls=self.state_cls,
)
def descend_into_parallel_branch(
self,
parallel_branches_node_name: str,
parent_state: State,
sub_attached: tuple[SubscribedObserver, ...],
) -> _InvocationContext:
"""Build the context for one parallel-branches branch's
subgraph invocation.
Per pipeline-utilities §11.6 the parallel-branches node looks
to outer middleware like a single dispatch; inner-branch
events come from the branch's subgraph execution. Stamps the
namespace prefix with the parallel-branches node name so
inner events nest under it (mirrors
``descend_into_fan_out_instance``'s namespace stamping).
Branch identity lives on the
``observability.correlation._branch_name_var`` ContextVar
rather than on the descend context; set inside the
branch's task closure so ``copy_context`` inherits it
through the subgraph's execution.
Per §11.9 / §10.7 atomic-restart: drops the checkpointer
and pending_resume_states (a crash mid-dispatch re-runs the
whole parallel-branches node from scratch on resume; the
branches' inner saves wouldn't be useful).
"""
return _InvocationContext(
queue=self.queue,
graph_attached=self.graph_attached + sub_attached,
invocation_scoped=self.invocation_scoped,
step_counter=self.step_counter,
namespace_prefix=self.namespace_prefix + (parallel_branches_node_name,),
parent_states_prefix=self.parent_states_prefix + (parent_state,),
# Parallel-branches don't reify a single inner subgraph
# identity at the wrapper position — each branch can hold a
# different subgraph — so we extend the chain with ``None``
# at this depth. Per-branch identity handling (if ever
# needed) is a future addition.
subgraph_identities=self.subgraph_identities + (None,),
fan_out_index=self.fan_out_index,
invocation_id=self.invocation_id,
correlation_id=self.correlation_id,
checkpointer=None,
completed_positions=self.completed_positions,
resume_skip_set=frozenset(),
pending_resume_states={},
resume_invocation=self.resume_invocation,
fan_out_progress_state=self.fan_out_progress_state,
drain_counters=self.drain_counters,
state_cls=self.state_cls,
)
    def take_step(self) -> int:
        """Atomically (single-threaded asyncio) read-and-increment the
        shared step counter. Returns the value to assign to the just-
        executed node's event."""
        n = self.step_counter[0]
        self.step_counter[0] = n + 1
        return n
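# Illustrative sketch (not part of this module): why the step counter is a
# single-element list. A child context built during a descent receives the
# same list object, so step numbers stay monotonic across the boundary while
# the namespace prefix is extended by value. ``ContextSketch`` is a
# hypothetical, heavily reduced stand-in for ``_InvocationContext``.

```python
from dataclasses import dataclass, field


@dataclass
class ContextSketch:
    """Hypothetical stand-in carrying only the two fields discussed."""

    step_counter: list[int] = field(default_factory=lambda: [0])
    namespace_prefix: tuple[str, ...] = ()

    def descend(self, name: str) -> "ContextSketch":
        # Share the counter by reference; extend the namespace by value.
        return ContextSketch(
            step_counter=self.step_counter,
            namespace_prefix=self.namespace_prefix + (name,),
        )

    def take_step(self) -> int:
        n = self.step_counter[0]
        self.step_counter[0] = n + 1
        return n


outer = ContextSketch()
steps = [outer.take_step()]      # outer node
inner = outer.descend("sub")
steps.append(inner.take_step())  # node inside the subgraph
steps.append(outer.take_step())  # back in the outer graph
```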
def _dispatch(context: _InvocationContext, event: NodeEvent) -> None:
    """Enqueue a node event for the delivery worker.

    For ``"started"``-phase events, also call any subscribed observer's
    optional ``prepare_sync(event)`` synchronously, in the engine task
    and BEFORE queueing, so observers that need to publish per-event
    state the engine itself reads in the same engine-task scope (e.g.,
    the OTel observer setting ``current_active_observer_span`` for the
    engine to attach into the OTel context) can do so before the node
    body runs.

    Phase-gated forwarding: ``prepare_sync`` only fires when ``"started"``
    is in the subscribed observer's ``phases`` set, mirroring how the
    async ``deliver_loop`` filters dispatch. A user who explicitly
    subscribes only to ``{"completed"}`` doesn't get the synchronous
    prep; the wrapper acts as a uniform phase shield across both
    sync prep and async dispatch.

    Errors from ``prepare_sync`` follow the same isolation contract
    as the async path: don't propagate, don't break siblings, don't
    block the queueing or subsequent events. Reported via
    ``warnings.warn``.

    No-op when no observers exist for this depth, which avoids paying
    the queue overhead for graphs that don't observe anything.
    """
    observers = context.full_observers()
    if not observers:
        return
    if event.phase == "started":
        for subscribed in observers:
            if "started" not in subscribed.phases:
                continue
            prepare_sync = getattr(subscribed.observer, "prepare_sync", None)
            if prepare_sync is None:
                continue
            try:
                result = prepare_sync(event)
            except Exception as e:
                warnings.warn(
                    f"observer prepare_sync raised {type(e).__name__}: {e}",
                    stacklevel=2,
                )
                continue
            if inspect.isawaitable(result):
                # ``prepare_sync`` is opt-in via ``hasattr`` (not a
                # Protocol method) so pyright can't catch a user's
                # ``async def prepare_sync`` signature drift up front.
                # The call here would silently return an unawaited
                # coroutine: the prep work wouldn't run AND Python
                # would emit a delayed "coroutine was never awaited"
                # warning at GC time. Close the awaitable to suppress
                # that secondary noise and surface the misconfiguration
                # via our own explicit warn so it fails loudly at the
                # call site. ``getattr`` rather than ``hasattr``+method
                # access keeps pyright's strict-mode happy on the
                # ``Awaitable`` type (``.close`` lives on
                # ``Coroutine``, not the broader ``Awaitable``).
                close_method = getattr(result, "close", None)
                if close_method is not None:
                    try:
                        close_method()
                    except Exception as close_error:
                        # Cleanup is best-effort: a raise here MUST NOT
                        # propagate or block sibling observers. Surface
                        # via ``warnings.warn`` so the swallow is at
                        # least observable if it ever fires (CodeQL
                        # py/empty-except clears on this surface too).
                        warnings.warn(
                            f"observer prepare_sync close cleanup raised "
                            f"{type(close_error).__name__}: {close_error}",
                            stacklevel=2,
                        )
                warnings.warn(
                    f"observer prepare_sync returned an awaitable "
                    f"({type(result).__name__}); prepare_sync MUST be sync "
                    f"(define as `def`, not `async def`). The returned "
                    f"awaitable will not be awaited and is NOT guaranteed "
                    f"to complete before the node body starts; log "
                    f"correlation may miss this node's span.",
                    stacklevel=2,
                )
    context.queue.put_nowait(_QueuedItem(event=event, observers=observers))
    # Per spec §6 Drain (proposal 0010): increment AFTER the put so a
    # raise from ``put_nowait`` (queue full on a bounded queue; we
    # don't bound, but the invariant holds) doesn't desync the counter.
    context.drain_counters.dispatched += 1
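# Illustrative sketch (not part of this module): the "async def
# prepare_sync" misconfiguration check in isolation. ``run_prepare`` and both
# observer classes are hypothetical; the detection uses only
# ``inspect.isawaitable`` and the coroutine's ``close()`` method, as the
# function above does.

```python
import inspect
import warnings


class GoodObserver:
    def __init__(self) -> None:
        self.prepared = False

    def prepare_sync(self, event) -> None:
        self.prepared = True


class MisconfiguredObserver:
    async def prepare_sync(self, event) -> None:  # wrong: should be ``def``
        raise AssertionError("never runs; the coroutine is closed unawaited")


def run_prepare(observer, event) -> bool:
    """Return True only when a synchronous prepare_sync actually ran."""
    prepare = getattr(observer, "prepare_sync", None)
    if prepare is None:
        return False
    result = prepare(event)
    if inspect.isawaitable(result):
        close = getattr(result, "close", None)
        if close is not None:
            close()  # avoids a later "coroutine was never awaited" warning
        warnings.warn("prepare_sync returned an awaitable; define it with `def`")
        return False
    return True


good = GoodObserver()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    good_ran = run_prepare(good, "event")
    bad_ran = run_prepare(MisconfiguredObserver(), "event")
messages = [str(w.message) for w in caught]
```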
async def deliver_loop(
    queue: asyncio.Queue[_QueuedItem | None],
    counters: _DrainCounters,
) -> None:
    """Background worker: read queued events, deliver to observers serially.

    - No two observers receive the same event concurrently (we await
      each).
    - No observer receives event N+1 until everyone has finished N
      (the loop processes one item fully before pulling the next).
    - Observers whose ``phases`` set excludes the event's phase do
      NOT receive it. Phase filter applies at delivery, not dispatch;
      the engine still produces both events for every attempt.
    - Observer exceptions don't propagate, don't break siblings,
      don't block subsequent events. Reported via ``warnings.warn``.

    The loop terminates when it receives ``_DRAIN_SENTINEL`` (None).
    """
    while True:
        item = await queue.get()
        if item is None:
            return
        for subscribed in item.observers:
            if item.event.phase not in subscribed.phases:
                continue
            try:
                await subscribed.observer(item.event)
            except Exception as e:
                warnings.warn(
                    f"observer raised {type(e).__name__}: {e}",
                    stacklevel=1,
                )
        # Per spec §6 Drain (proposal 0010): increment AFTER the
        # observer for-loop completes for this event, so an event
        # cancelled mid-for-loop is counted as undelivered
        # (``dispatched - delivered`` includes it). The phase-filter
        # ``continue`` above does NOT skip the increment: an event
        # filtered out for every observer is still considered
        # delivered (we did all the work there was to do for it).
        counters.delivered += 1
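# Illustrative sketch (not part of this module): the delivery guarantees
# above on a toy queue. Queue items are plain ``(phase, observers)`` tuples
# and observers are ``(phases, callable)`` pairs; both shapes, and the
# ``worker`` / ``Counters`` names, are hypothetical simplifications of
# ``_QueuedItem``, ``SubscribedObserver``, and ``_DrainCounters``.

```python
import asyncio
import warnings
from dataclasses import dataclass


@dataclass
class Counters:
    dispatched: int = 0
    delivered: int = 0


async def worker(queue: asyncio.Queue, counters: Counters) -> None:
    while True:
        item = await queue.get()
        if item is None:  # drain sentinel
            return
        phase, observers = item
        for phases, observer in observers:
            if phase not in phases:
                continue  # phase filter applies at delivery time
            try:
                await observer(phase)
            except Exception as exc:
                # Isolation: warn, then keep delivering to siblings.
                warnings.warn(f"observer raised {type(exc).__name__}: {exc}")
        counters.delivered += 1


async def demo():
    seen: list[str] = []

    async def failing(phase: str) -> None:
        raise RuntimeError("boom")

    async def completed_only(phase: str) -> None:
        seen.append(phase)

    observers = (
        ({"started", "completed"}, failing),
        ({"completed"}, completed_only),
    )
    queue: asyncio.Queue = asyncio.Queue()
    counters = Counters()
    for phase in ("started", "completed"):
        queue.put_nowait((phase, observers))
        counters.dispatched += 1
    queue.put_nowait(None)
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        await worker(queue, counters)
    return seen, counters, len(caught)


seen, counters, warning_count = asyncio.run(demo())
undelivered = counters.dispatched - counters.delivered
```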
__all__ = [
    "ALL_PHASES",
    "DrainSummary",
    "Observer",
    "RemoveHandle",
    "SubscribedObserver",
    # Engine-internal but listed so pyright sees them as exported (they're
    # imported by `compiled.py` and `subgraph.py`). The underscore prefix
    # is the user-facing "don't import these" signal.
    "_DRAIN_SENTINEL",
    "_DrainCounters",
    "_FanOutExecutionState",
    "_FanOutInstanceState",
    "_InvocationContext",
    "_QueuedItem",
    "_coerce_subscribed",
    "_dispatch",
    "deliver_loop",
]