Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

- **`current_fan_out_index()` inside fan-out instance middleware** now returns the executing instance's index (and `current_fan_out_index_chain()` its lineage) instead of `None`. The engine set the fan-out lineage ContextVars per-node, inside the inner subgraph, which left them unset in `instance_middleware` that wraps the subgraph from outside; they are now set around the instance-middleware chain. The documented `instance_middleware` use (`RetryMiddleware`) does not read the index, so no shipped behavior changes. This corrects the value seen by custom instance middleware that reads the index or calls `set_invocation_metadata`.
- **Langfuse per-branch dispatch-span observation** (observability §4.3 / §8.4.2, proposals 0042 / 0044). The Langfuse observer now synthesizes a per-branch Span observation under a `parallel_branches` dispatcher node, so each branch's inner observations nest under their own branch span (a three-level dispatcher / per-branch-span / inner-nodes tree) instead of parenting directly under the dispatcher. The per-branch observation carries the OA-emitted `branch_name` alongside the caller baseline metadata and any per-branch augmentation, and the Generation observation now carries `branch_name` too. The OTel observer already produced this shape (proposal 0044 shipped OTel-only in v0.11.0); this brings the Langfuse mapping into line. Callable branches (proposal 0075) are unchanged.
- **Augmentation no longer lands on a shared-parent fan-out / parallel-branches node** (observability §3.4, proposal 0045). A key set via `set_invocation_metadata` inside a fan-out instance or a parallel-branches branch was incorrectly applied to the shared fan-out / dispatcher NODE span (the fork point) when the augmenting context executed at that node's own namespace, in addition to the per-instance / per-branch dispatch span where it belongs. Both the OTel and Langfuse observers now skip the shared-parent node in that case, matching the behavior already applied to strict-ancestor shared parents. The per-instance / per-branch dispatch spans and the lineage ancestors that carry the augmentation are unaffected.
- **Langfuse fan-out-instance dispatch nested below the top level** (observability §5.4, proposal 0013). The Langfuse observer's per-instance dispatch synthesis and parent resolution are now prefix-general, so a fan-out node sitting inside a serial subgraph wrapper (rather than at the top namespace level) gets its per-instance dispatch observation synthesized and its inner observations parented under it. This matches the OTel observer, which already resolved across every namespace prefix.

## [0.15.0] — 2026-06-22

Expand Down
96 changes: 52 additions & 44 deletions src/openarmature/observability/langfuse/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,12 +669,14 @@ def _handle_metadata_augmentation(self, event: MetadataAugmentationEvent) -> Non
# parent_node_name caches.
for key, observation in inv_state.open_observations.items():
ns, _ai, _fi, _bn = key
if ns == aug_ns:
if _observation_chain_on_path(observation, aug_fi_chain, aug_bn_chain):
observation.handle.update(metadata=metadata_delta)
continue
if not is_strict_prefix(ns, aug_ns):
if ns != aug_ns and not is_strict_prefix(ns, aug_ns):
continue
# A fan-out / pb NODE is a shared parent and MUST NOT carry an
# instance's / branch's augmentation (proposal 0045 §3.4). This skip
# applies whether the NODE sits strictly above the augmenter OR at
# the augmenter's own namespace: an instance/branch executes AT the
# fan-out/pb node's namespace, so ns == aug_ns also matches the shared
# NODE (its per-instance dispatch is the one updated, separately above).
if ns in inv_state.fan_out_parent_node_name or ns in inv_state.parallel_branches_parent_node_name:
continue
if _observation_chain_on_path(observation, aug_fi_chain, aug_bn_chain):
Expand Down Expand Up @@ -914,19 +916,24 @@ def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent)
# 3. Leaf node observation at any matching ancestor prefix,
# walked longest-first.
# 4. None — the Trace itself becomes the implicit parent.
# Per proposal 0044: an inner branch node parents under its per-branch
# dispatch observation (longest-first; innermost branch wins).
if event.branch_name is not None:
for prefix_len in range(len(event.namespace) - 1, 0, -1):
prefix = event.namespace[:prefix_len]
# Per proposals 0044 / 0013: an inner node parents under the INNERMOST
# dispatch on its lineage -- a per-branch dispatch
# (parallel_branches_branch_spans, keyed prefix + (branch_name,)) or a
# per-instance fan-out dispatch (fan_out_instance_observations, keyed
# prefix + (str(fan_out_index),)). Walk prefixes longest-first so the
# innermost wins; this resolves arbitrary nesting (fan-out in fan-out,
# parallel-branches in fan-out, ...). Mirrors the OTel
# _resolve_parent_context.
for prefix_len in range(len(event.namespace), 0, -1):
prefix = event.namespace[:prefix_len]
if event.branch_name is not None:
branch_dispatch = inv_state.parallel_branches_branch_spans.get(prefix + (event.branch_name,))
if branch_dispatch is not None:
return branch_dispatch.handle.id
if event.fan_out_index is not None and event.namespace:
instance_key = event.namespace[:1] + (str(event.fan_out_index),)
dispatch = inv_state.fan_out_instance_observations.get(instance_key)
if dispatch is not None:
return dispatch.handle.id
if event.fan_out_index is not None:
dispatch = inv_state.fan_out_instance_observations.get(prefix + (str(event.fan_out_index),))
if dispatch is not None:
return dispatch.handle.id
for prefix_len in range(len(event.namespace) - 1, 0, -1):
prefix = event.namespace[:prefix_len]
sg = inv_state.subgraph_observations.get(prefix)
Expand Down Expand Up @@ -1007,39 +1014,38 @@ def _sync_subgraph_observations(
prefix = namespace[:depth]
if prefix in inv_state.subgraph_observations:
continue
# Non-detached per-instance dispatch for the current
# event's own fan-out instance gets opened below; skip
# the regular subgraph path here so we don't double-open.
# The per-instance dispatch for this event's own instance is opened
# below; skip the regular subgraph path so we don't double-open. Runs
# at ANY depth -- the fan-out may sit inside another fan-out instance
# or a subgraph wrapper -- mirroring the OTel observer.
if (
depth == 1
and event.fan_out_index is not None
event.fan_out_index is not None
and (prefix + (str(event.fan_out_index),)) in inv_state.fan_out_instance_observations
):
continue
# Detached subgraph: the first segment matches a
# configured detached_subgraphs name → mint a fresh
# detached Trace + open the dispatch observation in it.
# Detached subgraph: kept top-level (depth == 1). _trace_id_for routes
# detached events by namespace[:1], so a nested detached unit would
# partially detach (its dispatch in the new Trace, inner nodes in the
# main one). Nested-detached support rides with the deferred
# nested-dispatch-keying fix that generalizes _trace_id_for too.
if depth == 1 and prefix[0] in self.detached_subgraphs:
self._open_detached_subgraph_trace(inv_state, correlation_id, prefix, event)
continue
# Detached fan-out: the fan-out instance gets its own
# Trace per spec §8.5. The fan-out node's Span observation
# in the parent Trace already exists (opened on the
# fan-out node's started event); the detached dispatch
# observation goes into the new Trace.
# Detached fan-out: the fan-out instance gets its own Trace per spec
# §8.5. The fan-out node's Span observation in the parent Trace
# already exists; the detached dispatch goes into the new Trace. Kept
# top-level for the same reason as detached subgraphs above.
if depth == 1 and event.fan_out_index is not None and prefix[0] in self.detached_fan_outs:
self._open_detached_fan_out_instance_trace(inv_state, correlation_id, prefix, event)
continue
# Non-detached fan-out: synthesize per-instance dispatch
# observation under the fan-out node observation (proposal
# 0013 v0.10.0). Only triggers when the inner event is
# inside a fan-out instance AND the fan-out node's
# parent_node_name has been cached (i.e., the fan-out
# node's own started event was seen).
# Non-detached fan-out: synthesize the per-instance dispatch
# observation under the fan-out node observation (proposal 0013
# v0.10.0). The fan_out_parent_node_name cache match self-gates to
# the fan-out node's namespace, so this runs at any depth (nested
# fan-out, or a fan-out inside a subgraph wrapper / branch).
if (
depth == 1
and event.fan_out_index is not None
and prefix[0] not in self.detached_fan_outs
event.fan_out_index is not None
and prefix[-1] not in self.detached_fan_outs
and prefix in inv_state.fan_out_parent_node_name
):
self._open_fan_out_instance_dispatch_observation(inv_state, correlation_id, prefix, event)
Expand Down Expand Up @@ -1440,14 +1446,16 @@ def _close_parallel_branches_branch_dispatch_observation(
def _find_node_observation(
self, inv_state: _InvState, prefix: tuple[str, ...]
) -> _OpenObservation | None:
# Find a NODE's own open leaf observation at the given prefix (the
# fan-out or parallel-branches NODE, whose per-instance / per-branch
# dispatches parent under it). Retry middleware wrapping the node bumps
# the attempt_index; this scans for any entry at ``prefix`` with
# ``fan_out_index is None``. Only one such entry is open at a time
# (retry opens and closes within an attempt's lifecycle).
# Find a NODE's own open leaf observation at ``prefix`` (the fan-out or
# parallel-branches NODE, whose per-instance / per-branch dispatches
# parent under it). Scan by namespace only: the NODE may itself carry an
# outer fan_out_index / branch_name when it is nested inside another
# fan-out instance or branch, so filtering on fan_out_index is None would
# miss it. Only one entry per namespace is open at a time (retry opens
# and closes attempts serially), so the scan is unambiguous. Mirrors the
# OTel _find_fan_out_node_span.
for key, observation in inv_state.open_observations.items():
if key[0] == prefix and key[2] is None:
if key[0] == prefix:
return observation
return None

Expand Down
23 changes: 10 additions & 13 deletions src/openarmature/observability/otel/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1190,20 +1190,17 @@ def _collect_augmentation_targets(
# parent_node_name caches)
for key, open_span in inv_state.open_spans.items():
ns, _ai, _fi, _bn = key
if ns == aug_ns:
# Same context — must have matching chain to be the
# augmenter's own attempt rather than a sibling
# instance's same-named node.
if _span_chain_on_path(open_span, aug_fi_chain, aug_bn_chain):
targets.append(open_span.span)
if ns != aug_ns and not is_strict_prefix(ns, aug_ns):
continue
if not is_strict_prefix(ns, aug_ns):
continue
# Shared-parent check: if this NODE is a fan-out node or
# a parallel-branches node (dispatcher), it's a shared
# parent and MUST NOT be updated regardless of cardinality
# (§3.4 — the structural classification governs, not the
# live sibling count).
# Shared-parent check: a fan-out NODE or parallel-branches NODE
# (dispatcher) is a shared parent and MUST NOT be updated regardless
# of cardinality (§3.4 — the structural classification governs, not
# the live sibling count). This applies whether the NODE is a strict
# ancestor OR sits at the augmenter's own namespace: an
# instance/branch executes AT the fan-out/pb node's namespace, so
# ns == aug_ns matches the shared NODE too (its per-instance dispatch
# span is the one updated, separately above). The chain check below
# still excludes sibling instances' same-named nodes.
if ns in inv_state.fan_out_parent_node_name or ns in inv_state.parallel_branches_parent_node_name:
continue
if _span_chain_on_path(open_span, aug_fi_chain, aug_bn_chain):
Expand Down
Loading