Skip to content

Commit fafe911

Browse files
Activate fixture 037 case 5 (resume re-fire) (#102)
* Activate fixture 037 case 5 (resume re-fire) Wires the langfuse conformance harness for the remaining decision-tree case of proposal 0043's §8.4.1 trace.input/output sourcing fixture. The two-phase resume flow (first invoke catches NodeException → resume invoke completes) now runs end-to-end through new harness primitives: - ``flaky: {fail_first_invocation_only: true, on_success: {...}}`` compact test seam in ``_build_node_body``. - ``checkpointer: in_memory`` directive registers ``InMemoryCheckpointer`` on the graph builder. - ``returns_state_snapshot`` added to ``_TRACE_IO_HOOK_REGISTRY``. - ``_run_resume_case`` runs the two-phase flow + asserts both traces + checks the §8.4.1 invariants (distinct trace ids, shared correlation_id, first trace unchanged, hooks re-fire on resumed trace). Activation surfaced two engine bugs that PR #99 missed. The first: ``InvocationCompletedEvent.final_state`` on the failure path defaulted to ``starting_state``, but spec §8.4.1 *Resume semantics* requires the failure-path ``trace.output`` hook to receive "the partial final state captured at the failure point" (the most recent successful step's post-merge state). Adds a new ``latest_state_box`` on ``_InvocationContext`` that the engine writes after every successful step's ``state = step_result.state`` assignment; the outermost ``invoke()`` reads it in the finally-block before falling back to ``starting_state``. The second: ``latest_state_box`` MUST be per-context (unlike its sibling ``final_node_box`` which shares by reference across subgraph descents). An inner-subgraph step's success previously would overwrite the outer box with an inner-typed state; on a subsequent outer-level raise the outer ``trace.output`` hook would receive an inner state when its signature expects the outer state class. Each ``descend_into_*`` method now omits ``latest_state_box`` from the copy, so each level gets a fresh box. Four new unit-test regressions pin the bug fix across all four graph-descent shapes: flat, subgraph, fan-out instance, parallel- branches branch. Each test wires a graph where an outer node succeeds (outer_a_done=true) and a deeper raise propagates back; the ``trace_output_from_state`` hook MUST see the outer-state-typed value with the success captured. Cross-cap parser deferral for 037 stays in place — that parser still doesn't model ``langfuse_trace`` shape. Activation lives in the langfuse-specific harness only. * Tighten fan-out regression + fix CHANGELOG count PR #102 review caught two issues: The fan-out regression test's inner subgraph contained only a raising node, so under the original shared-`latest_state_box` bug no inner step would have successfully written to the box — the test would have passed without exercising the leak it was meant to guard. The inner subgraph now has two nodes: `inner_succeeds` writes `inner_done=true` (so the descent's _invoke writes inner state to the box) followed by `inner_raises`. Confirmed by temp-reverting the descend-omit-`latest_state_box` change and observing the test fail with the typed-state-mismatch assertion. CHANGELOG said "three regression tests" but enumerated four (flat, subgraph, fan-out, parallel-branches). Bumped the count to four.
1 parent bdfb285 commit fafe911

5 files changed

Lines changed: 632 additions & 19 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,16 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
2525
- **`observation.metadata.detached: true` moves to the parent-side dispatching observation** (proposal 0042, observability §8.4.2). The Langfuse mapping previously emitted `detached: true` on the dispatch observation inside the detached child trace; the §8.4.2 row added by 0042 places it on the **parent-side** dispatching observation that fires the detached child (the link observation in the main trace for detached subgraphs; the parent fan-out node observation for detached fan-outs). The detached-side observation no longer carries the flag.
2626
- **`LangfuseClient.update_trace` Protocol grows `input` / `output` keyword parameters** so observer-supplied values land on the Trace's headline fields.
2727

28+
### Fixed
29+
30+
- **`InvocationCompletedEvent.final_state` on the failure path now surfaces the partial state at failure point.** Spec §8.4.1 *Resume semantics* requires the failure-path `trace.output` hook to receive "the partial final state captured at the failure point"; the original PR #99 implementation defaulted to `starting_state`, so the hook saw pre-execution state when it should have seen post-execution-up-to-failure state. The engine now tracks the latest post-merge state via a `latest_state_box` on `_InvocationContext`, updated after every successful step and read on the failure path. Success-path behavior unchanged.
31+
- **`latest_state_box` is per-context, not shared across subgraph descents.** Unlike the sibling `final_node_box` (which shares by reference because the spec wants the innermost failing node's name — the real culprit), `latest_state_box` must isolate per level so the outermost Langfuse trace receives outer-state-typed values. Without the isolation, a subgraph-internal step's inner-typed state would leak up to the outer trace.output hook, breaking the hook's typed contract. Each subgraph / fan-out instance / parallel-branches branch gets its own fresh box. Pinned by four regression tests covering flat, subgraph, fan-out, and parallel-branches failure paths.
32+
2833
### Notes
2934

3035
- **Pinned spec version bumped from v0.31.0 to v0.35.0.** Absorbs proposals 0042 (reserved-key extension), 0043 (Langfuse trace.input/output sourcing), and the textual additions in v0.32.0 (Gemini wire-format mapping, 0038, not yet implemented) and v0.33.0 (sessions capability, 0020, not yet implemented).
3136
- `LangfuseSDKAdapter` now applies `trace.input` / `trace.output` to the live Langfuse Trace. Input lands on the first real observation under the trace via `set_trace_io`; output uses a synthetic short-lived `openarmature.trace_io` observation as the carrier. The InMemoryLangfuseClient used by tests applies the fields directly.
32-
- Conformance fixture `observability/conformance/037-langfuse-trace-input-output` activated for the four decision-tree cases (default stub / `disable_state_payload=False` / hooks non-null / hooks null-fallthrough). Case 5 (resume re-fire) is deferred to a follow-up — needs the langfuse harness to grow checkpointer wiring + flaky-node test seam + two-phase multi-trace assertion.
37+
- Conformance fixture `observability/conformance/037-langfuse-trace-input-output` activated for all five cases (default stub / `disable_state_payload=False` / hooks non-null / hooks null-fallthrough / resume re-fire). The langfuse harness grew per-case `checkpointer: in_memory` wiring, a compact `flaky:` test seam, and a two-phase resume-flow assertion path.
3338
- The Langfuse v4 SDK marks `set_current_trace_io` / `Span.set_trace_io` deprecated ("removal in a future major version"). Empirical verification against Langfuse Cloud (v4.7.1, 2026-05-29) confirms it remains the **only** path that populates the Traces list view's headline `Input` / `Output` columns; `propagate_attributes(metadata=...)` does not substitute for it in the current UI. We will revisit when Langfuse publishes a concrete migration guide for v5.
3439

3540
## [0.10.0] — 2026-05-27

src/openarmature/graph/compiled.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,20 +1059,38 @@ async def invoke(
10591059
# box the engine populates as nodes enter; on the failure
10601060
# path that's the inner-most node that raised, on the
10611061
# success path that's the last node before the END-routing
1062-
# edge. ``final_state`` is the engine's returned state on
1063-
# success and ``starting_state`` on the failure path (the
1064-
# engine doesn't expose intermediate state across raises).
1062+
# edge. ``final_state`` precedence: the engine's returned
1063+
# state on success → the most recent successful step's
1064+
# post-merge state on a mid-graph raise (per §8.4.1
1065+
# *Resume semantics* "partial final state captured at the
1066+
# failure point") → ``starting_state`` only when no step
1067+
# ever completed.
10651068
if context.final_node_box:
10661069
final_node = context.final_node_box[0]
10671070
else:
10681071
# Defensive: invocation raised before any node fired
10691072
# (e.g., resume-path validation). Fall back to the
10701073
# declared entry node.
10711074
final_node = self.entry
1075+
# ``latest_state_box`` is typed ``list[Any]`` on
1076+
# _InvocationContext (the context isn't parameterized on
1077+
# StateT), but at the outermost level (where this code
1078+
# runs) it always holds an outer ``StateT`` from a
1079+
# successful step's post-merge state. Cast for type
1080+
# narrowing; the per-context box-isolation pinned by
1081+
# ``test_failure_path_final_state_is_outer_type_*`` keeps
1082+
# this invariant honest.
1083+
event_final_state: StateT
1084+
if final_state is not None:
1085+
event_final_state = final_state
1086+
elif context.latest_state_box:
1087+
event_final_state = cast("StateT", context.latest_state_box[0])
1088+
else:
1089+
event_final_state = starting_state
10721090
_dispatch(
10731091
context,
10741092
InvocationCompletedEvent(
1075-
final_state=final_state if final_state is not None else starting_state,
1093+
final_state=event_final_state,
10761094
status=status,
10771095
final_node=final_node,
10781096
invocation_id=invocation_id,
@@ -1201,6 +1219,15 @@ async def _invoke(
12011219
else:
12021220
step_result = await self._step_function_node(node, current, state, context)
12031221
state = step_result.state
1222+
# Proposal 0043 (post-PR-99 review): surface the most
1223+
# recent successful step's post-merge state so the
1224+
# outermost ``invoke()`` can populate
1225+
# ``InvocationCompletedEvent.final_state`` on the failure
1226+
# path with the partial state, not the bare initial state.
1227+
# Updated AFTER ``state = step_result.state`` so an
1228+
# exception inside the step bypasses this assignment and
1229+
# the previous value (or the empty box) survives.
1230+
context.latest_state_box[:] = [state]
12041231

12051232
# Proposal 0043 (post-PR-99 review): restore the outer
12061233
# ``current`` to the shared box after a successful step.

src/openarmature/graph/observer.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,23 @@ class _InvocationContext:
497497
# descents so the inner-most node's name wins on failure (the
498498
# real culprit, not the wrapper).
499499
final_node_box: list[str] = field(default_factory=list[str])
500+
# Per proposal 0043 (observability §8.4.1 *Resume semantics* +
501+
# "partial final state captured at the failure point" clause).
502+
# Tracks the most recent successful step's post-merge state at THIS
503+
# context level so the outermost ``invoke()`` can populate
504+
# ``InvocationCompletedEvent.final_state`` on the failure path with
505+
# the partial outer state, not the bare ``starting_state``. On the
506+
# success path the box is unused — the engine's return value is the
507+
# canonical ``final_state``. **Distinct from ``final_node_box``**:
508+
# the latest-state box is per-level (each subgraph / fan-out
509+
# instance / parallel-branches branch gets its own fresh box),
510+
# because the OUTER Langfuse trace cares about the outer-graph's
511+
# state type, and an inner state has a different type. The
512+
# ``final_node_box`` shares by reference because the spec wants the
513+
# innermost failing node's name (the real culprit); state has the
514+
# opposite contract — the outermost level's state is what the
515+
# outer trace.output hook receives.
516+
latest_state_box: list[Any] = field(default_factory=list[Any])
500517

501518
def full_observers(self) -> tuple[SubscribedObserver, ...]:
502519
"""Return the ordered observer list to deliver for events from
@@ -545,6 +562,9 @@ def descend_into_subgraph(
545562
drain_counters=self.drain_counters,
546563
state_cls=self.state_cls,
547564
final_node_box=self.final_node_box,
565+
# latest_state_box is INTENTIONALLY NOT propagated — each
566+
# context level tracks its own outer-state-typed latest
567+
# successful step. See the field docstring above.
548568
)
549569

550570
def descend_into_fan_out_instance(
@@ -596,6 +616,9 @@ def descend_into_fan_out_instance(
596616
drain_counters=self.drain_counters,
597617
state_cls=self.state_cls,
598618
final_node_box=self.final_node_box,
619+
# latest_state_box is INTENTIONALLY NOT propagated — each
620+
# context level tracks its own outer-state-typed latest
621+
# successful step. See the field docstring above.
599622
)
600623

601624
def descend_into_parallel_branch(
@@ -650,6 +673,9 @@ def descend_into_parallel_branch(
650673
drain_counters=self.drain_counters,
651674
state_cls=self.state_cls,
652675
final_node_box=self.final_node_box,
676+
# latest_state_box is INTENTIONALLY NOT propagated — each
677+
# context level tracks its own outer-state-typed latest
678+
# successful step. See the field docstring above.
653679
)
654680

655681
def take_step(self) -> int:

0 commit comments

Comments
 (0)