Skip to content

Commit 028d134

Browse files
Implement 0040 open-span metadata update
Mid-invocation set_invocation_metadata now emits a MetadataAugmentationEvent on the engine's serial observer queue. Both observers match the augmenter's lineage tuple (namespace, attempt_index, fan_out_index, branch_name) against their open observations and apply the entries in place: span.set_attribute on OTel, observation.update(metadata=...) on Langfuse. Outermost-serial also fires trace.update so the augmented keys land at trace.metadata.<key>. Sibling instances and ancestors above the innermost containment are skipped per spec sec 3.4. OTel's open-span _StackKey widened to include branch_name so concurrent same-named inner nodes across sibling parallel-branches branches no longer collide. Same widening on the Langfuse observer plus the LLM event payload's calling_branch_name field. Shared tuple-prefix helpers extracted to observability/lineage.py. Conformance: activate 028's mid-invocation rejection case (helper raises at the call site on a reserved key) and 034 outermost-serial fixture via the Langfuse harness. Fix the pre-existing 005 flake by resetting the OTel global tracer Once primitive before set_tracer_provider. 029 and 030 stay deferred with documented fixture-shape gaps; the augmentation mechanism itself is covered end-to-end by new unit tests in test_observability_otel and test_observability_langfuse. Two coord threads opened for follow-up: nested fan-out lineage scope and parallel-branches per-branch dispatch span shape.
1 parent 254dce1 commit 028d134

25 files changed

Lines changed: 1469 additions & 157 deletions

File tree

examples/00-hello-world/main.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
END,
5050
CompiledGraph,
5151
GraphBuilder,
52+
MetadataAugmentationEvent,
5253
NodeEvent,
5354
State,
5455
append,
@@ -194,14 +195,18 @@ def route(state: PipelineState) -> str:
194195
return state.classification.intent
195196

196197

197-
async def trace(event: NodeEvent) -> None:
198+
async def trace(event: NodeEvent | MetadataAugmentationEvent) -> None:
198199
# OpenAIProvider emits NodeEvent-shaped events for LLM-span
199200
# tracking under a sentinel namespace; those have post_state=None.
201+
# ``set_invocation_metadata`` from within a node body emits a
202+
# MetadataAugmentationEvent; this tracer ignores those.
200203
# Filter to events that carry a PipelineState snapshot before
201-
# reading it. The isinstance check both narrows the type for
204+
# reading it. The isinstance checks both narrow the type for
202205
# static checkers (post_state is typed as the base State, not
203-
# PipelineState) and acts as a defensive guard against any
206+
# PipelineState) and act as a defensive guard against any
204207
# foreign-state observer event the engine might dispatch.
208+
if not isinstance(event, NodeEvent):
209+
return
205210
if event.phase == "completed" and event.error is None and isinstance(event.post_state, PipelineState):
206211
print(f"{event.node_name}: sources={event.post_state.sources}")
207212

examples/03-observer-hooks/main.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
CompiledGraph,
5555
ExplicitMapping,
5656
GraphBuilder,
57+
MetadataAugmentationEvent,
5758
NodeEvent,
5859
Observer,
5960
State,
@@ -186,12 +187,18 @@ def build_review_subgraph() -> CompiledGraph[ReviewState]:
186187
# fire on every invocation of the compiled graph until removed.
187188

188189

189-
async def console_tracer(event: NodeEvent) -> None:
190+
async def console_tracer(event: NodeEvent | MetadataAugmentationEvent) -> None:
190191
"""Print one structured line per node boundary to stderr.
191192
192193
Format: `[step=N] namespace.path → fields_changed_in_this_step`
193194
On error, format flips to `... ✗ error_category`.
195+
196+
Mid-invocation ``set_invocation_metadata`` augmentations also
197+
reach observers as ``MetadataAugmentationEvent`` instances; this
198+
tracer ignores them.
194199
"""
200+
if isinstance(event, MetadataAugmentationEvent):
201+
return
195202
namespace = ".".join(event.namespace)
196203
if event.error is not None:
197204
print(
@@ -232,7 +239,9 @@ def __init__(self) -> None:
232239
self.errors: int = 0
233240
self.namespaces: set[tuple[str, ...]] = set()
234241

235-
async def __call__(self, event: NodeEvent) -> None:
242+
async def __call__(self, event: NodeEvent | MetadataAugmentationEvent) -> None:
243+
if isinstance(event, MetadataAugmentationEvent):
244+
return
236245
self.events += 1
237246
if event.error is not None:
238247
self.errors += 1

examples/04-nested-subgraphs/main.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
CompiledGraph,
5050
ExplicitMapping,
5151
GraphBuilder,
52+
MetadataAugmentationEvent,
5253
NodeEvent,
5354
State,
5455
append,
@@ -349,7 +350,9 @@ def _fmt_state(state: Any) -> str:
349350
return " ".join(parts) if parts else "(empty)"
350351

351352

352-
async def depth_observer(event: NodeEvent) -> None:
353+
async def depth_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
354+
if isinstance(event, MetadataAugmentationEvent):
355+
return
353356
depth = len(event.namespace)
354357
indent = " " * (depth - 1)
355358
ns = " > ".join(event.namespace)

examples/05-fan-out-with-retry/main.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
END,
7979
CompiledGraph,
8080
GraphBuilder,
81+
MetadataAugmentationEvent,
8182
NodeEvent,
8283
State,
8384
append,
@@ -296,7 +297,7 @@ def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]:
296297
)
297298

298299

299-
async def fan_out_config_observer(event: NodeEvent) -> None:
300+
async def fan_out_config_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
300301
"""Print the fan-out node's resolved config when its dispatch event
301302
fires.
302303
@@ -308,6 +309,8 @@ async def fan_out_config_observer(event: NodeEvent) -> None:
308309
``concurrency`` are callable resolvers whose value isn't visible
309310
in code.
310311
"""
312+
if not isinstance(event, NodeEvent):
313+
return
311314
if event.fan_out_config is None:
312315
return
313316
if event.phase != "started":

examples/06-parallel-branches/main.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
BranchSpec,
7171
CompiledGraph,
7272
GraphBuilder,
73+
MetadataAugmentationEvent,
7374
NodeEvent,
7475
State,
7576
append,
@@ -240,7 +241,7 @@ async def present(s: ArticleState) -> Mapping[str, Any]:
240241
return {"trace": ["present"]}
241242

242243

243-
async def branch_attribution_observer(event: NodeEvent) -> None:
244+
async def branch_attribution_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
244245
"""Print which branch each inner-node event came from.
245246
246247
NodeEvent carries ``branch_name`` on events from nodes that
@@ -250,6 +251,8 @@ async def branch_attribution_observer(event: NodeEvent) -> None:
250251
observer skips events with no branch attribution and prints
251252
``(branch=…) node_name`` for the rest.
252253
"""
254+
if not isinstance(event, NodeEvent):
255+
return
253256
if event.branch_name is None or event.phase != "started":
254257
return
255258
print(f" [observer] (branch={event.branch_name}) inner node {event.node_name!r} started")

src/openarmature/graph/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
StateValidationError,
3636
UnreachableNode,
3737
)
38-
from .events import NodeEvent
38+
from .events import MetadataAugmentationEvent, NodeEvent
3939
from .fan_out import FanOutConfig, FanOutNode
4040
from .middleware import (
4141
Middleware,
@@ -78,6 +78,7 @@
7878
"GraphBuilder",
7979
"GraphError",
8080
"MappingReferencesUndeclaredField",
81+
"MetadataAugmentationEvent",
8182
"Middleware",
8283
"MultipleOutgoingEdges",
8384
"NextCall",

src/openarmature/graph/events.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,46 @@ class NodeEvent:
231231
caller_invocation_metadata: Mapping[str, AttributeValue] = field(default_factory=lambda: _EMPTY_METADATA)
232232

233233

234-
__all__ = ["FanOutEventConfig", "NodeEvent"]
234+
# Spec: realizes observability §3.4 + graph-engine §6 augmentation
235+
# event mechanism (proposal 0040). Emitted by
236+
# ``set_invocation_metadata`` when called mid-invocation; carries the
237+
# delta + the augmenting context's lineage identity so observers can
238+
# resolve which of their open observations belong to the augmenting
239+
# context's subtree and apply the entries in place.
240+
@dataclass(frozen=True)
241+
class MetadataAugmentationEvent:
242+
"""A metadata-augmentation event delivered to observers.
243+
244+
Emitted by :func:`openarmature.observability.metadata.set_invocation_metadata`
245+
when called mid-invocation. Carries:
246+
247+
- ``entries``: the delta merged into the per-async-context
248+
invocation metadata mapping by the call. Read-only view.
249+
- ``namespace`` / ``attempt_index`` / ``fan_out_index`` /
250+
``branch_name``: the four lineage fields that jointly identify
251+
the augmenting execution context (the calling node's identity
252+
tuple). When ``set_invocation_metadata`` is called from outside
253+
a node body, ``namespace`` is the empty tuple, ``attempt_index``
254+
is ``0``, and both ``fan_out_index`` and ``branch_name`` are
255+
``None`` — the invocation-level identity.
256+
257+
Distinct from :class:`NodeEvent` because there is no node phase,
258+
no pre/post state, and no error: this event reports a side-channel
259+
augmentation, not a node-attempt boundary. Per graph-engine §6 the
260+
event is NOT subject to the observer ``phases`` filter (which only
261+
governs ``NodeEvent`` phases); the delivery worker forwards it to
262+
every subscribed observer. Observers that handle it iterate their
263+
open observations whose lineage is an ancestor of (or equal to)
264+
the augmenting context's lineage and apply the entries as
265+
``openarmature.user.<key>`` (OTel, §5.6) /
266+
``metadata.<key>`` (Langfuse, §8.4.1+§8.4.2).
267+
"""
268+
269+
entries: Mapping[str, AttributeValue]
270+
namespace: tuple[str, ...]
271+
attempt_index: int = 0
272+
fan_out_index: int | None = None
273+
branch_name: str | None = None
274+
275+
276+
__all__ = ["FanOutEventConfig", "MetadataAugmentationEvent", "NodeEvent"]

src/openarmature/graph/observer.py

Lines changed: 76 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,20 @@
3434
from dataclasses import dataclass, field
3535
from typing import Any, Literal, Protocol
3636

37-
from .events import NodeEvent
37+
from .events import MetadataAugmentationEvent, NodeEvent
3838
from .state import State
3939

4040

4141
class Observer(Protocol):
42-
"""The shape of a callable that receives node-boundary events.
42+
"""The shape of a callable that receives observer events.
4343
4444
`Observer` is a structural Protocol; any async callable matching the
4545
signature qualifies, no subclass required. Plain functions, bound
4646
methods, and class instances with `__call__` all work::
4747
48-
async def log_observer(event: NodeEvent) -> None:
49-
print(event.node_name, event.phase)
48+
async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
49+
if isinstance(event, NodeEvent):
50+
print(event.node_name, event.phase)
5051
5152
compiled.attach_observer(log_observer)
5253
@@ -63,6 +64,27 @@ async def log_observer(event: NodeEvent) -> None:
6364
conformance doesn't pin you to that name; any of `event`, `_event`,
6465
`e`, etc. matches.
6566
67+
Two event variants reach observers (graph-engine §6 + proposal
68+
0040). The signature is the union; observers ``isinstance``-narrow
69+
on the first line and choose which variants they handle.
70+
71+
- :class:`NodeEvent` — the started/completed/checkpoint phase
72+
events. Subject to the ``phases`` filter on
73+
:class:`SubscribedObserver`; observers whose phase set excludes
74+
``event.phase`` do NOT receive it.
75+
- :class:`MetadataAugmentationEvent` — emitted by
76+
:func:`openarmature.observability.metadata.set_invocation_metadata`
77+
when called mid-invocation. Carries the augmenting context's
78+
lineage tuple (``namespace``, ``attempt_index``,
79+
``fan_out_index``, ``branch_name``) so rich backends can update
80+
their open observations in place
81+
(``span.set_attribute(openarmature.user.<key>, v)`` for OTel,
82+
``observation.update(metadata=...)`` for Langfuse). Per spec §6
83+
this variant is NOT subject to the ``phases`` filter — every
84+
subscribed observer sees it and isinstance-narrows to decide
85+
whether to act. Simple user observers typically early-return
86+
after ``isinstance(event, NodeEvent)`` checks.
87+
6688
Optional ``prepare_sync`` extension
6789
-----------------------------------
6890
An observer MAY additionally define a synchronous method::
@@ -81,9 +103,13 @@ def prepare_sync(self, event: NodeEvent, /) -> None: ...
81103
the synchronous prep entirely; observers that do define it run
82104
only for ``"started"``-phase events, with errors warned-not-
83105
propagated (same isolation contract as the async path).
106+
``prepare_sync`` is never invoked for
107+
:class:`MetadataAugmentationEvent` (the synchronous-prep contract
108+
is anchored on the ``started`` phase, which only ``NodeEvent``
109+
carries).
84110
"""
85111

86-
async def __call__(self, event: NodeEvent, /) -> None: ...
112+
async def __call__(self, event: NodeEvent | MetadataAugmentationEvent, /) -> None: ...
87113

88114

89115
# Per spec v0.6.0 §6: the two valid phase strings. Used as the default
@@ -200,15 +226,22 @@ class _QueuedItem:
200226
receive it. The list is computed at dispatch time so events from
201227
different depths in nested subgraphs carry the correct observer chain
202228
without the worker needing to know the graph topology.
229+
230+
``event`` is the union of ``NodeEvent`` (started / completed /
231+
checkpoint phases) and ``MetadataAugmentationEvent`` (proposal
232+
0040, side-channel augmentation). The delivery worker branches by
233+
type to apply the right delivery contract (phase-filter for
234+
``NodeEvent``, no filter for the augmentation event).
203235
"""
204236

205-
event: NodeEvent
237+
event: NodeEvent | MetadataAugmentationEvent
206238
observers: tuple[SubscribedObserver, ...]
207239

208240

209241
# A sentinel value the engine puts on the queue to signal the worker to
210242
# return after draining the events ahead of it. None is unambiguous —
211-
# observers receive `NodeEvent` instances, never None.
243+
# the queue carries `NodeEvent` and `MetadataAugmentationEvent` instances
244+
# wrapped in `_QueuedItem`, never None.
212245
_DRAIN_SENTINEL = None
213246

214247

@@ -587,16 +620,29 @@ def take_step(self) -> int:
587620
return n
588621

589622

590-
def _dispatch(context: _InvocationContext, event: NodeEvent) -> None:
591-
"""Enqueue a node event for the delivery worker.
592-
593-
For ``"started"``-phase events, also call any subscribed observer's
594-
optional ``prepare_sync(event)`` synchronously — in the engine task,
595-
BEFORE queueing — so observers that need to publish per-event state
596-
the engine itself reads in the same engine-task scope (e.g., the
597-
OTel observer setting ``current_active_observer_span`` for the
598-
engine to attach into the OTel context) can do so before the node
599-
body runs.
623+
def _dispatch(
624+
context: _InvocationContext,
625+
event: NodeEvent | MetadataAugmentationEvent,
626+
) -> None:
627+
"""Enqueue an event for the delivery worker.
628+
629+
Handles two event variants:
630+
631+
- :class:`NodeEvent`: the started/completed/checkpoint pair model.
632+
For ``"started"``-phase events, also calls any subscribed
633+
observer's optional ``prepare_sync(event)`` synchronously — in
634+
the engine task, BEFORE queueing — so observers that need to
635+
publish per-event state the engine itself reads in the same
636+
engine-task scope (e.g., the OTel observer setting
637+
``current_active_observer_span`` for the engine to attach into
638+
the OTel context) can do so before the node body runs.
639+
- :class:`MetadataAugmentationEvent` (proposal 0040): a side-
640+
channel augmentation event emitted by
641+
``set_invocation_metadata`` mid-invocation. Bypasses the
642+
``prepare_sync`` branch entirely — the sync-prep contract is
643+
anchored on ``"started"``, which only ``NodeEvent`` carries.
644+
Queued onto the same serial worker so observers see it in
645+
strict order with the surrounding node events.
600646
601647
Phase-gated forwarding: ``prepare_sync`` only fires when ``"started"``
602648
is in the subscribed observer's ``phases`` set, mirroring how the
@@ -616,7 +662,7 @@ def _dispatch(context: _InvocationContext, event: NodeEvent) -> None:
616662
observers = context.full_observers()
617663
if not observers:
618664
return
619-
if event.phase == "started":
665+
if isinstance(event, NodeEvent) and event.phase == "started":
620666
for subscribed in observers:
621667
if "started" not in subscribed.phases:
622668
continue
@@ -686,9 +732,15 @@ async def deliver_loop(
686732
each).
687733
- No observer receives event N+1 until everyone has finished N
688734
(the loop processes one item fully before pulling the next).
689-
- Observers whose ``phases`` set excludes the event's phase do
690-
NOT receive it. Phase filter applies at delivery, not dispatch;
691-
the engine still produces both events for every attempt.
735+
- For :class:`NodeEvent`, observers whose ``phases`` set excludes
736+
the event's phase do NOT receive it. Phase filter applies at
737+
delivery, not dispatch; the engine still produces both events
738+
for every attempt.
739+
- For :class:`MetadataAugmentationEvent` (proposal 0040), the
740+
``phases`` filter is bypassed entirely — the event isn't a
741+
node-phase event, so every subscribed observer receives it
742+
regardless of ``phases``. Observers ``isinstance``-narrow on
743+
the first line and choose whether to act.
692744
- Observer exceptions don't propagate, don't break siblings,
693745
don't block subsequent events. Reported via ``warnings.warn``.
694746
@@ -698,11 +750,12 @@ async def deliver_loop(
698750
item = await queue.get()
699751
if item is None:
700752
return
753+
event = item.event
701754
for subscribed in item.observers:
702-
if item.event.phase not in subscribed.phases:
755+
if isinstance(event, NodeEvent) and event.phase not in subscribed.phases:
703756
continue
704757
try:
705-
await subscribed.observer(item.event)
758+
await subscribed.observer(event)
706759
except Exception as e:
707760
warnings.warn(
708761
f"observer raised {type(e).__name__}: {e}",

src/openarmature/llm/providers/openai.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
from openarmature.graph.events import NodeEvent
5656
from openarmature.observability.correlation import (
5757
current_attempt_index,
58+
current_branch_name,
5859
current_dispatch,
5960
current_fan_out_index,
6061
current_namespace_prefix,
@@ -1256,6 +1257,7 @@ def _make_llm_event(
12561257
calling_namespace_prefix=current_namespace_prefix(),
12571258
calling_attempt_index=current_attempt_index(),
12581259
calling_fan_out_index=current_fan_out_index(),
1260+
calling_branch_name=current_branch_name(),
12591261
active_prompt=active_prompt,
12601262
active_prompt_group=active_prompt_group,
12611263
input_messages=input_messages,

0 commit comments

Comments
 (0)