Commit 2509835

Add caller-supplied invocation metadata (proposal 0034)
Implements observability §3.4 + §5.6 + §8.4.1/§8.4.2: callers pass ``invoke(metadata={...})`` and the framework propagates the entries to every observability backend.

Public surface:
- ``compiled.invoke(metadata=...)`` accepts a per-invocation ``dict[str, AttributeValue]`` mapping where values are OTel scalars or homogeneous arrays (str/int/float/bool).
- ``openarmature.observability.set_invocation_metadata(**entries)`` augments the in-scope mapping mid-invocation (additive merge; existing keys overwritten).
- ``openarmature.observability.current_invocation_metadata()`` reads the in-scope mapping (returns empty MappingProxyType outside an invocation).
- Synchronous validation at the API boundary rejects keys under the reserved ``openarmature.*`` / ``gen_ai.*`` prefixes and rejects non-OTel-compatible value types (``ValueError`` before any work begins).

Engine internals:
- ContextVar lifecycle in ``openarmature.observability.metadata``; engine drives ``_set_invocation_metadata`` / ``_reset_invocation_metadata`` around the outermost ``invoke()`` call.
- ``NodeEvent.caller_invocation_metadata`` carries a dispatch-time snapshot (the deliver_loop task's Context is frozen at invoke time, so observers can't re-read the live ContextVar safely).
- ``LlmEventPayload.caller_invocation_metadata`` mirrors the pattern for LLM provider events.

OTel observer (§5.6): emits each entry as ``openarmature.user.<key>`` on every span: invocation, node, subgraph wrapper, fan-out instance dispatch, LLM provider, detached roots, checkpoint-migrate, checkpoint-save. Cross-cutting attribute family parallel to ``openarmature.correlation_id``.

Langfuse observer (§8.4.1 + §8.4.2): merges each entry as a top-level key into ``trace.metadata`` (at trace open) and into every observation's ``metadata`` bag (leaf nodes, subgraph wrappers, fan-out instance dispatches, detached-trace wrappers + link observations, LLM generations).

Conformance fixtures (proposal 0035's full set is 026 / 027 / 028 / 029 / 030):
- 026 (OTel cross-cutting) and 027 (Langfuse top-level merge) activated.
- 028 (boundary rejection) activated via a dedicated ``_run_fixture_028`` runner that attaches both observers and asserts ``ValueError`` + no spans + no Langfuse traces.
- 029 (fan-out per-instance) and 030 (parallel-branches per-branch) stay deferred: they need an ``augment_metadata_from_field`` harness primitive that calls ``set_invocation_metadata`` per fan-out instance / per parallel branch. The augmentation surface is already exercised by unit tests; the conformance fixtures un-defer in a follow-up.

Adds 25 focused unit tests covering boundary validation, ContextVar lifecycle, augmentation merge / overwrite, reserved-namespace rejection, ``invoke()``-boundary rejection, mid-invocation augmentation persisting to subsequent nodes, ``openarmature.user.*`` emission on every OTel span, and Langfuse top-level merge on trace + every observation.

The OTel-side LLM-payload runner was extended to handle multi-node graphs where ``calls_llm`` is on a non-entry node (fixture 026 has a ``prep`` step before the LLM call).
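
The boundary rules described in the commit message (reserved prefixes, OTel-compatible scalars or homogeneous arrays, ``ValueError`` before any work begins) could look roughly like the sketch below. The body of ``validate_invocation_metadata`` is not part of the diff shown here, so ``validate_metadata_sketch``, ``RESERVED_PREFIXES``, and ``SCALAR_TYPES`` are hypothetical names and the exact checks are assumptions.

```python
from __future__ import annotations

from collections.abc import Mapping
from types import MappingProxyType
from typing import Any

# Hypothetical names: the commit does not show the real implementation.
RESERVED_PREFIXES = ("openarmature.", "gen_ai.")
SCALAR_TYPES = (str, int, float, bool)


def _is_valid_value(value: Any) -> bool:
    """Accept a scalar, or a list/tuple whose items share one scalar type."""
    if isinstance(value, SCALAR_TYPES):
        return True
    if isinstance(value, (list, tuple)):
        # Compare exact types so that [True, 1] counts as mixed.
        kinds = {type(item) for item in value}
        return len(kinds) <= 1 and all(kind in SCALAR_TYPES for kind in kinds)
    return False


def validate_metadata_sketch(metadata: Mapping[str, Any] | None) -> Mapping[str, Any]:
    """Return a read-only copy, or raise ValueError on the first violation."""
    if metadata is None:
        return MappingProxyType({})
    for key, value in metadata.items():
        if not isinstance(key, str):
            raise ValueError(f"metadata key must be str, got {type(key).__name__}")
        if key.startswith(RESERVED_PREFIXES):
            raise ValueError(f"metadata key {key!r} uses a reserved namespace")
        if not _is_valid_value(value):
            raise ValueError(f"metadata value for {key!r} is not OTel-compatible")
    return MappingProxyType(dict(metadata))
```

Returning a read-only copy means a caller mutating its own dict after ``invoke()`` starts cannot change what the observers see; whether the real helper does this is not visible in the diff.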
1 parent 020953f commit 2509835

12 files changed

Lines changed: 890 additions & 16 deletions

src/openarmature/graph/compiled.py

Lines changed: 37 additions & 0 deletions
@@ -78,6 +78,12 @@
     current_active_observer_span,
     current_attempt_index,
 )
+from openarmature.observability.metadata import (
+    _reset_invocation_metadata,
+    _set_invocation_metadata,
+    current_invocation_metadata,
+    validate_invocation_metadata,
+)
 
 from .edges import END, ConditionalEdge, EndSentinel, StaticEdge
 from .errors import (
@@ -767,6 +773,7 @@ async def invoke(
         *,
         correlation_id: str | None = None,
         resume_invocation: str | None = None,
+        metadata: Mapping[str, Any] | None = None,
     ) -> StateT:
         """Run the graph from ``initial_state`` to END and return the
         final state.
@@ -805,8 +812,33 @@ async def invoke(
         own retry logic if transient backend failures should be
         reattempted.
 
+        **Caller-supplied invocation metadata (proposal 0034).**
+
+        - ``metadata`` is an optional mapping of arbitrary
+          ``key → value`` entries the framework propagates to every
+          observability backend. Values MUST be OTel-attribute-
+          compatible scalars (``str`` / ``int`` / ``float`` / ``bool``)
+          or homogeneous arrays of those types. Keys MUST NOT use
+          the ``openarmature.*`` or ``gen_ai.*`` reserved namespaces.
+          Validation runs synchronously at the API boundary; rule
+          violations raise ``ValueError`` BEFORE any work begins.
+        - Per spec §5.6 the OTel observer emits each entry as an
+          ``openarmature.user.<key>`` cross-cutting span attribute on
+          every span and OTel log record. The Langfuse observer
+          merges each entry into ``trace.metadata`` AND every
+          ``observation.metadata`` (top level, sibling to
+          ``correlation_id``).
+        - Mid-invocation augmentation via
+          :func:`openarmature.observability.set_invocation_metadata`
+          merges into the same ContextVar with the same validation
+          rules; affects spans emitted AFTER the call returns.
+
         Raises one of the runtime error categories on failure.
         """
+        # Validate caller-supplied metadata at the API boundary so any
+        # rule violation surfaces synchronously before the worker task
+        # is created or any node body runs.
+        validated_metadata = validate_invocation_metadata(metadata)
 
         invocation_scoped = tuple(_coerce_subscribed(o) for o in (observers or ()))
         queue: asyncio.Queue[_QueuedItem | None] = asyncio.Queue()
@@ -943,6 +975,7 @@ async def invoke(
         # "per-invocation is OUTERMOST invoke" wording).
         correlation_token = _set_correlation_id(resolved_correlation_id)
         invocation_token = _set_invocation_id(invocation_id)
+        metadata_token = _set_invocation_metadata(validated_metadata)
         worker = asyncio.create_task(deliver_loop(queue, context.drain_counters))
         self._active_workers[worker] = context
         # Auto-prune: when the worker completes (after the sentinel is
@@ -978,6 +1011,7 @@ async def invoke(
         try:
             return await self._invoke(starting_state, context)
         finally:
+            _reset_invocation_metadata(metadata_token)
             _reset_invocation_id(invocation_token)
             _reset_correlation_id(correlation_token)
             # Sentinel terminates the worker after it processes events
@@ -1988,6 +2022,7 @@ def _dispatch_started(
                 fan_out_config=fan_out_config,
                 branch_name=current_branch_name(),
                 subgraph_identities=context.subgraph_identities,
+                caller_invocation_metadata=current_invocation_metadata(),
             ),
         )
 
@@ -2022,6 +2057,7 @@ def _dispatch_completed(
                 fan_out_config=fan_out_config,
                 branch_name=current_branch_name(),
                 subgraph_identities=context.subgraph_identities,
+                caller_invocation_metadata=current_invocation_metadata(),
             ),
         )
 
@@ -2205,5 +2241,6 @@ async def _maybe_save_checkpoint(
                 attempt_index=attempt_index,
                 fan_out_index=None,
                 subgraph_identities=context.subgraph_identities,
+                caller_invocation_metadata=current_invocation_metadata(),
             ),
         )
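
The set / reset pairing around ``self._invoke(...)`` follows the standard ``contextvars`` token pattern. The sketch below illustrates that pattern in isolation; ``_metadata_var``, ``run_invocation``, and ``read_tenant`` are hypothetical stand-ins, not the framework's own helpers.

```python
from contextvars import ContextVar
from types import MappingProxyType

_EMPTY = MappingProxyType({})
# Hypothetical module-level variable standing in for the framework's own.
_metadata_var = ContextVar("invocation_metadata_sketch", default=_EMPTY)


def run_invocation(metadata, body):
    """Install metadata while body() runs, then restore the prior value."""
    token = _metadata_var.set(MappingProxyType(dict(metadata)))
    try:
        return body()
    finally:
        # reset() restores whatever was in scope before set(), which is
        # what keeps nested invocations from leaking into each other.
        _metadata_var.reset(token)


def read_tenant():
    return _metadata_var.get().get("tenant")


inside = run_invocation({"tenant": "acme"}, read_tenant)
outside = read_tenant()
```

Here ``inside`` holds the tenant supplied for the invocation, while ``outside`` reads the empty default again because the ``finally`` block has already restored it.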

src/openarmature/graph/events.py

Lines changed: 25 additions & 1 deletion
@@ -13,12 +13,21 @@
 Frozen dataclass; observers receive a snapshot, not a live handle.
 """
 
-from dataclasses import dataclass
+from collections.abc import Mapping
+from dataclasses import dataclass, field
+from types import MappingProxyType
 from typing import Any, Literal
 
+from openarmature.observability.metadata import AttributeValue
+
 from .errors import RuntimeGraphError
 from .state import State
 
+# Sentinel empty metadata mapping for events constructed without a
+# live caller-metadata snapshot (test helpers, synthetic events).
+# Read-only proxy keeps the default allocation-free.
+_EMPTY_METADATA: MappingProxyType[str, AttributeValue] = MappingProxyType({})
+
 
 # Spec: realizes observability §5.4 fan-out attributes via the
 # event-payload mechanism added by proposal 0013 (v0.10.0). Backend
@@ -205,6 +214,21 @@ class NodeEvent:
     # empty string when ``None`` per §5.3's "if the implementation
     # tracks one" clause.
     subgraph_identities: tuple[str | None, ...] = ()
+    # Per observability §3.4 + §5.6 (proposal 0034): snapshot of the
+    # caller-supplied invocation metadata at event-construction
+    # time. The engine reads ``current_invocation_metadata()`` when
+    # it constructs the event (in the engine task / node body's
+    # Context); the observer reads from the snapshot on the event
+    # rather than re-reading the ContextVar at observer time —
+    # critical because the observer runs on the engine's
+    # ``deliver_loop`` task whose Context is frozen at invoke time
+    # (asyncio.create_task copies the parent Context at task
+    # creation), so the live ContextVar value in the deliver_loop
+    # would NOT reflect mid-invocation augmentations made by node
+    # bodies running in the main engine task. Observers emit each
+    # entry as ``openarmature.user.<key>`` (OTel, §5.6) /
+    # ``metadata.<key>`` (Langfuse, §8.4.1+§8.4.2).
+    caller_invocation_metadata: Mapping[str, AttributeValue] = field(default_factory=lambda: _EMPTY_METADATA)
 
 
 __all__ = ["FanOutEventConfig", "NodeEvent"]
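
The comment on ``caller_invocation_metadata`` rests on one piece of ``asyncio`` behaviour: ``create_task`` copies the current Context when the task is created, so later ``ContextVar`` changes in the parent are invisible to that task. A minimal stand-alone demonstration follows; the variable and function names are illustrative only.

```python
import asyncio
from contextvars import ContextVar

meta = ContextVar("meta_sketch", default="unset")


async def worker(ready: asyncio.Event, seen: list) -> None:
    # Runs in the Context that was copied when create_task() was called.
    await ready.wait()
    seen.append(meta.get())


async def main() -> tuple:
    meta.set("at-invoke")
    ready = asyncio.Event()
    seen: list = []
    task = asyncio.create_task(worker(ready, seen))
    # A later change in the parent task, as a node body might make.
    meta.set("augmented")
    snapshot = meta.get()  # captured in the parent; would travel on the event
    ready.set()
    await task
    return seen[0], snapshot


worker_view, snapshot = asyncio.run(main())
```

The worker still reads the value that was current at task creation, while the snapshot taken in the parent carries the augmented value. That is why the event carries the mapping instead of leaving observers to re-read the variable.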

src/openarmature/llm/providers/openai.py

Lines changed: 2 additions & 0 deletions
@@ -60,6 +60,7 @@
     current_namespace_prefix,
 )
 from openarmature.observability.llm_event import LlmEventPayload
+from openarmature.observability.metadata import current_invocation_metadata
 
 # ``current_prompt_group`` / ``current_prompt_result`` are imported
 # lazily inside :meth:`OpenAIProvider.complete` to avoid a module-load
@@ -1264,6 +1265,7 @@ def _make_llm_event(
         response_id=response_id,
         response_model=response_model,
         genai_system=genai_system,
+        caller_invocation_metadata=dict(current_invocation_metadata()),
     )
     return NodeEvent(
         node_name="openarmature.llm.complete",

src/openarmature/observability/__init__.py

Lines changed: 12 additions & 0 deletions
@@ -41,6 +41,16 @@
 # ``opentelemetry-sdk`` dependency) along.
 from .llm_event import LLM_NAMESPACE, LlmEventPayload
 
+# v0.10.0 (proposal 0034): caller-supplied invocation metadata surface.
+# `set_invocation_metadata` is the public augmentation helper users
+# call from inside node bodies / middleware / observers;
+# `current_invocation_metadata` is the public reader observers and
+# capability code consume.
+from .metadata import (
+    current_invocation_metadata,
+    set_invocation_metadata,
+)
+
 __all__ = [
     "LLM_NAMESPACE",
     "LlmEventPayload",
@@ -50,5 +60,7 @@
     "current_dispatch",
     "current_fan_out_index",
     "current_invocation_id",
+    "current_invocation_metadata",
     "current_namespace_prefix",
+    "set_invocation_metadata",
 ]
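
The commit message describes ``set_invocation_metadata`` as an additive merge in which existing keys are overwritten, and ``current_invocation_metadata`` as returning a read-only mapping. One way those semantics could be realised is sketched below; ``set_metadata_sketch`` and ``current_metadata_sketch`` are hypothetical, and the real helpers also run the boundary validation, which is omitted here.

```python
from contextvars import ContextVar
from types import MappingProxyType

_var = ContextVar("metadata_merge_sketch", default=MappingProxyType({}))


def current_metadata_sketch():
    """Return the in-scope mapping as a read-only view."""
    return _var.get()


def set_metadata_sketch(**entries):
    """Additive merge: new keys are added, existing keys are overwritten."""
    merged = {**_var.get(), **entries}
    # Bind a fresh read-only mapping instead of mutating the old one, so
    # any snapshot handed out earlier keeps its contents.
    _var.set(MappingProxyType(merged))


before = current_metadata_sketch()
set_metadata_sketch(tenant="acme", attempt=1)
set_metadata_sketch(attempt=2)
after = current_metadata_sketch()
```

Rebinding instead of mutating is a design choice of this sketch: it keeps previously captured snapshots stable without copying on every read.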

src/openarmature/observability/langfuse/observer.py

Lines changed: 34 additions & 0 deletions
@@ -24,6 +24,7 @@
 
 import json
 import uuid
+from collections.abc import Mapping
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING, Any, cast
 
@@ -80,6 +81,28 @@ def _empty_str_frozenset() -> frozenset[str]:
     return frozenset()
 
 
+def _apply_caller_metadata(metadata: dict[str, Any], caller_metadata: Mapping[str, Any]) -> None:
+    """Merge caller-supplied invocation metadata into a Trace's or
+    Observation's metadata bag at top level per observability §8.4.1
+    + §8.4.2 (proposal 0034).
+
+    Top-level placement is by spec: Langfuse UI filters on
+    ``metadata.<key>`` directly, so caller-supplied entries become
+    siblings to ``correlation_id`` / ``entry_node`` rather than
+    nested under a ``user`` sub-object.
+
+    Reserved-key collision with §8.4.1 / §8.4.2 keys
+    (``correlation_id``, ``entry_node``, ``spec_version``,
+    ``namespace``, etc.) is not currently checked here: the spec
+    permits the rejection to happen at either boundary, and the
+    ``invoke()`` API-boundary validation already rejects
+    ``openarmature.*`` / ``gen_ai.*`` prefixed keys. Per-Langfuse-
+    backend collision rejection is queued as a follow-up.
+    """
+    for key, value in caller_metadata.items():
+        metadata[key] = value
+
+
 def _subgraph_identity_at(event: NodeEvent, depth: int) -> str:
     """Return the compiled-subgraph identity for the wrapper at the
     given 1-based namespace depth, or the empty string when no
@@ -366,6 +389,7 @@ def _open_trace(self, invocation_id: str, correlation_id: str | None, event: Nod
         }
         if correlation_id is not None:
             metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(metadata, event.caller_invocation_metadata)
         # §8.6 trace name: caller-supplied invocation label takes
         # precedence; entry-node name is the spec-recommended fallback.
         # The caller-supplied path lands in proposal 0034 (PR 4) — for
@@ -533,6 +557,7 @@ def _open_subgraph_observation(
         }
         if correlation_id is not None:
             metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(metadata, event.caller_invocation_metadata)
         handle = self.client.span(
             trace_id=inv_state.trace_id,
             name=prefix[-1],
@@ -566,6 +591,7 @@ def _open_fan_out_instance_dispatch_observation(
         }
         if correlation_id is not None:
             metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(metadata, event.caller_invocation_metadata)
         handle = self.client.span(
             trace_id=inv_state.trace_id,
             name=prefix[-1],
@@ -622,6 +648,7 @@ def _open_detached_subgraph_trace(
         }
         if correlation_id is not None:
             link_metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(link_metadata, event.caller_invocation_metadata)
         parent_observation_id: str | None = None
         for plen in range(len(prefix) - 1, 0, -1):
             outer = prefix[:plen]
@@ -646,6 +673,7 @@ def _open_detached_subgraph_trace(
         detached_metadata: dict[str, Any] = {"detached_from_invocation_id": inv_state.trace_id}
         if correlation_id is not None:
             detached_metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(detached_metadata, event.caller_invocation_metadata)
         identity = _subgraph_identity_at(event, len(prefix))
         # The detached trace's wrapper observation IS the migrated
         # SubgraphNode wrapper. Per the resolution in coord thread
@@ -673,6 +701,7 @@ def _open_detached_subgraph_trace(
         }
         if correlation_id is not None:
             dispatch_metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(dispatch_metadata, event.caller_invocation_metadata)
         handle = self.client.span(
             trace_id=detached_trace_id,
             name=wrapper_obs_name,
@@ -715,6 +744,7 @@ def _open_detached_fan_out_instance_trace(
         }
         if correlation_id is not None:
             link_metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(link_metadata, event.caller_invocation_metadata)
         fan_out_open.handle.update(metadata=link_metadata)
         # Open the detached Trace + per-instance dispatch observation.
         detached_metadata: dict[str, Any] = {
@@ -723,6 +753,7 @@ def _open_detached_fan_out_instance_trace(
         }
         if correlation_id is not None:
             detached_metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(detached_metadata, event.caller_invocation_metadata)
         self.client.trace(
             id=detached_trace_id,
             name=prefix[-1],
@@ -736,6 +767,7 @@ def _open_detached_fan_out_instance_trace(
         }
         if correlation_id is not None:
             dispatch_metadata["correlation_id"] = correlation_id
+        _apply_caller_metadata(dispatch_metadata, event.caller_invocation_metadata)
         handle = self.client.span(
             trace_id=detached_trace_id,
             name=prefix[-1],
@@ -866,6 +898,7 @@ def _observation_metadata(self, event: NodeEvent, correlation_id: str | None) ->
             metadata["fan_out_item_count"] = cfg.item_count
             metadata["fan_out_concurrency"] = 0 if cfg.concurrency is None else cfg.concurrency
             metadata["fan_out_error_policy"] = cfg.error_policy
+        _apply_caller_metadata(metadata, event.caller_invocation_metadata)
         return metadata
 
     # ------------------------------------------------------------------
@@ -1005,6 +1038,7 @@ def _llm_metadata_and_payload(
         active_group = payload.active_prompt_group
         if active_group is not None:
             metadata["prompt_group_name"] = active_group.group_name
+        _apply_caller_metadata(metadata, payload.caller_invocation_metadata)
 
         model_parameters: dict[str, Any] = {}
         request_params = payload.request_params or {}
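
Because ``_apply_caller_metadata`` runs after the backend keys are written and performs a plain top-level assignment, a caller key named ``correlation_id`` would currently replace the backend's value. The docstring queues collision rejection as a follow-up; the sketch below shows one possible shape for it and is not the author's design. ``BACKEND_KEYS`` and ``apply_caller_metadata_checked`` are hypothetical, and the key set only covers the names the docstring lists before its "etc.".

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any

# Assumed subset of backend-owned keys, taken from the docstring's examples.
BACKEND_KEYS = frozenset({"correlation_id", "entry_node", "spec_version", "namespace"})


def apply_caller_metadata_checked(
    metadata: dict[str, Any], caller_metadata: Mapping[str, Any]
) -> None:
    """Top-level merge that refuses to overwrite backend-owned keys."""
    clashes = sorted(BACKEND_KEYS.intersection(caller_metadata))
    if clashes:
        raise ValueError(f"caller metadata collides with reserved keys: {clashes}")
    # No clash: caller entries become siblings of the backend keys.
    metadata.update(caller_metadata)


bag: dict[str, Any] = {"correlation_id": "c-1"}
apply_caller_metadata_checked(bag, {"tenant": "acme"})
```

Raising inside an observer would surface late, after work has begun, so a production version would more plausibly run this check at the ``invoke()`` boundary alongside the prefix validation.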

src/openarmature/observability/llm_event.py

Lines changed: 13 additions & 1 deletion
@@ -37,7 +37,7 @@
 
 from typing import Any
 
-from pydantic import BaseModel, ConfigDict
+from pydantic import BaseModel, ConfigDict, Field
 
 # Sentinel namespace the LLM provider emits to signal "this is an LLM
 # event, not a regular node event." Backend mappings (the OTel observer
@@ -112,6 +112,18 @@ class LlmEventPayload(BaseModel):
     response_id: str | None = None
     response_model: str | None = None
     genai_system: str = "openai"
+    # Per proposal 0034 / observability §3.4 + §5.6: snapshot of
+    # caller-supplied invocation metadata captured at LLM-event
+    # dispatch time (in the calling node's Context). Backend
+    # observers read from the snapshot rather than re-reading the
+    # ContextVar at observer time — the OTel + Langfuse observers
+    # run on the engine's ``deliver_loop`` task whose Context is
+    # frozen at invoke time, so mid-invocation augmentations made
+    # by node bodies running in the main engine task are NOT visible
+    # there. The snapshot pattern mirrors the existing
+    # ``calling_namespace_prefix`` / ``calling_attempt_index`` /
+    # ``calling_fan_out_index`` fields.
+    caller_invocation_metadata: dict[str, Any] = Field(default_factory=dict)
 
 
 __all__ = ["LLM_NAMESPACE", "LlmEventPayload"]
