Skip to content

Commit 7053244

Browse files
Fix nested fan-out collapse under concurrency
A fan-out nested inside an outer fan-out instance keyed its per-fan-out tracking entry by namespace and node name only, so the shared progress dict collided across concurrent outer instances. The second instance found the first's entry already complete and rolled its result forward, so every outer instance returned the first instance's inner result (silently wrong output) and the inner subgraph ran only once. Carry the enclosing fan-out instance lineage on the tracking key, in the in-memory dict and through the checkpoint projection, lookup, cleanup, and restore. Top-level and subgraph- or branch-nested fan-outs have an empty lineage, so their behavior, including resume, is unchanged. A fan-out nested inside an outer instance re-runs rather than skipping on resume, since the record format carries no lineage; tracked as a follow-up.
1 parent 3d29f00 commit 7053244

6 files changed

Lines changed: 254 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
1212
- **Langfuse per-branch dispatch-span observation** (observability §4.3 / §8.4.2, proposals 0042 / 0044). The Langfuse observer now synthesizes a per-branch Span observation under a `parallel_branches` dispatcher node, so each branch's inner observations nest under their own branch span (a three-level dispatcher / per-branch-span / inner-nodes tree) instead of parenting directly under the dispatcher. The per-branch observation carries the OA-emitted `branch_name` alongside the caller baseline metadata and any per-branch augmentation, and the Generation observation now carries `branch_name` too. The OTel observer already produced this shape (proposal 0044 shipped OTel-only in v0.11.0); this brings the Langfuse mapping into line. Callable branches (proposal 0075) are unchanged.
1313
- **Augmentation no longer lands on a shared-parent fan-out / parallel-branches node** (observability §3.4, proposal 0045). A key set via `set_invocation_metadata` inside a fan-out instance or a parallel-branches branch was incorrectly applied to the shared fan-out / dispatcher NODE span (the fork point) when the augmenting context executed at that node's own namespace, in addition to the per-instance / per-branch dispatch span where it belongs. Both the OTel and Langfuse observers now skip the shared-parent node in that case, matching the behavior already applied to strict-ancestor shared parents. The per-instance / per-branch dispatch spans and the lineage ancestors that carry the augmentation are unaffected.
1414
- **Langfuse fan-out-instance dispatch nested below the top level** (observability §5.4, proposal 0013). The Langfuse observer's per-instance dispatch synthesis and parent resolution are now prefix-general, so a fan-out node sitting inside a serial subgraph wrapper (rather than at the top namespace level) gets its per-instance dispatch observation synthesized and its inner observations parented under it. This matches the OTel observer, which already resolved across every namespace prefix.
15+
- **Nested fan-out no longer collapses under concurrency** (engine). A fan-out nested inside an outer fan-out instance shared a single per-fan-out tracking entry across all outer instances, because that entry was keyed by namespace plus node name only. With concurrent outer instances the second instance found the first's entry already marked complete and rolled its result forward, so every outer instance returned the first instance's inner result (silently wrong output) and the inner subgraph ran only once. The tracking key now carries the enclosing fan-out instance lineage, so each outer instance gets its own inner fan-out progress and correct per-instance results. Top-level and subgraph- or branch-nested fan-outs are unaffected (their enclosing lineage is empty). Resume of a fan-out nested inside an outer fan-out instance does not yet round-trip per-outer-instance progress, so it re-runs rather than skipping on resume; tracked as a follow-up.
1516

1617
## [0.15.0] — 2026-06-22
1718

src/openarmature/graph/compiled.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,13 @@ def _find_innermost_fan_out_instance_state(
316316
# fan-out's full key is (namespace_before_fan_out, fan_out_name)
317317
# where namespace_before_fan_out + (fan_out_name,) == prefix.
318318
for split in range(len(prefix), 0, -1):
319-
key = (prefix[: split - 1], prefix[split - 1])
319+
# The fan-out at prefix[:split] registered its tracking entry keyed by
320+
# its ENCLOSING fan-out instance lineage (the non-None fan_out_index chain
321+
# up to its own level, prefix depth split-1). Reconstruct it from the
322+
# current chain so a fan-out nested inside an outer instance routes to the
323+
# right outer instance's entry.
324+
lineage = tuple(i for i in context.fan_out_index_chain[: split - 1] if i is not None)
325+
key = (prefix[: split - 1], prefix[split - 1], lineage)
320326
if key in state_dict:
321327
exec_state = state_dict[key]
322328
idx = context.fan_out_index
@@ -326,7 +332,7 @@ def _find_innermost_fan_out_instance_state(
326332

327333

328334
def _project_fan_out_progress(
329-
state_dict: Mapping[tuple[tuple[str, ...], str], _FanOutExecutionState],
335+
state_dict: Mapping[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState],
330336
) -> tuple[FanOutProgress, ...]:
331337
"""Project the engine-internal mutable per-fan-out state into the
332338
frozen :class:`FanOutProgress` shape on a saved record.
@@ -343,7 +349,16 @@ def _project_fan_out_progress(
343349
byte-identically, which matters for backends that hash records.
344350
"""
345351
out: list[FanOutProgress] = []
346-
for (namespace, name), exec_state in sorted(state_dict.items()):
352+
# The key's third element is the enclosing fan-out instance lineage; it is
353+
# NOT projected onto the record (top-level / subgraph-nested fan-outs have an
354+
# empty lineage, and nested-fan-out resume is a tracked limitation). One
355+
# consequence: a fan-out nested inside an outer fan-out instance emits one
356+
# record PER outer instance, all sharing (namespace, fan_out_node_name);
357+
# _restore_fan_out_progress_state is last-wins on that collision (the nested
358+
# fan-out re-runs on resume regardless). Sorting includes the lineage so
359+
# those same-namespace entries still order deterministically (preserving the
360+
# byte-identical-record guarantee above).
361+
for (namespace, name, _lineage), exec_state in sorted(state_dict.items()):
347362
instances = tuple(
348363
FanOutInstanceProgress(
349364
state=inst.state,
@@ -366,7 +381,7 @@ def _project_fan_out_progress(
366381

367382
def _restore_fan_out_progress_state(
368383
saved: Sequence[FanOutProgress],
369-
) -> dict[tuple[tuple[str, ...], str], _FanOutExecutionState]:
384+
) -> dict[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState]:
370385
"""Inverse projection of :func:`_project_fan_out_progress`. On resume
371386
the loaded record's frozen ``fan_out_progress`` tuple gets unpacked
372387
into the mutable per-fan-out tracking dict that ``FanOutNode``
@@ -387,7 +402,7 @@ def _restore_fan_out_progress_state(
387402
the engine's canonical error-record shape, and a heuristic would
388403
misclassify them.
389404
"""
390-
out: dict[tuple[tuple[str, ...], str], _FanOutExecutionState] = {}
405+
out: dict[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState] = {}
391406
for fp in saved:
392407
instances: list[_FanOutInstanceState] = []
393408
for inst in fp.instances:
@@ -400,7 +415,18 @@ def _restore_fan_out_progress_state(
400415
completed_inner_positions=list(inst.completed_inner_positions),
401416
)
402417
)
403-
key = (fp.namespace, fp.fan_out_node_name)
418+
# The enclosing fan-out instance lineage defaults to empty: the saved
419+
# record carries no lineage, which is correct for top-level and
420+
# subgraph/branch-nested fan-outs (all empty). A fan-out nested inside an
421+
# outer fan-out instance does not round-trip its per-outer-instance
422+
# progress through the current record format (it would need the lineage
423+
# on the record): its in-memory keys carry the lineage, so the restored
424+
# empty-lineage entry never matches and the nested fan-out RE-RUNS on
425+
# resume. (Before the lineage fix it would instead skip, rolling forward
426+
# the collapsed/wrong shared entry -- so re-running is the safer of two
427+
# never-correct behaviors, and matches the spec's inner-subgraph re-entry
428+
# model.) A full fix needs the lineage on the record; tracked separately.
429+
key = (fp.namespace, fp.fan_out_node_name, ())
404430
out[key] = _FanOutExecutionState(
405431
fan_out_node_name=fp.fan_out_node_name,
406432
namespace=fp.namespace,
@@ -1987,7 +2013,14 @@ async def innermost(s: Any) -> Mapping[str, Any]:
19872013
# raised, so subsequent saves in this invocation do not carry
19882014
# stale fan-out progress and a retry middleware on the fan-out
19892015
# node sees a fresh tracked state on the second attempt.
1990-
fan_out_progress_key = (context.namespace_prefix, current)
2016+
# Match the lineage-aware key FanOutNode.run registers (namespace, node
2017+
# name, enclosing fan-out instance lineage) so a nested fan-out's cleanup
2018+
# pops its own outer-instance entry, not a sibling's.
2019+
fan_out_progress_key = (
2020+
context.namespace_prefix,
2021+
current,
2022+
tuple(i for i in context.fan_out_index_chain if i is not None),
2023+
)
19912024
try:
19922025
try:
19932026
try:

src/openarmature/graph/fan_out.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,18 @@ async def run_with_context(
208208
# shared dict. Resume threads a pre-restored entry through
209209
# ``context.fan_out_progress_state``; first-run constructs a
210210
# fresh one with all instances ``not_started``.
211-
key = (context.namespace_prefix, self.name)
211+
#
212+
# The key carries the ENCLOSING fan-out instance lineage (the non-None
213+
# fan_out_index chain), not just the namespace + node name. A fan-out
214+
# nested inside an outer fan-out instance has the same namespace for every
215+
# outer instance, so without the lineage the shared dict collides across
216+
# concurrent outer instances and the second instance rolls forward the
217+
# first's "completed" results (silently wrong results). Subgraph / branch
218+
# nesting and top-level fan-outs contribute no fan-out index, so the
219+
# lineage is empty there -- matching the resume restore (which defaults it
220+
# to empty), so top-level / subgraph-nested resume is unaffected.
221+
enclosing_fan_out_lineage = tuple(i for i in context.fan_out_index_chain if i is not None)
222+
key = (context.namespace_prefix, self.name, enclosing_fan_out_lineage)
212223
exec_state = context.fan_out_progress_state.get(key)
213224
if exec_state is None:
214225
exec_state = _FanOutExecutionState(

src/openarmature/graph/observer.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,11 @@ class _InvocationContext:
525525
# of the fan-out so concurrent saves see consistent sibling state.
526526
# ``_maybe_save_checkpoint`` projects this into the frozen
527527
# ``FanOutProgress`` shape on the saved CheckpointRecord.
528-
fan_out_progress_state: dict[tuple[tuple[str, ...], str], _FanOutExecutionState] = field(
529-
default_factory=dict[tuple[tuple[str, ...], str], _FanOutExecutionState]
528+
# Keyed by (namespace, fan_out_node_name, enclosing_fan_out_instance_lineage)
529+
# -- the lineage (non-None outer fan_out_index chain) disambiguates a fan-out
530+
# nested inside an outer fan-out instance across concurrent outer instances.
531+
fan_out_progress_state: dict[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState] = field(
532+
default_factory=dict[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState]
530533
)
531534
# Per spec §6 Drain (proposal 0010): shared mutable counters that
532535
# the worker reads at drain-cancel time to report undelivered events

tests/unit/test_checkpoint.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,118 @@ async def test_nested_fan_out_records_outermost_schema_version() -> None:
745745
)
746746

747747

748+
# ---------------------------------------------------------------------------
749+
# Nested fan-out (a fan-out inside an outer fan-out INSTANCE): the per-fan-out
750+
# tracking key carries the enclosing instance lineage. Verify the save/restore
751+
# round-trip projects the inner fan-out's progress and resume rolls the outer
752+
# instances forward to the correct per-instance results.
753+
# ---------------------------------------------------------------------------
754+
755+
756+
class _RNestLeafState(State):
757+
tag: str = ""
758+
seed: str = ""
759+
out: str = ""
760+
761+
762+
class _RNestMidState(State):
763+
tag: str = ""
764+
seeds: list[str] = Field(default_factory=list[str])
765+
collected: list[str] = Field(default_factory=list[str])
766+
767+
768+
class _RNestOuterState(State):
769+
products: list[str] = Field(default_factory=list[str])
770+
seeds: list[str] = Field(default_factory=list[str])
771+
results: list[Any] = Field(default_factory=list[Any])
772+
773+
774+
async def test_nested_fan_out_in_instance_resume_round_trips() -> None:
775+
"""A fan-out nested inside an outer fan-out instance round-trips through a
776+
checkpoint save/restore: the projection emits the inner fan-out's
777+
per-outer-instance progress, restore rebuilds the lineage-keyed dict, and
778+
resuming from the completed record rolls the outer instances forward to the
779+
correct per-instance results without re-running the inner leaf."""
780+
leaf_calls = 0
781+
782+
async def leaf(state: _RNestLeafState) -> dict[str, str]:
783+
nonlocal leaf_calls
784+
leaf_calls += 1
785+
return {"out": f"{state.tag}-{state.seed}"}
786+
787+
leaf_g = (
788+
GraphBuilder(_RNestLeafState).add_node("ask", leaf).add_edge("ask", END).set_entry("ask").compile()
789+
)
790+
mid_g = (
791+
GraphBuilder(_RNestMidState)
792+
.add_fan_out_node(
793+
"inner_fan",
794+
subgraph=leaf_g,
795+
items_field="seeds",
796+
item_field="seed",
797+
inputs={"tag": "tag"},
798+
collect_field="out",
799+
target_field="collected",
800+
)
801+
.add_edge("inner_fan", END)
802+
.set_entry("inner_fan")
803+
.compile()
804+
)
805+
cp = InMemoryCheckpointer()
806+
captured_ids: list[str] = []
807+
captured_records: list[CheckpointRecord] = []
808+
original_save = cp.save
809+
810+
async def capture_save(invocation_id: str, record: CheckpointRecord) -> None:
811+
captured_ids.append(invocation_id)
812+
captured_records.append(record)
813+
await original_save(invocation_id, record)
814+
815+
cp.save = capture_save # type: ignore[method-assign]
816+
817+
outer_g = (
818+
GraphBuilder(_RNestOuterState)
819+
.add_fan_out_node(
820+
"outer_fan",
821+
subgraph=mid_g,
822+
items_field="products",
823+
item_field="tag",
824+
inputs={"seeds": "seeds"},
825+
collect_field="collected",
826+
target_field="results",
827+
)
828+
.add_edge("outer_fan", END)
829+
.set_entry("outer_fan")
830+
.with_checkpointer(cp)
831+
.compile()
832+
)
833+
correct = [("A-x", "A-y"), ("B-x", "B-y")]
834+
final = await outer_g.invoke(_RNestOuterState(products=["A", "B"], seeds=["x", "y"]))
835+
assert sorted(tuple(sorted(sub)) for sub in final.results) == correct
836+
assert leaf_calls == 4
837+
# The projection emitted the INNER fan-out's progress (a record whose
838+
# fan_out_node_name is the inner node), so the lineage-keyed nested entry
839+
# round-trips through the record format without colliding the projection's
840+
# key destructuring.
841+
assert any(
842+
fp.fan_out_node_name == "inner_fan" for rec in captured_records for fp in rec.fan_out_progress
843+
), "expected the inner fan-out's progress in a saved record"
844+
845+
# Resume from the completed invocation: the outer instances are tracked
846+
# ``completed`` so they roll forward (no inner re-run) to the same results,
847+
# exercising _restore_fan_out_progress_state on a record set that includes
848+
# the nested fan-out's (collapsed-lineage) entries.
849+
resume_id = captured_ids[-1]
850+
leaf_calls_before_resume = leaf_calls
851+
resumed = await outer_g.invoke(
852+
_RNestOuterState(products=["A", "B"], seeds=["x", "y"]), resume_invocation=resume_id
853+
)
854+
assert sorted(tuple(sorted(sub)) for sub in resumed.results) == correct
855+
assert leaf_calls == leaf_calls_before_resume, (
856+
"completed outer instances roll forward on resume; the inner leaf must not re-run"
857+
)
858+
859+
748860
# ---------------------------------------------------------------------------
749861
# Resume re-entry into subgraph: parent_states populated on inner-node saves
750862
# ---------------------------------------------------------------------------

tests/unit/test_fan_out.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,90 @@ async def compute(state: WorkerState) -> Mapping[str, Any]:
273273
assert final.results == [11, 12, 13]
274274

275275

276+
# ---------------------------------------------------------------------------
277+
# nested fan-out (a fan-out inside an outer fan-out instance)
278+
# ---------------------------------------------------------------------------
279+
280+
281+
class _NestedLeafState(State):
282+
tag: str = ""
283+
seed: str = ""
284+
out: str = ""
285+
286+
287+
class _NestedMidState(State):
288+
tag: str = ""
289+
seeds: list[str] = Field(default_factory=list[str])
290+
collected: Annotated[list[str], append] = Field(default_factory=list[str])
291+
292+
293+
class _NestedOuterState(State):
294+
products: list[str] = Field(default_factory=list[str])
295+
seeds: list[str] = Field(default_factory=list[str])
296+
results: Annotated[list[Any], append] = Field(default_factory=list[Any])
297+
298+
299+
async def test_nested_fan_out_distinct_per_outer_instance_under_concurrency() -> None:
300+
"""A fan-out nested inside an outer fan-out instance runs its inner
301+
subgraph once per (outer, inner) pair and returns the right per-outer
302+
result, even with the outer instances running concurrently."""
303+
# Regression: the per-fan-out tracking entry was keyed by (namespace, node
304+
# name) only, so the inner fan-out's entry collided across outer instances.
305+
# With concurrent outer instances the second found the first's entry already
306+
# marked complete and rolled its result forward, so every outer instance
307+
# returned the first's inner result and the inner subgraph ran only once.
308+
leaf_calls = 0
309+
310+
async def leaf(state: _NestedLeafState) -> Mapping[str, Any]:
311+
nonlocal leaf_calls
312+
await asyncio.sleep(0) # yield so the concurrent outer instances interleave
313+
leaf_calls += 1
314+
return {"out": f"{state.tag}-{state.seed}"}
315+
316+
leaf_builder: GraphBuilder[_NestedLeafState] = GraphBuilder(_NestedLeafState)
317+
leaf_builder.set_entry("ask")
318+
leaf_builder.add_node("ask", leaf)
319+
leaf_builder.add_edge("ask", END)
320+
leaf_graph = leaf_builder.compile()
321+
322+
mid_builder: GraphBuilder[_NestedMidState] = GraphBuilder(_NestedMidState)
323+
mid_builder.set_entry("inner_fan")
324+
mid_builder.add_fan_out_node(
325+
"inner_fan",
326+
subgraph=leaf_graph,
327+
items_field="seeds",
328+
item_field="seed",
329+
inputs={"tag": "tag"},
330+
collect_field="out",
331+
target_field="collected",
332+
)
333+
mid_builder.add_edge("inner_fan", END)
334+
mid_graph = mid_builder.compile()
335+
336+
outer_builder: GraphBuilder[_NestedOuterState] = GraphBuilder(_NestedOuterState)
337+
outer_builder.set_entry("outer_fan")
338+
outer_builder.add_fan_out_node(
339+
"outer_fan",
340+
subgraph=mid_graph,
341+
items_field="products",
342+
item_field="tag",
343+
inputs={"seeds": "seeds"},
344+
collect_field="collected",
345+
target_field="results",
346+
)
347+
outer_builder.add_edge("outer_fan", END)
348+
outer_graph = outer_builder.compile()
349+
350+
final = await outer_graph.invoke(_NestedOuterState(products=["A", "B"], seeds=["x", "y"]))
351+
await outer_graph.drain()
352+
# Each outer instance collected its OWN inner results; the collapse bug gave
353+
# [["A-x", "A-y"], ["A-x", "A-y"]] (the second outer reused the first's).
354+
got = sorted(tuple(sorted(sub)) for sub in final.results)
355+
assert got == [("A-x", "A-y"), ("B-x", "B-y")]
356+
# The inner leaf ran once per (outer, inner) pair, not once total.
357+
assert leaf_calls == 4
358+
359+
276360
# ---------------------------------------------------------------------------
277361
# concurrency
278362
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)