Implement proposal 0043 (trace input/output) #99
Merged
Adds the observability §8.4.1 *Trace input/output sourcing* mechanism:
the Langfuse observer populates `trace.input` at invocation entry
and `trace.output` at invocation exit via a three-lever decision
tree (caller hook returning non-null → hook value; raw state when
`disable_state_payload=False` → serialized state; default → minimal
stub `{entry_node, correlation_id}` / `{final_node, status}` where
status is the closed `Literal["completed", "failed"]` enum).
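The three-lever decision tree described above can be sketched roughly as follows. This is a minimal illustration, not the observer's actual code: the function names `resolve_trace_input` / `resolve_trace_output`, the dict-shaped state, and the `json` round-trip standing in for state serialization are all assumptions made for the example.

```python
import json
from typing import Any, Callable, Literal, Optional

Status = Literal["completed", "failed"]
Hook = Optional[Callable[[dict], Optional[Any]]]


def _pick(state: dict, hook: Hook, disable_state_payload: bool, stub: dict) -> Any:
    # Lever 1: a caller hook that returns a non-None value wins.
    if hook is not None:
        value = hook(state)
        if value is not None:
            return value
    # Lever 2: raw state, only when the payload is explicitly enabled.
    if not disable_state_payload:
        return json.loads(json.dumps(state))  # stand-in for real serialization
    # Lever 3: the minimal stub.
    return stub


def resolve_trace_input(state: dict, *, entry_node: str, correlation_id: str,
                        hook: Hook = None, disable_state_payload: bool = True) -> Any:
    stub = {"entry_node": entry_node, "correlation_id": correlation_id}
    return _pick(state, hook, disable_state_payload, stub)


def resolve_trace_output(state: dict, *, final_node: str, status: Status,
                         hook: Hook = None, disable_state_payload: bool = True) -> Any:
    stub = {"final_node": final_node, "status": status}
    return _pick(state, hook, disable_state_payload, stub)
```

Note that a hook returning `None` falls through to the remaining levers rather than blanking the field, which matches the "hook returning non-null" wording above.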
Wires through two new observer event types delivered on the
existing serial-delivery queue: `InvocationStartedEvent` and
`InvocationCompletedEvent`. The engine enqueues both at the
invocation lifecycle's outermost boundaries (entry before any node
fires; exit on both success and failure paths). Mirrors the 0040
pattern used for `MetadataAugmentationEvent`. The `Observer.__call__`
signature widens to a four-variant union; the new `ObserverEvent`
type alias gives observer authors a one-name handle and is
re-exported from `openarmature.graph`.
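For observer authors, the widened union mostly means filtering on event type. The sketch below uses stand-in dataclasses to show the shape; the real classes live in `openarmature.graph`, and the field names here are assumptions chosen for illustration, as is the synchronous `__call__`.

```python
from dataclasses import dataclass
from typing import List, Literal, Union


@dataclass(frozen=True)
class NodeEvent:  # stand-in; real fields may differ
    node: str


@dataclass(frozen=True)
class MetadataAugmentationEvent:  # stand-in
    node: str
    metadata: dict


@dataclass(frozen=True)
class InvocationStartedEvent:  # stand-in
    entry_node: str
    correlation_id: str


@dataclass(frozen=True)
class InvocationCompletedEvent:  # stand-in
    final_node: str
    status: Literal["completed", "failed"]


# One name for the four-variant union an observer must accept.
ObserverEvent = Union[
    NodeEvent,
    MetadataAugmentationEvent,
    InvocationStartedEvent,
    InvocationCompletedEvent,
]


class NodeRecorder:
    """Observer that only cares about node events and ignores the rest."""

    def __init__(self) -> None:
        self.nodes: List[str] = []

    def __call__(self, event: ObserverEvent) -> None:
        if not isinstance(event, NodeEvent):
            return  # boundary and metadata events are no-ops here
        self.nodes.append(event.node)


recorder = NodeRecorder()
for ev in [
    InvocationStartedEvent(entry_node="a", correlation_id="c1"),
    NodeEvent(node="a"),
    NodeEvent(node="b"),
    InvocationCompletedEvent(final_node="b", status="completed"),
]:
    recorder(ev)
```

An observer written against the narrower pre-0043 union would need the same kind of `isinstance` guard to keep working once the boundary events arrive on the queue.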
The OTel observer no-ops on both new events (OTel has no Trace-
level input/output concept). The LangfuseSDKAdapter caches input
and output on `_trace_info`; live-Trace emission via the v4 SDK is
deferred to a follow-up (the InMemoryLangfuseClient used by tests
applies the fields directly so the contract is unit-test-pinned).
Bumps the spec pin from v0.34.0 to v0.35.0. `conformance.toml`
records 0043 as implemented since 0.11.0. Conformance fixture 037
is deferred because cases 3/4/5 need a caller-hook YAML directive
the cross-capability harness doesn't model yet; the five-case
decision tree is pinned by new unit tests at
`tests/unit/test_observability_langfuse.py::test_trace_input_output_*`.
Pull request overview
Implements proposal 0043 by adding invocation-boundary observer events and using them to populate Langfuse trace-level input/output fields, while updating public observer typing, examples, tests, conformance metadata, and the pinned spec version.
Changes:
- Adds `InvocationStartedEvent`/`InvocationCompletedEvent` and the `ObserverEvent` union, delivered through the existing observer queue.
- Adds Langfuse trace input/output sourcing knobs and in-memory trace fields.
- Updates tests, examples, docs, changelog, and conformance/spec pin metadata for spec v0.35.0.
Reviewed changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `src/openarmature/graph/events.py` | Defines invocation boundary event dataclasses. |
| `src/openarmature/graph/observer.py` | Widens observer event typing and queue delivery docs/types. |
| `src/openarmature/graph/compiled.py` | Emits invocation started/completed events around graph execution. |
| `src/openarmature/graph/__init__.py` | Re-exports new event types and `ObserverEvent`. |
| `src/openarmature/observability/correlation.py` | Updates active dispatch typing for new event variants. |
| `src/openarmature/observability/langfuse/observer.py` | Implements trace input/output sourcing in `LangfuseObserver`. |
| `src/openarmature/observability/langfuse/client.py` | Adds in-memory trace input/output fields and update support. |
| `src/openarmature/observability/langfuse/adapter.py` | Extends SDK adapter cache shape for input/output updates. |
| `src/openarmature/observability/otel/observer.py` | No-ops OTel handling for invocation boundary events. |
| `src/openarmature/__init__.py` | Bumps spec version constant. |
| `src/openarmature/AGENTS.md` | Updates bundled docs spec version text. |
| `pyproject.toml` | Updates OpenArmature spec pin. |
| `conformance.toml` | Marks proposal 0043 implemented against v0.35.0. |
| `CHANGELOG.md` | Documents proposal 0043 additions and behavior. |
| `tests/unit/test_observability_langfuse.py` | Adds unit coverage for trace input/output sourcing. |
| `tests/unit/test_observer.py` | Updates observer tests to use `ObserverEvent`. |
| `tests/unit/test_drain.py` | Updates drain expectations for boundary events. |
| `tests/unit/test_runtime_errors.py` | Updates runtime error observers to ignore non-node events. |
| `tests/test_smoke.py` | Updates expected spec version. |
| `tests/conformance/test_observability_langfuse.py` | Documents fixture 037 deferral. |
| `tests/conformance/test_fixture_parsing.py` | Defers parsing fixture 037. |
| `tests/conformance/test_conformance.py` | Updates conformance observer typing. |
| `tests/conformance/adapter.py` | Updates conformance observer helper typing/filtering. |
| `examples/00-hello-world/main.py` | Updates example observer typing/filtering. |
| `examples/03-observer-hooks/main.py` | Updates example observer typing/filtering. |
| `examples/04-nested-subgraphs/main.py` | Updates example observer typing/filtering. |
| `examples/05-fan-out-with-retry/main.py` | Updates example observer typing. |
| `examples/06-parallel-branches/main.py` | Updates example observer typing. |
Two issues surfaced by PR #99 review:

`final_node_box` is shared by reference across subgraph, fan-out, and parallel-branches descents (`descend_into_*` propagates the list). Inner-node writes leak into the outer box on the success path, so the outermost `invoke()` reads the wrong `final_node` when an outer wrapper is the last node before the END-routing edge. For parallel-branches the leaked value depends on which branch finishes last, making `InvocationCompletedEvent.final_node` nondeterministic.

Restore the outer `current` to the box after each `_step_*` call returns successfully. The restore is on the success path only: the failure path's raise bypasses it, so the inner-most node that raised stays in the box for the spec §4 attribution.

A follow-up race remains for parallel-branches and fan-out failure cases: concurrent inner writes mean the box may end with a successful sibling's inner rather than the failing sibling's. Addressing that requires error-aware tracking the engine doesn't currently expose.

Pydantic's `model_dump()` defaults to Python mode and leaves `datetime` / `UUID` / `Decimal` as Python objects. The downstream `json.dumps` truncation path raises `TypeError` on those types, and the observer raise is swallowed by the engine's warnings-only observer-isolation contract, silently leaving `trace.input` / `trace.output` blank under `disable_state_payload=False`.

`_state_to_jsonable` now calls `model_dump(mode="json")` so the common non-JSON-native types serialize to their JSON-compatible string forms before reaching the truncation step. Adds a regression test using a State with `datetime`, `UUID`, and `Decimal` fields.
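The serialization failure described above can be reproduced with the standard library alone. The sketch below deliberately avoids Pydantic: the dict stands in for what a Python-mode dump returns, and the hypothetical `to_jsonable` helper only approximates what a JSON-mode dump does (the exact string forms Pydantic emits may differ).

```python
import json
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID

# Stand-in for a Python-mode dump: values are still rich Python objects.
state = {
    "created_at": datetime(2026, 5, 31, tzinfo=timezone.utc),
    "run_id": UUID("12345678-1234-5678-1234-567812345678"),
    "cost": Decimal("1.50"),
}


def try_dump(payload: dict) -> str:
    """Return the JSON text, or a description of the TypeError raised."""
    try:
        return json.dumps(payload)
    except TypeError as exc:
        return f"TypeError: {exc}"


def to_jsonable(value):
    """Hypothetical approximation of a JSON-mode dump for the three types."""
    if isinstance(value, dict):
        return {k: to_jsonable(v) for k, v in value.items()}
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (UUID, Decimal)):
        return str(value)
    return value


python_mode_result = try_dump(state)             # fails on the first rich value
json_mode_result = try_dump(to_jsonable(state))  # serializes cleanly
```

Because the observer-isolation contract swallows the raise, the first outcome shows up only as a blank field, which is why converting before the truncation step matters.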
chris-colinsky added a commit that referenced this pull request on May 31, 2026
* Emit trace.input/output via Langfuse SDK adapter

PR #99 (proposal 0043) shipped the Langfuse observer's three-lever decision tree but left the SDK adapter's `update_trace(input=..., output=...)` as a no-op; only the InMemoryLangfuseClient applied the values. Production users of `LangfuseSDKAdapter` saw blank `Input` / `Output` columns in the Langfuse Traces list view despite the observer emitting the values.

Wire the adapter to apply both via the v4 SDK's `set_trace_io`:
- `update_trace(input=...)` caches `pending_input` in `_trace_info`. The next `_start_observation` for that trace pops the cache and calls `obs.set_trace_io(input=cached)` on the just-created observation. Piggybacks on a real span; no extra observations added in the common case.
- `update_trace(output=...)` opens a synthetic short-lived `openarmature.trace_io` observation as the carrier for `set_trace_io(output=...)`. By the time the `InvocationCompletedEvent` reaches the observer all real node spans have ended, so a synthetic span is the only path with an active OTel span context.
- Edge case: an invocation that fails before any node fires has no real span. The synthetic output observation also applies the cached pending_input, so both fields still land.

The Langfuse v4 SDK marks `set_trace_io` deprecated ("removal in a future major version"). Empirical verification against Langfuse Cloud v4.7.1 confirms it remains the only path that surfaces `trace.input` / `trace.output` on the Traces list view headline columns; `propagate_attributes(metadata=...)` writes the values into the metadata bag but the UI does not project them as headline columns from there. Documented in CHANGELOG; will revisit when Langfuse publishes a v5 migration path.

Adds two integration tests (`tests/integration/`) gated by `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY`. Both pass against Langfuse Cloud end-to-end (real-obs + synthetic-only paths).
* Mark live-Langfuse tests as integration

PR #100 review caught a gap: the integration tests gated only on env-var presence are still picked up by `pytest tests/` when a developer has `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY` in scope locally. The default `pyproject.toml` config excludes `@pytest.mark.integration` via `addopts = ["-m", "not integration"]` but not unmarked tests in a separate directory. Add the marker to both tests so they match the existing precedent at `tests/unit/test_observability_langfuse_adapter.py:177` and stay out of the default test run regardless of credential availability.
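The adapter wiring from the first commit above (cache the input, piggyback it on the next real observation, and carry the output on a synthetic observation) can be sketched without the SDK. Everything here is a stand-in: `FakeObservation` only records `set_trace_io` calls, and `AdapterSketch` with its public `start_observation` method is a hypothetical simplification, not the real `LangfuseSDKAdapter`.

```python
from typing import Any, Dict, List, Optional


class FakeObservation:
    """Stand-in for an SDK observation; records what set_trace_io received."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.trace_io: Dict[str, Any] = {}

    def set_trace_io(self, *, input: Any = None, output: Any = None) -> None:
        if input is not None:
            self.trace_io["input"] = input
        if output is not None:
            self.trace_io["output"] = output


class AdapterSketch:
    def __init__(self) -> None:
        self._trace_info: Dict[str, Dict[str, Any]] = {}
        self.observations: List[FakeObservation] = []

    def update_trace(self, trace_id: str, *, input: Any = None,
                     output: Any = None) -> None:
        info = self._trace_info.setdefault(trace_id, {})
        if input is not None:
            # No span exists yet at invocation entry, so only cache the value.
            info["pending_input"] = input
        if output is not None:
            # All real spans have ended; open a synthetic carrier observation.
            carrier = FakeObservation("openarmature.trace_io")
            self.observations.append(carrier)
            # Edge case: if no real span ever consumed the input, apply it too.
            pending: Optional[Any] = info.pop("pending_input", None)
            carrier.set_trace_io(input=pending, output=output)

    def start_observation(self, trace_id: str, name: str) -> FakeObservation:
        obs = FakeObservation(name)
        self.observations.append(obs)
        pending = self._trace_info.get(trace_id, {}).pop("pending_input", None)
        if pending is not None:
            obs.set_trace_io(input=pending)  # piggyback on the first real span
        return obs


# Common case: one real node span between entry and exit.
normal = AdapterSketch()
normal.update_trace("t1", input={"q": "hi"})
normal.start_observation("t1", "node_a")
normal.update_trace("t1", output={"status": "completed"})

# Edge case: failure before any node fires, so no real span exists.
early_fail = AdapterSketch()
early_fail.update_trace("t2", input={"q": "hi"})
early_fail.update_trace("t2", output={"status": "failed"})
```

In the common case the synthetic observation carries only the output; in the early-failure case it carries both fields, so the trace is never left with a cached input that nothing applied.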
chris-colinsky added a commit that referenced this pull request on May 31, 2026
* Activate fixture 037 case 5 (resume re-fire)
Wires the langfuse conformance harness for the remaining decision-tree
case of proposal 0043's §8.4.1 trace.input/output sourcing fixture.
The two-phase resume flow (first invoke catches NodeException → resume
invoke completes) now runs end-to-end through new harness primitives:
- ``flaky: {fail_first_invocation_only: true, on_success: {...}}``
compact test seam in ``_build_node_body``.
- ``checkpointer: in_memory`` directive registers
``InMemoryCheckpointer`` on the graph builder.
- ``returns_state_snapshot`` added to ``_TRACE_IO_HOOK_REGISTRY``.
- ``_run_resume_case`` runs the two-phase flow + asserts both traces +
checks the §8.4.1 invariants (distinct trace ids, shared
correlation_id, first trace unchanged, hooks re-fire on resumed
trace).
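As a rough illustration of the two-phase flow and the invariants listed above, the sketch below uses a fail-first-only node body and a toy `invoke`. All names (`make_flaky`, `TraceRecord`, `check_resume_invariants`) are hypothetical, the toy `invoke` swallows the error instead of raising `NodeException`, and no checkpointer is modeled.

```python
from dataclasses import dataclass, replace
from itertools import count
from typing import Any, Callable, List


@dataclass(frozen=True)
class TraceRecord:
    trace_id: str
    correlation_id: str
    output: Any


def make_flaky(on_success: dict) -> Callable[[dict], dict]:
    """Node body that raises on its first call only, then merges on_success."""
    calls = count(1)

    def body(state: dict) -> dict:
        if next(calls) == 1:
            raise RuntimeError("flaky: first invocation fails")
        return {**state, **on_success}

    return body


_trace_ids = count(1)


def invoke(body, state: dict, correlation_id: str, hook_log: List[str]) -> TraceRecord:
    """Run one invocation and record one trace; the hook fires every time."""
    trace_id = f"trace-{next(_trace_ids)}"
    try:
        final, status = body(state), "completed"
    except RuntimeError:
        final, status = state, "failed"
    hook_log.append(trace_id)  # stands in for the trace.output hook firing
    return TraceRecord(trace_id, correlation_id, {"status": status, "state": final})


def check_resume_invariants(first_before, first_after, resumed, hook_log) -> None:
    assert first_before.trace_id != resumed.trace_id              # distinct trace ids
    assert first_before.correlation_id == resumed.correlation_id  # shared correlation_id
    assert first_after == first_before                            # first trace unchanged
    assert hook_log == [first_before.trace_id, resumed.trace_id]  # hooks re-fire


hook_log: List[str] = []
node = make_flaky({"done": True})
first = invoke(node, {"done": False}, "corr-1", hook_log)
snapshot = replace(first)  # copy taken before the resume
resumed = invoke(node, {"done": False}, "corr-1", hook_log)
check_resume_invariants(snapshot, first, resumed, hook_log)
```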
Activation surfaced two engine bugs that PR #99 missed.
The first: ``InvocationCompletedEvent.final_state`` on the failure
path defaulted to ``starting_state``, but spec §8.4.1 *Resume
semantics* requires the failure-path ``trace.output`` hook to receive
"the partial final state captured at the failure point" (the most
recent successful step's post-merge state). Adds a new
``latest_state_box`` on ``_InvocationContext`` that the engine writes
after every successful step's ``state = step_result.state``
assignment; the outermost ``invoke()`` reads it in the finally-block
before falling back to ``starting_state``.
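A stripped-down version of that bookkeeping might look like the following. It is only a sketch: the toy `invoke` loop, the dict-shaped state, and the `emit` callback are assumptions for illustration, and the real engine's merge and routing logic is omitted.

```python
from typing import Callable, List


def invoke(nodes: List[Callable[[dict], dict]], starting_state: dict,
           emit: Callable[[dict], None]) -> dict:
    latest_state_box: List[dict] = []  # written after each successful step
    status = "failed"                  # assume failure until the loop finishes
    state = starting_state
    try:
        for node in nodes:
            state = node(state)
            latest_state_box[:] = [state]
        status = "completed"
        return state
    finally:
        # Prefer the most recent successful step's state; fall back to the
        # starting state only when no step ever succeeded.
        final_state = latest_state_box[0] if latest_state_box else starting_state
        emit({"status": status, "final_state": final_state})


def mark_a(state: dict) -> dict:
    return {**state, "a_done": True}


def boom(state: dict) -> dict:
    raise RuntimeError("node failed")


events: List[dict] = []
for plan in ([mark_a, boom], [boom], [mark_a]):
    try:
        invoke(plan, {"a_done": False}, events.append)
    except RuntimeError:
        pass  # the completed event has already been emitted in the finally block
```

The first run shows the fix: the failure-path event carries the partial state with `a_done` set, not the starting state.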
The second: ``latest_state_box`` MUST be per-context (unlike its
sibling ``final_node_box`` which shares by reference across subgraph
descents). An inner-subgraph step's success previously would
overwrite the outer box with an inner-typed state; on a subsequent
outer-level raise the outer ``trace.output`` hook would receive an
inner state when its signature expects the outer state class. Each
``descend_into_*`` method now omits ``latest_state_box`` from the
copy, so each level gets a fresh box.
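The difference between sharing a box by reference and giving each level a fresh one can be shown with a small dataclass. The class and method names below are hypothetical stand-ins for `_InvocationContext` and its `descend_into_*` methods; only the copy semantics are the point.

```python
from dataclasses import dataclass, field, replace
from typing import List


@dataclass
class InvocationContext:
    depth: int = 0
    final_node_box: List[str] = field(default_factory=list)
    latest_state_box: List[dict] = field(default_factory=list)

    def descend_shared(self) -> "InvocationContext":
        # replace() passes the existing lists through, so both boxes are
        # shared by reference with the parent (the leaky behaviour).
        return replace(self, depth=self.depth + 1)

    def descend_fresh(self) -> "InvocationContext":
        # Omit latest_state_box from the copy: the child gets its own box,
        # while final_node_box stays shared.
        return replace(self, depth=self.depth + 1, latest_state_box=[])


# Leaky variant: an inner success overwrites the outer box.
outer = InvocationContext()
outer.latest_state_box[:] = [{"outer_a_done": True}]
inner = outer.descend_shared()
inner.latest_state_box[:] = [{"inner_done": True}]
leaked = outer.latest_state_box[0]

# Per-context variant: the outer box keeps the outer-typed state.
outer2 = InvocationContext()
outer2.latest_state_box[:] = [{"outer_a_done": True}]
inner2 = outer2.descend_fresh()
inner2.latest_state_box[:] = [{"inner_done": True}]
kept = outer2.latest_state_box[0]
```

With the fresh box, a later outer-level raise hands the outer hook an outer-shaped state, which is what the regression tests described below assert.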
Four new unit-test regressions pin the bug fix across all four
graph-descent shapes: flat, subgraph, fan-out instance, parallel-
branches branch. Each test wires a graph where an outer node
succeeds (outer_a_done=true) and a deeper raise propagates back; the
``trace_output_from_state`` hook MUST see the outer-state-typed
value with the success captured.
Cross-cap parser deferral for 037 stays in place — that parser
still doesn't model ``langfuse_trace`` shape. Activation lives in
the langfuse-specific harness only.
* Tighten fan-out regression + fix CHANGELOG count
PR #102 review caught two issues:
The fan-out regression test's inner subgraph contained only a raising
node, so under the original shared-`latest_state_box` bug no inner
step would have successfully written to the box — the test would
have passed without exercising the leak it was meant to guard. The
inner subgraph now has two nodes: `inner_succeeds` writes
`inner_done=true` (so the descent's _invoke writes inner state to the
box) followed by `inner_raises`. Confirmed by temp-reverting the
descend-omit-`latest_state_box` change and observing the test fail
with the typed-state-mismatch assertion.
CHANGELOG said "three regression tests" but enumerated four (flat,
subgraph, fan-out, parallel-branches). Bumped the count to four.
Summary
- The Langfuse observer populates `trace.input` at invocation entry and `trace.output` at invocation exit via a three-lever decision tree.
- Caller hook returning non-null → hook value; raw state when `disable_state_payload=False` → serialized state; default → minimal stub `{entry_node, correlation_id}` / `{final_node, status}` where `status` is the closed `Literal["completed", "failed"]` enum.
- `disable_state_payload: bool = True` knob, plus `trace_input_from_state` and `trace_output_from_state` caller hooks on `LangfuseObserver`.
- Two new observer event types (`InvocationStartedEvent`, `InvocationCompletedEvent`) delivered on the existing serial-delivery queue.
- `Observer.__call__` widens to a four-variant union; new `ObserverEvent` type alias re-exported from `openarmature.graph`.
- `conformance.toml` records 0043 as implemented since 0.11.0.

Second of three PRs landing on main for the v0.11.0 release. PR 9 (proposal 0044, parallel-branches dispatch span) follows once spec accepts.

Notes for reviewers
- `LangfuseSDKAdapter` caches `input`/`output` on `_trace_info`; live-Trace emission via the v4 SDK is deferred to a follow-up (the `InMemoryLangfuseClient` used by tests applies the fields directly, so the contract is unit-test-pinned and contract-correct).
- The decision tree is pinned by unit tests at `tests/unit/test_observability_langfuse.py::test_trace_input_output_*`.
- Observer typing: `NodeEvent | MetadataAugmentationEvent` to `ObserverEvent` (mirrors the 0040 sweep in PR "Implement 0040 open-span metadata update" #96). Event-count assertions in `test_drain.py` updated from 6 to 8 (6 NodeEvents + 2 boundary events).

Test plan
- `uv run pytest tests/ -q`: 992 passed, 203 skipped, 0 failed
- `uv run pyright src/openarmature tests examples`: 0 errors
- `uv run ruff check src/ tests/ examples/`: clean
- `uv run python scripts/check_conformance_manifest.py`: 40 entries, all consistent
- `test_trace_input_output_*` unit tests covering the four-case decision tree + failure-status path
- `InvocationStartedEvent` → node events → `InvocationCompletedEvent` (verified via failure-path test asserting `status="failed"` + correct `final_node`)