Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
- **Langfuse per-branch dispatch-span observation** (observability §4.3 / §8.4.2, proposals 0042 / 0044). The Langfuse observer now synthesizes a per-branch Span observation under a `parallel_branches` dispatcher node, so each branch's inner observations nest under their own branch span (a three-level dispatcher / per-branch-span / inner-nodes tree) instead of parenting directly under the dispatcher. The per-branch observation carries the OA-emitted `branch_name` alongside the caller baseline metadata and any per-branch augmentation, and the Generation observation now carries `branch_name` too. The OTel observer already produced this shape (proposal 0044 shipped OTel-only in v0.11.0); this brings the Langfuse mapping into line. Callable branches (proposal 0075) are unchanged.
- **Augmentation no longer lands on a shared-parent fan-out / parallel-branches node** (observability §3.4, proposal 0045). A key set via `set_invocation_metadata` inside a fan-out instance or a parallel-branches branch was incorrectly applied to the shared fan-out / dispatcher NODE span (the fork point) when the augmenting context executed at that node's own namespace, in addition to the per-instance / per-branch dispatch span where it belongs. Both the OTel and Langfuse observers now skip the shared-parent node in that case, matching the behavior already applied to strict-ancestor shared parents. The per-instance / per-branch dispatch spans and the lineage ancestors that carry the augmentation are unaffected.
- **Langfuse fan-out-instance dispatch nested below the top level** (observability §5.4, proposal 0013). The Langfuse observer's per-instance dispatch synthesis and parent resolution are now prefix-general, so a fan-out node sitting inside a serial subgraph wrapper (rather than at the top namespace level) gets its per-instance dispatch observation synthesized and its inner observations parented under it. This matches the OTel observer, which already resolved across every namespace prefix.
+- **Dispatch spans nested inside an outer fan-out instance no longer collide across outer instances** (observability §5.4 / §3.4, proposals 0013 / 0044 / 0045). A fan-out instance dispatch or a parallel-branches per-branch dispatch sitting inside an outer fan-out instance (a fan-out within a fan-out, or parallel-branches within a fan-out) was keyed by its local namespace only, so the same dispatch in different outer instances shared one key: the second outer instance's inner nodes reparented under the first instance's dispatch, and an inner augmentation reached the wrong outer instance's dispatch. Both the OTel and Langfuse observers now key dispatches by their full enclosing fan-out instance / branch lineage, and resolve a nested node's parent by that lineage too, so each outer instance gets its own correctly-parented dispatch subtree with isolated augmentation. Top-level and serial-nested dispatch behavior is unchanged.
+- **Nested fan-out no longer collapses under concurrency** (engine). A fan-out nested inside an outer fan-out instance shared a single per-fan-out tracking entry across all outer instances, because that entry was keyed by namespace plus node name only. With concurrent outer instances the second instance found the first's entry already marked complete and rolled its result forward, so every outer instance returned the first instance's inner result (silently wrong output) and the inner subgraph ran only once. The tracking key now carries the enclosing fan-out instance lineage, so each outer instance gets its own inner fan-out progress and correct per-instance results. Top-level and subgraph- or branch-nested fan-outs are unaffected (their enclosing lineage is empty). Resume of a fan-out nested inside an outer fan-out instance does not yet round-trip per-outer-instance progress, so it re-runs rather than skipping on resume; tracked as a follow-up.
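
The collision described in the engine entry above can be reproduced with a toy model. The sketch below is illustrative only: `TrackingEntry` and `run_inner_fan_out` are hypothetical stand-ins, not the engine's types, and the outer instances run sequentially here because the shared-key effect does not need real concurrency to show up.

```python
from dataclasses import dataclass, field


@dataclass
class TrackingEntry:
    """Hypothetical stand-in for the engine's per-fan-out tracking entry."""

    completed: bool = False
    results: list[str] = field(default_factory=list)


def run_inner_fan_out(state: dict, key: tuple, outer_item: str) -> list[str]:
    """Run the inner fan-out once per key; roll forward if already complete."""
    entry = state.setdefault(key, TrackingEntry())
    if entry.completed:
        # An entry that is already marked complete is reused as-is.
        return entry.results
    entry.results = [f"{outer_item}-{i}" for i in range(2)]
    entry.completed = True
    return entry.results


namespace, name = ("outer",), "inner"
outer_items = ("a", "b")

# Old key shape: (namespace, node name). Both outer instances share one entry.
old_state: dict = {}
old_results = [run_inner_fan_out(old_state, (namespace, name), item) for item in outer_items]

# New key shape: (namespace, node name, enclosing lineage). One entry per outer instance.
new_state: dict = {}
new_results = [
    run_inner_fan_out(new_state, (namespace, name, (index,)), item)
    for index, item in enumerate(outer_items)
]

print(old_results)  # [['a-0', 'a-1'], ['a-0', 'a-1']]
print(new_results)  # [['a-0', 'a-1'], ['b-0', 'b-1']]
```

With the two-element key, the second outer instance returns the first instance's results; with the lineage in the key, each outer instance computes its own.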

## [0.15.0] — 2026-06-22

52 changes: 45 additions & 7 deletions src/openarmature/graph/compiled.py
@@ -316,7 +316,13 @@ def _find_innermost_fan_out_instance_state(
# fan-out's full key is (namespace_before_fan_out, fan_out_name)
# where namespace_before_fan_out + (fan_out_name,) == prefix.
for split in range(len(prefix), 0, -1):
-key = (prefix[: split - 1], prefix[split - 1])
+# The fan-out at prefix[:split] registered its tracking entry keyed by
+# its ENCLOSING fan-out instance lineage (the non-None fan_out_index chain
+# up to its own level, prefix depth split-1). Reconstruct it from the
+# current chain so a fan-out nested inside an outer instance routes to the
+# right outer instance's entry.
+lineage = tuple(i for i in context.fan_out_index_chain[: split - 1] if i is not None)
+key = (prefix[: split - 1], prefix[split - 1], lineage)
if key in state_dict:
exec_state = state_dict[key]
idx = context.fan_out_index
@@ -326,7 +332,7 @@ def _find_innermost_fan_out_instance_state(


def _project_fan_out_progress(
-state_dict: Mapping[tuple[tuple[str, ...], str], _FanOutExecutionState],
+state_dict: Mapping[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState],
) -> tuple[FanOutProgress, ...]:
"""Project the engine-internal mutable per-fan-out state into the
frozen :class:`FanOutProgress` shape on a saved record.
@@ -343,7 +349,16 @@ def _project_fan_out_progress(
byte-identically, which matters for backends that hash records.
"""
out: list[FanOutProgress] = []
-for (namespace, name), exec_state in sorted(state_dict.items()):
+# The key's third element is the enclosing fan-out instance lineage; it is
+# NOT projected onto the record (top-level / subgraph-nested fan-outs have an
+# empty lineage, and nested-fan-out resume is a tracked limitation). One
+# consequence: a fan-out nested inside an outer fan-out instance emits one
+# record PER outer instance, all sharing (namespace, fan_out_node_name);
+# _restore_fan_out_progress_state is last-wins on that collision (the nested
+# fan-out re-runs on resume regardless). Sorting includes the lineage so
+# those same-namespace entries still order deterministically (preserving the
+# byte-identical-record guarantee above).
+for (namespace, name, _lineage), exec_state in sorted(state_dict.items()):
instances = tuple(
FanOutInstanceProgress(
state=inst.state,
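
The ordering claim in the comment above can be checked in isolation. The following sketch uses plain strings in place of the execution state and a hypothetical `project` helper; it only illustrates that sorting on the full three-element key gives the same projected order regardless of dict insertion order, even though the lineage itself is dropped from the output.

```python
def project(state: dict) -> tuple:
    """Sort on the full key (lineage included), then drop the lineage."""
    return tuple(
        (namespace, name, value)
        for (namespace, name, _lineage), value in sorted(state.items())
    )


# Two entries for the same nested fan-out, one per outer instance.
inserted_1_then_0 = {
    (("outer",), "inner", (1,)): "second",
    (("outer",), "inner", (0,)): "first",
}
inserted_0_then_1 = dict(reversed(list(inserted_1_then_0.items())))

projected_a = project(inserted_1_then_0)
projected_b = project(inserted_0_then_1)
print(projected_a == projected_b)
```

Because the keys are unique, `sorted` never falls through to comparing the values, so the values do not need to be orderable. The two projected records still share `(namespace, name)`, which is the collision the restore side has to live with.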
@@ -366,7 +381,7 @@ def _project_fan_out_progress(

def _restore_fan_out_progress_state(
saved: Sequence[FanOutProgress],
-) -> dict[tuple[tuple[str, ...], str], _FanOutExecutionState]:
+) -> dict[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState]:
"""Inverse projection of :func:`_project_fan_out_progress`. On resume
the loaded record's frozen ``fan_out_progress`` tuple gets unpacked
into the mutable per-fan-out tracking dict that ``FanOutNode``
@@ -387,7 +402,7 @@ def _restore_fan_out_progress_state(
the engine's canonical error-record shape, and a heuristic would
misclassify them.
"""
-out: dict[tuple[tuple[str, ...], str], _FanOutExecutionState] = {}
+out: dict[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState] = {}
for fp in saved:
instances: list[_FanOutInstanceState] = []
for inst in fp.instances:
@@ -400,7 +415,23 @@ def _restore_fan_out_progress_state(
completed_inner_positions=list(inst.completed_inner_positions),
)
)
-key = (fp.namespace, fp.fan_out_node_name)
+# The enclosing fan-out instance lineage defaults to empty: the saved
+# record carries no lineage, which is correct for top-level and
+# subgraph/branch-nested fan-outs (all empty). A fan-out nested inside an
+# outer fan-out instance does not round-trip its per-outer-instance
+# progress through the current record format (it would need the lineage
+# on the record): its in-memory keys carry the lineage, so the restored
+# empty-lineage entry never matches. The consequence only bites when
+# resume actually RE-ENTERS the nested fan-out -- i.e. its outer instance
+# was in-flight at the save. A completed outer instance rolls forward and
+# never re-runs its inner fan-out, so the missing inner entry is moot
+# there. When the inner fan-out does re-enter, it re-runs from scratch
+# rather than skipping its completed inner instances. (Before the lineage
+# fix it would instead skip, rolling forward the collapsed/wrong shared
+# entry -- so re-running is the safer of two never-correct behaviors, and
+# matches the spec's inner-subgraph re-entry model.) A full fix needs the
+# lineage on the record; tracked separately.
+key = (fp.namespace, fp.fan_out_node_name, ())
out[key] = _FanOutExecutionState(
fan_out_node_name=fp.fan_out_node_name,
namespace=fp.namespace,
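
The resume limitation described in the comment above comes down to a key mismatch, which a small model makes visible. In this sketch the saved records are plain `(namespace, name, progress)` tuples and `restore` is a hypothetical simplification of the restore step, assuming only what the comment states: restored entries always get an empty lineage, and records sharing `(namespace, name)` are last-wins.

```python
def restore(saved: list[tuple[tuple[str, ...], str, str]]) -> dict:
    """Rebuild the tracking dict; the record format carries no lineage."""
    out = {}
    for namespace, name, progress in saved:
        # Empty lineage for every restored entry; a later record with the
        # same (namespace, name) overwrites an earlier one.
        out[(namespace, name, ())] = progress
    return out


saved = [
    ((), "outer", "outer progress"),
    (("outer",), "inner", "inner progress, outer instance 0"),
    (("outer",), "inner", "inner progress, outer instance 1"),
]
restored = restore(saved)

# A top-level fan-out looks itself up with an empty lineage and finds its entry.
top_level_hit = restored.get(((), "outer", ()))

# A nested fan-out looks itself up with its outer instance's lineage and misses,
# so it would start from a fresh entry and re-run.
nested_hit = restored.get((("outer",), "inner", (0,)))

print(top_level_hit, nested_hit)
```

The restored empty-lineage entry for `inner` holds the last record's progress but is never matched by a lineage-carrying lookup, which is why the nested fan-out re-runs rather than skipping its completed instances.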
@@ -1987,7 +2018,14 @@ async def innermost(s: Any) -> Mapping[str, Any]:
# raised, so subsequent saves in this invocation do not carry
# stale fan-out progress and a retry middleware on the fan-out
# node sees a fresh tracked state on the second attempt.
-fan_out_progress_key = (context.namespace_prefix, current)
+# Match the lineage-aware key FanOutNode.run registers (namespace, node
+# name, enclosing fan-out instance lineage) so a nested fan-out's cleanup
+# pops its own outer-instance entry, not a sibling's.
+fan_out_progress_key = (
+context.namespace_prefix,
+current,
+tuple(i for i in context.fan_out_index_chain if i is not None),
+)
try:
try:
try:
13 changes: 12 additions & 1 deletion src/openarmature/graph/fan_out.py
@@ -208,7 +208,18 @@ async def run_with_context(
# shared dict. Resume threads a pre-restored entry through
# ``context.fan_out_progress_state``; first-run constructs a
# fresh one with all instances ``not_started``.
-key = (context.namespace_prefix, self.name)
+#
+# The key carries the ENCLOSING fan-out instance lineage (the non-None
+# fan_out_index chain), not just the namespace + node name. A fan-out
+# nested inside an outer fan-out instance has the same namespace for every
+# outer instance, so without the lineage the shared dict collides across
+# concurrent outer instances and the second instance rolls forward the
+# first's "completed" results (silently wrong results). Subgraph / branch
+# nesting and top-level fan-outs contribute no fan-out index, so the
+# lineage is empty there -- matching the resume restore (which defaults it
+# to empty), so top-level / subgraph-nested resume is unaffected.
+enclosing_fan_out_lineage = tuple(i for i in context.fan_out_index_chain if i is not None)
+key = (context.namespace_prefix, self.name, enclosing_fan_out_lineage)
exec_state = context.fan_out_progress_state.get(key)
if exec_state is None:
exec_state = _FanOutExecutionState(
7 changes: 5 additions & 2 deletions src/openarmature/graph/observer.py
@@ -525,8 +525,11 @@ class _InvocationContext:
# of the fan-out so concurrent saves see consistent sibling state.
# ``_maybe_save_checkpoint`` projects this into the frozen
# ``FanOutProgress`` shape on the saved CheckpointRecord.
-fan_out_progress_state: dict[tuple[tuple[str, ...], str], _FanOutExecutionState] = field(
-default_factory=dict[tuple[tuple[str, ...], str], _FanOutExecutionState]
+# Keyed by (namespace, fan_out_node_name, enclosing_fan_out_instance_lineage)
+# -- the lineage (non-None outer fan_out_index chain) disambiguates a fan-out
+# nested inside an outer fan-out instance across concurrent outer instances.
+fan_out_progress_state: dict[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState] = field(
+default_factory=dict[tuple[tuple[str, ...], str, tuple[int, ...]], _FanOutExecutionState]
)
# Per spec §6 Drain (proposal 0010): shared mutable counters that
# the worker reads at drain-cancel time to report undelivered events