Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

- **`current_fan_out_index()` inside fan-out instance middleware** now returns the executing instance's index (and `current_fan_out_index_chain()` its lineage) instead of `None`. The engine set the fan-out lineage ContextVars per-node, inside the inner subgraph, which left them unset in `instance_middleware` that wraps the subgraph from outside; they are now set around the instance-middleware chain. The documented `instance_middleware` use (`RetryMiddleware`) does not read the index, so no shipped behavior changes. This corrects the value seen by custom instance middleware that reads the index or calls `set_invocation_metadata`.
- **Langfuse per-branch dispatch-span observation** (observability §4.3 / §8.4.2, proposals 0042 / 0044). The Langfuse observer now synthesizes a per-branch Span observation under a `parallel_branches` dispatcher node, so each branch's inner observations nest under their own branch span (a three-level dispatcher / per-branch-span / inner-nodes tree) instead of parenting directly under the dispatcher. The per-branch observation carries the OA-emitted `branch_name` alongside the caller baseline metadata and any per-branch augmentation, and the Generation observation now carries `branch_name` too. The OTel observer already produced this shape (proposal 0044 shipped OTel-only in v0.11.0); this brings the Langfuse mapping into line. Callable branches (proposal 0075) are unchanged.
- **Augmentation no longer lands on a shared-parent fan-out / parallel-branches node** (observability §3.4, proposal 0045). A key set via `set_invocation_metadata` inside a fan-out instance or a parallel-branches branch was incorrectly applied to the shared fan-out / dispatcher NODE span (the fork point) when the augmenting context executed at that node's own namespace, in addition to the per-instance / per-branch dispatch span where it belongs. Both the OTel and Langfuse observers now skip the shared-parent node in that case, matching the behavior already applied to strict-ancestor shared parents. The per-instance / per-branch dispatch spans and the lineage ancestors that carry the augmentation are unaffected.
- **Langfuse fan-out-instance dispatch nested below the top level** (observability §5.4, proposal 0013). The Langfuse observer's per-instance dispatch synthesis and parent resolution are now prefix-general, so a fan-out node sitting inside a serial subgraph wrapper (rather than at the top namespace level) gets its per-instance dispatch observation synthesized and its inner observations parented under it. This matches the OTel observer, which already resolved across every namespace prefix.

## [0.15.0] — 2026-06-22

Expand Down
97 changes: 51 additions & 46 deletions src/openarmature/observability/langfuse/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,12 +669,14 @@ def _handle_metadata_augmentation(self, event: MetadataAugmentationEvent) -> Non
# parent_node_name caches.
for key, observation in inv_state.open_observations.items():
ns, _ai, _fi, _bn = key
if ns == aug_ns:
if _observation_chain_on_path(observation, aug_fi_chain, aug_bn_chain):
observation.handle.update(metadata=metadata_delta)
continue
if not is_strict_prefix(ns, aug_ns):
if ns != aug_ns and not is_strict_prefix(ns, aug_ns):
continue
# A fan-out / pb NODE is a shared parent and MUST NOT carry an
# instance's / branch's augmentation (proposal 0045 §3.4). This skip
# applies whether the NODE sits strictly above the augmenter OR at
# the augmenter's own namespace: an instance/branch executes AT the
# fan-out/pb node's namespace, so ns == aug_ns also matches the shared
# NODE (its per-instance dispatch is the one updated, separately above).
if ns in inv_state.fan_out_parent_node_name or ns in inv_state.parallel_branches_parent_node_name:
continue
if _observation_chain_on_path(observation, aug_fi_chain, aug_bn_chain):
Expand Down Expand Up @@ -914,19 +916,24 @@ def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent)
# 3. Leaf node observation at any matching ancestor prefix,
# walked longest-first.
# 4. None — the Trace itself becomes the implicit parent.
# Per proposal 0044: an inner branch node parents under its per-branch
# dispatch observation (longest-first; innermost branch wins).
if event.branch_name is not None:
for prefix_len in range(len(event.namespace) - 1, 0, -1):
prefix = event.namespace[:prefix_len]
# Per proposals 0044 / 0013: an inner node parents under the INNERMOST
# dispatch on its lineage -- a per-branch dispatch
# (parallel_branches_branch_spans, keyed prefix + (branch_name,)) or a
# per-instance fan-out dispatch (fan_out_instance_observations, keyed
# prefix + (str(fan_out_index),)). Walk prefixes longest-first so the
# innermost wins; this resolves arbitrary nesting (fan-out in fan-out,
# parallel-branches in fan-out, ...). Mirrors the OTel
# _resolve_parent_context.
for prefix_len in range(len(event.namespace), 0, -1):
prefix = event.namespace[:prefix_len]
if event.branch_name is not None:
branch_dispatch = inv_state.parallel_branches_branch_spans.get(prefix + (event.branch_name,))
if branch_dispatch is not None:
return branch_dispatch.handle.id
if event.fan_out_index is not None and event.namespace:
instance_key = event.namespace[:1] + (str(event.fan_out_index),)
dispatch = inv_state.fan_out_instance_observations.get(instance_key)
if dispatch is not None:
return dispatch.handle.id
if event.fan_out_index is not None:
dispatch = inv_state.fan_out_instance_observations.get(prefix + (str(event.fan_out_index),))
if dispatch is not None:
return dispatch.handle.id
for prefix_len in range(len(event.namespace) - 1, 0, -1):
prefix = event.namespace[:prefix_len]
sg = inv_state.subgraph_observations.get(prefix)
Expand Down Expand Up @@ -1007,39 +1014,35 @@ def _sync_subgraph_observations(
prefix = namespace[:depth]
if prefix in inv_state.subgraph_observations:
continue
# Non-detached per-instance dispatch for the current
# event's own fan-out instance gets opened below; skip
# the regular subgraph path here so we don't double-open.
# The per-instance dispatch for this event's own instance is opened
# below; skip the regular subgraph path so we don't double-open. Runs
# at ANY depth -- the fan-out may sit inside another fan-out instance
# or a subgraph wrapper -- mirroring the OTel observer.
if (
depth == 1
and event.fan_out_index is not None
event.fan_out_index is not None
and (prefix + (str(event.fan_out_index),)) in inv_state.fan_out_instance_observations
):
continue
# Detached subgraph: the first segment matches a
# configured detached_subgraphs name → mint a fresh
# detached Trace + open the dispatch observation in it.
if depth == 1 and prefix[0] in self.detached_subgraphs:
# Detached subgraph: detached_subgraphs holds bare node names, so
# match on prefix[-1] (the node-name segment) at any depth (at depth 1
# this coincides with prefix[0], so the depth-1 behavior is unchanged).
if prefix[-1] in self.detached_subgraphs:
self._open_detached_subgraph_trace(inv_state, correlation_id, prefix, event)
continue
# Detached fan-out: the fan-out instance gets its own
# Trace per spec §8.5. The fan-out node's Span observation
# in the parent Trace already exists (opened on the
# fan-out node's started event); the detached dispatch
# observation goes into the new Trace.
if depth == 1 and event.fan_out_index is not None and prefix[0] in self.detached_fan_outs:
# Detached fan-out: the fan-out instance gets its own Trace per spec
# §8.5. The fan-out node's Span observation in the parent Trace
# already exists; the detached dispatch goes into the new Trace.
if event.fan_out_index is not None and prefix[-1] in self.detached_fan_outs:
self._open_detached_fan_out_instance_trace(inv_state, correlation_id, prefix, event)
continue
Comment thread
chris-colinsky marked this conversation as resolved.
# Non-detached fan-out: synthesize per-instance dispatch
# observation under the fan-out node observation (proposal
# 0013 v0.10.0). Only triggers when the inner event is
# inside a fan-out instance AND the fan-out node's
# parent_node_name has been cached (i.e., the fan-out
# node's own started event was seen).
# Non-detached fan-out: synthesize the per-instance dispatch
# observation under the fan-out node observation (proposal 0013
# v0.10.0). The fan_out_parent_node_name cache match self-gates to
# the fan-out node's namespace, so this runs at any depth (nested
# fan-out, or a fan-out inside a subgraph wrapper / branch).
if (
depth == 1
and event.fan_out_index is not None
and prefix[0] not in self.detached_fan_outs
event.fan_out_index is not None
and prefix[-1] not in self.detached_fan_outs
and prefix in inv_state.fan_out_parent_node_name
):
self._open_fan_out_instance_dispatch_observation(inv_state, correlation_id, prefix, event)
Expand Down Expand Up @@ -1440,14 +1443,16 @@ def _close_parallel_branches_branch_dispatch_observation(
def _find_node_observation(
self, inv_state: _InvState, prefix: tuple[str, ...]
) -> _OpenObservation | None:
# Find a NODE's own open leaf observation at the given prefix (the
# fan-out or parallel-branches NODE, whose per-instance / per-branch
# dispatches parent under it). Retry middleware wrapping the node bumps
# the attempt_index; this scans for any entry at ``prefix`` with
# ``fan_out_index is None``. Only one such entry is open at a time
# (retry opens and closes within an attempt's lifecycle).
# Find a NODE's own open leaf observation at ``prefix`` (the fan-out or
# parallel-branches NODE, whose per-instance / per-branch dispatches
# parent under it). Scan by namespace only: the NODE may itself carry an
# outer fan_out_index / branch_name when it is nested inside another
# fan-out instance or branch, so filtering on fan_out_index is None would
# miss it. Only one entry per namespace is open at a time (retry opens
# and closes attempts serially), so the scan is unambiguous. Mirrors the
# OTel _find_fan_out_node_span.
for key, observation in inv_state.open_observations.items():
if key[0] == prefix and key[2] is None:
if key[0] == prefix:
return observation
return None

Expand Down
23 changes: 10 additions & 13 deletions src/openarmature/observability/otel/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,20 +1190,17 @@ def _collect_augmentation_targets(
# parent_node_name caches)
for key, open_span in inv_state.open_spans.items():
ns, _ai, _fi, _bn = key
if ns == aug_ns:
# Same context — must have matching chain to be the
# augmenter's own attempt rather than a sibling
# instance's same-named node.
if _span_chain_on_path(open_span, aug_fi_chain, aug_bn_chain):
targets.append(open_span.span)
if ns != aug_ns and not is_strict_prefix(ns, aug_ns):
continue
if not is_strict_prefix(ns, aug_ns):
continue
# Shared-parent check: if this NODE is a fan-out node or
# a parallel-branches node (dispatcher), it's a shared
# parent and MUST NOT be updated regardless of cardinality
# (§3.4 — the structural classification governs, not the
# live sibling count).
# Shared-parent check: a fan-out NODE or parallel-branches NODE
# (dispatcher) is a shared parent and MUST NOT be updated regardless
# of cardinality (§3.4 — the structural classification governs, not
# the live sibling count). This applies whether the NODE is a strict
# ancestor OR sits at the augmenter's own namespace: an
# instance/branch executes AT the fan-out/pb node's namespace, so
# ns == aug_ns matches the shared NODE too (its per-instance dispatch
# span is the one updated, separately above). The chain check below
# still excludes sibling instances' same-named nodes.
if ns in inv_state.fan_out_parent_node_name or ns in inv_state.parallel_branches_parent_node_name:
continue
if _span_chain_on_path(open_span, aug_fi_chain, aug_bn_chain):
Expand Down
Loading