Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions examples/00-hello-world/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
END,
CompiledGraph,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
State,
append,
Expand Down Expand Up @@ -194,14 +195,18 @@ def route(state: PipelineState) -> str:
return state.classification.intent


async def trace(event: NodeEvent) -> None:
async def trace(event: NodeEvent | MetadataAugmentationEvent) -> None:
# OpenAIProvider emits NodeEvent-shaped events for LLM-span
# tracking under a sentinel namespace; those have post_state=None.
# ``set_invocation_metadata`` from within a node body emits a
# MetadataAugmentationEvent; this tracer ignores those.
# Filter to events that carry a PipelineState snapshot before
# reading it. The isinstance check both narrows the type for
# reading it. The isinstance checks both narrow the type for
# static checkers (post_state is typed as the base State, not
# PipelineState) and acts as a defensive guard against any
# PipelineState) and act as a defensive guard against any
# foreign-state observer event the engine might dispatch.
if not isinstance(event, NodeEvent):
return
if event.phase == "completed" and event.error is None and isinstance(event.post_state, PipelineState):
print(f"{event.node_name}: sources={event.post_state.sources}")

Expand Down
13 changes: 11 additions & 2 deletions examples/03-observer-hooks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
CompiledGraph,
ExplicitMapping,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
Observer,
State,
Expand Down Expand Up @@ -186,12 +187,18 @@ def build_review_subgraph() -> CompiledGraph[ReviewState]:
# fire on every invocation of the compiled graph until removed.


async def console_tracer(event: NodeEvent) -> None:
async def console_tracer(event: NodeEvent | MetadataAugmentationEvent) -> None:
"""Print one structured line per node boundary to stderr.

Format: `[step=N] namespace.path → fields_changed_in_this_step`
On error, format flips to `... ✗ error_category`.

Mid-invocation ``set_invocation_metadata`` augmentations also
reach observers as ``MetadataAugmentationEvent`` instances; this
tracer ignores them.
"""
if isinstance(event, MetadataAugmentationEvent):
return
namespace = ".".join(event.namespace)
if event.error is not None:
print(
Expand Down Expand Up @@ -232,7 +239,9 @@ def __init__(self) -> None:
self.errors: int = 0
self.namespaces: set[tuple[str, ...]] = set()

async def __call__(self, event: NodeEvent) -> None:
async def __call__(self, event: NodeEvent | MetadataAugmentationEvent) -> None:
if isinstance(event, MetadataAugmentationEvent):
return
self.events += 1
if event.error is not None:
self.errors += 1
Expand Down
5 changes: 4 additions & 1 deletion examples/04-nested-subgraphs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
CompiledGraph,
ExplicitMapping,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
State,
append,
Expand Down Expand Up @@ -349,7 +350,9 @@ def _fmt_state(state: Any) -> str:
return " ".join(parts) if parts else "(empty)"


async def depth_observer(event: NodeEvent) -> None:
async def depth_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
if isinstance(event, MetadataAugmentationEvent):
return
depth = len(event.namespace)
indent = " " * (depth - 1)
ns = " > ".join(event.namespace)
Expand Down
5 changes: 4 additions & 1 deletion examples/05-fan-out-with-retry/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
END,
CompiledGraph,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
State,
append,
Expand Down Expand Up @@ -296,7 +297,7 @@ def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]:
)


async def fan_out_config_observer(event: NodeEvent) -> None:
async def fan_out_config_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
"""Print the fan-out node's resolved config when its dispatch event
fires.

Expand All @@ -308,6 +309,8 @@ async def fan_out_config_observer(event: NodeEvent) -> None:
``concurrency`` are callable resolvers whose value isn't visible
in code.
"""
if not isinstance(event, NodeEvent):
return
if event.fan_out_config is None:
return
if event.phase != "started":
Expand Down
5 changes: 4 additions & 1 deletion examples/06-parallel-branches/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
BranchSpec,
CompiledGraph,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
State,
append,
Expand Down Expand Up @@ -240,7 +241,7 @@ async def present(s: ArticleState) -> Mapping[str, Any]:
return {"trace": ["present"]}


async def branch_attribution_observer(event: NodeEvent) -> None:
async def branch_attribution_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
"""Print which branch each inner-node event came from.

NodeEvent carries ``branch_name`` on events from nodes that
Expand All @@ -250,6 +251,8 @@ async def branch_attribution_observer(event: NodeEvent) -> None:
observer skips events with no branch attribution and prints
``(branch=…) node_name`` for the rest.
"""
if not isinstance(event, NodeEvent):
return
if event.branch_name is None or event.phase != "started":
return
print(f" [observer] (branch={event.branch_name}) inner node {event.node_name!r} started")
Expand Down
3 changes: 2 additions & 1 deletion src/openarmature/graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
StateValidationError,
UnreachableNode,
)
from .events import NodeEvent
from .events import MetadataAugmentationEvent, NodeEvent
from .fan_out import FanOutConfig, FanOutNode
from .middleware import (
Middleware,
Expand Down Expand Up @@ -78,6 +78,7 @@
"GraphBuilder",
"GraphError",
"MappingReferencesUndeclaredField",
"MetadataAugmentationEvent",
"Middleware",
"MultipleOutgoingEdges",
"NextCall",
Expand Down
44 changes: 43 additions & 1 deletion src/openarmature/graph/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,46 @@ class NodeEvent:
caller_invocation_metadata: Mapping[str, AttributeValue] = field(default_factory=lambda: _EMPTY_METADATA)


__all__ = ["FanOutEventConfig", "NodeEvent"]
# Spec: realizes observability §3.4 + graph-engine §6 augmentation
# event mechanism (proposal 0040). Emitted by
# ``set_invocation_metadata`` when called mid-invocation; carries the
# delta + the augmenting context's lineage identity so observers can
# resolve which of their open observations belong to the augmenting
# context's subtree and apply the entries in place.
@dataclass(frozen=True)
class MetadataAugmentationEvent:
"""A metadata-augmentation event delivered to observers.

Emitted by :func:`openarmature.observability.metadata.set_invocation_metadata`
when called mid-invocation. Carries:

- ``entries``: the delta merged into the per-async-context
invocation metadata mapping by the call. Read-only view.
- ``namespace`` / ``attempt_index`` / ``fan_out_index`` /
``branch_name``: the four lineage fields that jointly identify
the augmenting execution context (the calling node's identity
tuple). When ``set_invocation_metadata`` is called from outside
a node body, ``namespace`` is the empty tuple, ``attempt_index``
is ``0``, and both ``fan_out_index`` and ``branch_name`` are
``None`` — the invocation-level identity.

Distinct from :class:`NodeEvent` because there is no node phase,
no pre/post state, and no error: this event reports a side-channel
augmentation, not a node-attempt boundary. Per graph-engine §6 the
event is NOT subject to the observer ``phases`` filter (which only
governs ``NodeEvent`` phases); the delivery worker forwards it to
every subscribed observer. Observers that handle it iterate their
open observations whose lineage is an ancestor of (or equal to)
the augmenting context's lineage and apply the entries as
``openarmature.user.<key>`` (OTel, §5.6) /
``metadata.<key>`` (Langfuse, §8.4.1+§8.4.2).
"""

entries: Mapping[str, AttributeValue]
namespace: tuple[str, ...]
attempt_index: int = 0
fan_out_index: int | None = None
branch_name: str | None = None


__all__ = ["FanOutEventConfig", "MetadataAugmentationEvent", "NodeEvent"]
99 changes: 76 additions & 23 deletions src/openarmature/graph/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,20 @@
from dataclasses import dataclass, field
from typing import Any, Literal, Protocol

from .events import NodeEvent
from .events import MetadataAugmentationEvent, NodeEvent

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
openarmature.graph.events
begins an import cycle.
Comment thread
chris-colinsky marked this conversation as resolved.
from .state import State


class Observer(Protocol):
"""The shape of a callable that receives node-boundary events.
"""The shape of a callable that receives observer events.

`Observer` is a structural Protocol; any async callable matching the
signature qualifies, no subclass required. Plain functions, bound
methods, and class instances with `__call__` all work::

async def log_observer(event: NodeEvent) -> None:
print(event.node_name, event.phase)
async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
if isinstance(event, NodeEvent):
print(event.node_name, event.phase)

compiled.attach_observer(log_observer)

Expand All @@ -63,6 +64,27 @@
conformance doesn't pin you to that name; any of `event`, `_event`,
`e`, etc. matches.

Two event variants reach observers (graph-engine §6 + proposal
0040). The signature is the union; observers ``isinstance``-narrow
on the first line and choose which variants they handle.

- :class:`NodeEvent` — the started/completed/checkpoint phase
events. Subject to the ``phases`` filter on
:class:`SubscribedObserver`; observers whose phase set excludes
``event.phase`` do NOT receive it.
- :class:`MetadataAugmentationEvent` — emitted by
:func:`openarmature.observability.metadata.set_invocation_metadata`
when called mid-invocation. Carries the augmenting context's
lineage tuple (``namespace``, ``attempt_index``,
``fan_out_index``, ``branch_name``) so rich backends can update
their open observations in place
(``span.set_attribute(openarmature.user.<key>, v)`` for OTel,
``observation.update(metadata=...)`` for Langfuse). Per spec §6
this variant is NOT subject to the ``phases`` filter — every
subscribed observer sees it and isinstance-narrows to decide
whether to act. Simple user observers typically early-return
after ``isinstance(event, NodeEvent)`` checks.

Optional ``prepare_sync`` extension
-----------------------------------
An observer MAY additionally define a synchronous method::
Expand All @@ -81,9 +103,13 @@
the synchronous prep entirely; observers that do define it run
only for ``"started"``-phase events, with errors warned-not-
propagated (same isolation contract as the async path).
``prepare_sync`` is never invoked for
:class:`MetadataAugmentationEvent` (the synchronous-prep contract
is anchored on the ``started`` phase, which only ``NodeEvent``
carries).
"""

async def __call__(self, event: NodeEvent, /) -> None: ...
async def __call__(self, event: NodeEvent | MetadataAugmentationEvent, /) -> None: ...
Comment thread
chris-colinsky marked this conversation as resolved.


# Per spec v0.6.0 §6: the two valid phase strings. Used as the default
Expand Down Expand Up @@ -200,15 +226,22 @@
receive it. The list is computed at dispatch time so events from
different depths in nested subgraphs carry the correct observer chain
without the worker needing to know the graph topology.

``event`` is the union of ``NodeEvent`` (started / completed /
checkpoint phases) and ``MetadataAugmentationEvent`` (proposal
0040, side-channel augmentation). The delivery worker branches by
type to apply the right delivery contract (phase-filter for
``NodeEvent``, no filter for the augmentation event).
"""

event: NodeEvent
event: NodeEvent | MetadataAugmentationEvent
observers: tuple[SubscribedObserver, ...]


# A sentinel value the engine puts on the queue to signal the worker to
# return after draining the events ahead of it. None is unambiguous —
# observers receive `NodeEvent` instances, never None.
# the queue carries `NodeEvent` and `MetadataAugmentationEvent` instances
# wrapped in `_QueuedItem`, never None.
_DRAIN_SENTINEL = None


Expand Down Expand Up @@ -587,16 +620,29 @@
return n


def _dispatch(context: _InvocationContext, event: NodeEvent) -> None:
"""Enqueue a node event for the delivery worker.

For ``"started"``-phase events, also call any subscribed observer's
optional ``prepare_sync(event)`` synchronously — in the engine task,
BEFORE queueing — so observers that need to publish per-event state
the engine itself reads in the same engine-task scope (e.g., the
OTel observer setting ``current_active_observer_span`` for the
engine to attach into the OTel context) can do so before the node
body runs.
def _dispatch(
context: _InvocationContext,
event: NodeEvent | MetadataAugmentationEvent,
) -> None:
"""Enqueue an event for the delivery worker.

Handles two event variants:

- :class:`NodeEvent`: the started/completed/checkpoint pair model.
For ``"started"``-phase events, also calls any subscribed
observer's optional ``prepare_sync(event)`` synchronously — in
the engine task, BEFORE queueing — so observers that need to
publish per-event state the engine itself reads in the same
engine-task scope (e.g., the OTel observer setting
``current_active_observer_span`` for the engine to attach into
the OTel context) can do so before the node body runs.
- :class:`MetadataAugmentationEvent` (proposal 0040): a side-
channel augmentation event emitted by
``set_invocation_metadata`` mid-invocation. Bypasses the
``prepare_sync`` branch entirely — the sync-prep contract is
anchored on ``"started"``, which only ``NodeEvent`` carries.
Queued onto the same serial worker so observers see it in
strict order with the surrounding node events.

Phase-gated forwarding: ``prepare_sync`` only fires when ``"started"``
is in the subscribed observer's ``phases`` set, mirroring how the
Expand All @@ -616,7 +662,7 @@
observers = context.full_observers()
if not observers:
return
if event.phase == "started":
if isinstance(event, NodeEvent) and event.phase == "started":
for subscribed in observers:
if "started" not in subscribed.phases:
continue
Expand Down Expand Up @@ -686,9 +732,15 @@
each).
- No observer receives event N+1 until everyone has finished N
(the loop processes one item fully before pulling the next).
- Observers whose ``phases`` set excludes the event's phase do
NOT receive it. Phase filter applies at delivery, not dispatch;
the engine still produces both events for every attempt.
- For :class:`NodeEvent`, observers whose ``phases`` set excludes
the event's phase do NOT receive it. Phase filter applies at
delivery, not dispatch; the engine still produces both events
for every attempt.
- For :class:`MetadataAugmentationEvent` (proposal 0040), the
``phases`` filter is bypassed entirely — the event isn't a
node-phase event, so every subscribed observer receives it
regardless of ``phases``. Observers ``isinstance``-narrow on
the first line and choose whether to act.
- Observer exceptions don't propagate, don't break siblings,
don't block subsequent events. Reported via ``warnings.warn``.

Expand All @@ -698,11 +750,12 @@
item = await queue.get()
if item is None:
return
event = item.event
for subscribed in item.observers:
if item.event.phase not in subscribed.phases:
if isinstance(event, NodeEvent) and event.phase not in subscribed.phases:
continue
try:
await subscribed.observer(item.event)
await subscribed.observer(event)
except Exception as e:
warnings.warn(
f"observer raised {type(e).__name__}: {e}",
Expand Down
2 changes: 2 additions & 0 deletions src/openarmature/llm/providers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from openarmature.graph.events import NodeEvent
from openarmature.observability.correlation import (
current_attempt_index,
current_branch_name,
current_dispatch,
current_fan_out_index,
current_namespace_prefix,
Expand Down Expand Up @@ -1256,6 +1257,7 @@ def _make_llm_event(
calling_namespace_prefix=current_namespace_prefix(),
calling_attempt_index=current_attempt_index(),
calling_fan_out_index=current_fan_out_index(),
calling_branch_name=current_branch_name(),
active_prompt=active_prompt,
active_prompt_group=active_prompt_group,
input_messages=input_messages,
Expand Down
Loading