Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,29 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

## [Unreleased]

### Added

- **`LangfuseObserver` Trace input/output sourcing** (proposal 0043, observability §8.4.1). New observer construction knobs populate `trace.input` and `trace.output` per the three-lever decision tree:
- **`disable_state_payload: bool = True`** — privacy knob symmetric to `disable_llm_payload`. When ON (default), Trace fields receive the minimal stub `{entry_node, correlation_id}` / `{final_node, status}`; when OFF, the raw state object is serialized.
- **`trace_input_from_state` / `trace_output_from_state`** — optional caller hooks returning the domain-shaped value to use for `trace.input` / `trace.output`. Returning `None` falls through to the next applicable lever.
- `status` is the closed `Literal["completed", "failed"]` enum from spec §8.4.1.
- **Two new observer event types** delivered through the existing `graph.observer.Observer` queue:
- **`InvocationStartedEvent(initial_state, invocation_id, correlation_id, entry_node)`** — emitted once at invocation entry before any node fires.
- **`InvocationCompletedEvent(final_state, status, final_node, invocation_id, correlation_id)`** — emitted once at invocation exit on both the success path (`status="completed"`) and failure path (`status="failed"`).

The `Observer.__call__` signature widens to `NodeEvent | MetadataAugmentationEvent | InvocationStartedEvent | InvocationCompletedEvent`. The new `ObserverEvent` type alias (re-exported from `openarmature.graph`) gives observer authors a one-name handle on the union; existing observers that ignore non-`NodeEvent` variants early-return after an `isinstance(event, NodeEvent)` check.
- **`LangfuseTrace.input` / `LangfuseTrace.output` dataclass fields** on the in-memory recorder, populated by the new observer paths.

### Changed

- **Reserved-key extension** (proposal 0042, observability §3.4). Three additional bare key names — `branch_name`, `detached`, `detached_from_invocation_id` — are reserved against caller-supplied `invocation_metadata` and `set_invocation_metadata` collision; the framework rejects them at the `invoke()` boundary and at the mid-invocation augmentation helper with `ValueError`. The reserved-name set grows from 21 to 24. These three are top-level Langfuse metadata keys the observer mapping already writes; without reservation a caller key matching one would silently shadow the OA-emitted field.
- **`observation.metadata.detached: true` moves to the parent-side dispatching observation** (proposal 0042, observability §8.4.2). The Langfuse mapping previously emitted `detached: true` on the dispatch observation inside the detached child trace; the §8.4.2 row added by 0042 places it on the **parent-side** dispatching observation that fires the detached child (the link observation in the main trace for detached subgraphs; the parent fan-out node observation for detached fan-outs). The detached-side observation no longer carries the flag.
- **`LangfuseClient.update_trace` Protocol grows `input` / `output` keyword parameters** so observer-supplied values land on the Trace's headline fields.

### Notes

- **Pinned spec version bumped from v0.31.0 to v0.34.0.** Absorbs proposals 0042 (reserved-key extension; observation.metadata.detached + branch_name + trace.metadata.detached_from_invocation_id rows), 0038 (Google Gemini wire-format mapping — not yet implemented in python), and 0020 (sessions capability — not yet implemented in python).
- **Pinned spec version bumped from v0.31.0 to v0.35.0.** Absorbs proposals 0042 (reserved-key extension), 0043 (Langfuse trace.input/output sourcing), and the textual additions in v0.32.0 (Gemini wire-format mapping, 0038, not yet implemented) and v0.33.0 (sessions capability, 0020, not yet implemented).
- The SDK adapter caches `input` / `output` in its `_trace_info` map; landing the values on the live Langfuse Trace from outside an active span context requires SDK-version-specific calls (v4's `langfuse.update_current_trace` works inside a context; cross-context REST updates need `client.api.trace.update`). The `InMemoryLangfuseClient` used by tests applies the fields directly. SDK-adapter end-to-end emit lands in a follow-up.

## [0.10.0] — 2026-05-27

Expand Down
7 changes: 6 additions & 1 deletion conformance.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

[manifest]
implementation = "openarmature-python"
spec_pin = "v0.34.0"
spec_pin = "v0.35.0"

# Status values:
# implemented — shipped behavior matches the proposal's contract
Expand Down Expand Up @@ -205,3 +205,8 @@ status = "not-yet"
[proposals."0042"]
status = "implemented"
since = "0.11.0"

# Spec v0.35.0 (proposal 0043).
[proposals."0043"]
status = "implemented"
since = "0.11.0"
4 changes: 2 additions & 2 deletions examples/00-hello-world/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
END,
CompiledGraph,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
ObserverEvent,
State,
append,
merge,
Expand Down Expand Up @@ -195,7 +195,7 @@ def route(state: PipelineState) -> str:
return state.classification.intent


async def trace(event: NodeEvent | MetadataAugmentationEvent) -> None:
async def trace(event: ObserverEvent) -> None:
# OpenAIProvider emits NodeEvent-shaped events for LLM-span
# tracking under a sentinel namespace; those have post_state=None.
# ``set_invocation_metadata`` from within a node body emits a
Expand Down
10 changes: 5 additions & 5 deletions examples/03-observer-hooks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
CompiledGraph,
ExplicitMapping,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
Observer,
ObserverEvent,
State,
append,
)
Expand Down Expand Up @@ -187,7 +187,7 @@ def build_review_subgraph() -> CompiledGraph[ReviewState]:
# fire on every invocation of the compiled graph until removed.


async def console_tracer(event: NodeEvent | MetadataAugmentationEvent) -> None:
async def console_tracer(event: ObserverEvent) -> None:
"""Print one structured line per node boundary to stderr.

Format: `[step=N] namespace.path → fields_changed_in_this_step`
Expand All @@ -197,7 +197,7 @@ async def console_tracer(event: NodeEvent | MetadataAugmentationEvent) -> None:
reach observers as ``MetadataAugmentationEvent`` instances; this
tracer ignores them.
"""
if isinstance(event, MetadataAugmentationEvent):
if not isinstance(event, NodeEvent):
return
namespace = ".".join(event.namespace)
if event.error is not None:
Expand Down Expand Up @@ -239,8 +239,8 @@ def __init__(self) -> None:
self.errors: int = 0
self.namespaces: set[tuple[str, ...]] = set()

async def __call__(self, event: NodeEvent | MetadataAugmentationEvent) -> None:
if isinstance(event, MetadataAugmentationEvent):
async def __call__(self, event: ObserverEvent) -> None:
if not isinstance(event, NodeEvent):
return
self.events += 1
if event.error is not None:
Expand Down
6 changes: 3 additions & 3 deletions examples/04-nested-subgraphs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
CompiledGraph,
ExplicitMapping,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
ObserverEvent,
State,
append,
)
Expand Down Expand Up @@ -350,8 +350,8 @@ def _fmt_state(state: Any) -> str:
return " ".join(parts) if parts else "(empty)"


async def depth_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
if isinstance(event, MetadataAugmentationEvent):
async def depth_observer(event: ObserverEvent) -> None:
if not isinstance(event, NodeEvent):
return
depth = len(event.namespace)
indent = " " * (depth - 1)
Expand Down
4 changes: 2 additions & 2 deletions examples/05-fan-out-with-retry/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@
END,
CompiledGraph,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
ObserverEvent,
State,
append,
)
Expand Down Expand Up @@ -297,7 +297,7 @@ def build_graph(error_policy: str = "fail_fast") -> CompiledGraph[BatchState]:
)


async def fan_out_config_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
async def fan_out_config_observer(event: ObserverEvent) -> None:
"""Print the fan-out node's resolved config when its dispatch event
fires.

Expand Down
4 changes: 2 additions & 2 deletions examples/06-parallel-branches/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@
BranchSpec,
CompiledGraph,
GraphBuilder,
MetadataAugmentationEvent,
NodeEvent,
ObserverEvent,
State,
append,
)
Expand Down Expand Up @@ -241,7 +241,7 @@ async def present(s: ArticleState) -> Mapping[str, Any]:
return {"trace": ["present"]}


async def branch_attribution_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
async def branch_attribution_observer(event: ObserverEvent) -> None:
"""Print which branch each inner-node event came from.

NodeEvent carries ``branch_name`` on events from nodes that
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec"
openarmature = "openarmature.cli:main"

[tool.openarmature]
spec_version = "0.34.0"
spec_version = "0.35.0"

[dependency-groups]
dev = [
Expand Down
4 changes: 2 additions & 2 deletions src/openarmature/AGENTS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# OpenArmature — Agent documentation

*This is the agent guide bundled with the openarmature Python package, version 0.10.0 (spec v0.34.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.*
*This is the agent guide bundled with the openarmature Python package, version 0.10.0 (spec v0.35.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.*

## TL;DR

Expand All @@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents

## Capability contracts

_Sourced from openarmature-spec v0.34.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md`. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._
_Sourced from openarmature-spec v0.35.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md`. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._

### Capability: `graph-engine`

Expand Down
2 changes: 1 addition & 1 deletion src/openarmature/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@
"""

__version__ = "0.10.0"
__spec_version__ = "0.34.0"
__spec_version__ = "0.35.0"
12 changes: 10 additions & 2 deletions src/openarmature/graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@
StateValidationError,
UnreachableNode,
)
from .events import MetadataAugmentationEvent, NodeEvent
from .events import (
InvocationCompletedEvent,
InvocationStartedEvent,
MetadataAugmentationEvent,
NodeEvent,
)
from .fan_out import FanOutConfig, FanOutNode
from .middleware import (
Middleware,
Expand All @@ -48,7 +53,7 @@
exponential_jitter_backoff,
)
from .nodes import FunctionNode, Node
from .observer import DrainSummary, Observer, RemoveHandle, SubscribedObserver
from .observer import DrainSummary, Observer, ObserverEvent, RemoveHandle, SubscribedObserver
from .parallel_branches import BranchSpec, ParallelBranchesNode
from .projection import ExplicitMapping, FieldNameMatching, ProjectionStrategy
from .reducers import Reducer, append, concat_flatten, last_write_wins, merge, merge_all
Expand Down Expand Up @@ -77,6 +82,8 @@
"FunctionNode",
"GraphBuilder",
"GraphError",
"InvocationCompletedEvent",
"InvocationStartedEvent",
"MappingReferencesUndeclaredField",
"MetadataAugmentationEvent",
"Middleware",
Expand All @@ -87,6 +94,7 @@
"NodeException",
"NoDeclaredEntry",
"Observer",
"ObserverEvent",
"ParallelBranchesBranchFailed",
"ParallelBranchesNoBranches",
"ParallelBranchesNode",
Expand Down
84 changes: 81 additions & 3 deletions src/openarmature/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from collections.abc import Callable, Iterable, Mapping, Sequence
from dataclasses import dataclass, field
from dataclasses import replace as dataclass_replace
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any, Literal, cast

if TYPE_CHECKING:
# ``FanOutNode`` lives in ``.fan_out`` which has a TYPE_CHECKING
Expand Down Expand Up @@ -95,7 +95,12 @@
RuntimeGraphError,
StateValidationError,
)
from .events import FanOutEventConfig, NodeEvent
from .events import (
FanOutEventConfig,
InvocationCompletedEvent,
InvocationStartedEvent,
NodeEvent,
)
from .middleware import ChainCall, Middleware, compose_chain
from .nodes import Node
from .observer import (
Expand Down Expand Up @@ -1023,9 +1028,57 @@ async def invoke(
caller_invocation_metadata=current_invocation_metadata(),
),
)
# Proposal 0043: invocation-boundary event for trace.input
# sourcing. Carries the engine-constructed initial_state plus
# the §3 / §5.1 ids and the outermost-graph entry node name
# so Trace-level observers (Langfuse) can populate
# ``trace.input`` via the §8.4.1 three-lever decision tree.
# Dispatched AFTER the checkpoint-migrated event (when there
# is one) so the migration span is observable before the
# invocation-input event.
_dispatch(
context,
InvocationStartedEvent(
initial_state=starting_state,
invocation_id=invocation_id,
correlation_id=resolved_correlation_id,
entry_node=self.entry,
),
)
final_state: StateT | None = None
status: Literal["completed", "failed"] = "failed"
try:
return await self._invoke(starting_state, context)
final_state = await self._invoke(starting_state, context)
status = "completed"
return final_state
finally:
# Proposal 0043: invocation-boundary event for trace.output
# sourcing. Fires on both the success path
# (status="completed") and the failure path
# (status="failed"). ``final_node`` comes from the shared
# box the engine populates as nodes enter; on the failure
# path that's the inner-most node that raised, on the
# success path that's the last node before the END-routing
# edge. ``final_state`` is the engine's returned state on
# success and ``starting_state`` on the failure path (the
# engine doesn't expose intermediate state across raises).
if context.final_node_box:
final_node = context.final_node_box[0]
else:
# Defensive: invocation raised before any node fired
# (e.g., resume-path validation). Fall back to the
# declared entry node.
final_node = self.entry
_dispatch(
context,
InvocationCompletedEvent(
final_state=final_state if final_state is not None else starting_state,
status=status,
final_node=final_node,
invocation_id=invocation_id,
correlation_id=resolved_correlation_id,
),
)
_reset_invocation_metadata(metadata_token)
_reset_invocation_id(invocation_token)
_reset_correlation_id(correlation_token)
Expand Down Expand Up @@ -1098,6 +1151,17 @@ async def _invoke(
from .fan_out import FanOutNode # noqa: PLC0415
from .parallel_branches import ParallelBranchesNode # noqa: PLC0415

# Proposal 0043: track the most recent node about to run
# so the outermost ``invoke()`` can populate
# ``InvocationCompletedEvent.final_node`` on both the
# END-reached success path (last node before the
# END-routing edge) and the failure path (the node that
# raised). Subgraph descents reuse the same shared box
# via ``descend_into_subgraph``, so a failure deep in a
# subgraph leaves the innermost node's name in the box —
# the actual culprit, not the wrapper.
context.final_node_box[:] = [current]
Comment thread
chris-colinsky marked this conversation as resolved.

if isinstance(node, FanOutNode):
# Fan-out nodes are recognized as a distinct node type
# per pipeline-utilities §9. Dispatched through
Expand Down Expand Up @@ -1138,6 +1202,20 @@ async def _invoke(
step_result = await self._step_function_node(node, current, state, context)
state = step_result.state

# Proposal 0043 (post-PR-99 review): restore the outer
# ``current`` to the shared box after a successful step.
# Descended `_step_*` calls (subgraph, fan-out, parallel-
# branches) write inner-node names into the box; without
# this restore, the wrapper's name leaks out of the box
# when the wrapper is the last node before the END-routing
# edge — and for parallel-branches the box would end with
# whichever branch's inner finished last (nondeterministic).
# On the failure path, the raise above bypasses this line,
# so the inner-most node that raised stays in the box as
# the failure-path ``final_node`` (matching spec §4
# attribution).
context.final_node_box[:] = [current]

# Per spec graph-engine §3 step 3 (revised in proposal
# 0012 / v0.9.0): the engine MUST dispatch the
# ``completed`` event AFTER edge evaluation completes.
Expand Down
Loading