Skip to content

Commit 3d29f00

Browse files
Fix observer augmentation scoping, wire 039 case 3 (#193)
* Fix fan-out dispatch and augmentation scoping Two observer corrections surfaced while wiring the nested-lineage conformance fixture (proposal 0045): - The augmentation walk's same-namespace arm now skips shared-parent fan-out / parallel-branches NODE spans, in both the OTel and Langfuse observers. A key set via set_invocation_metadata inside a fan-out instance or branch was wrongly applied to the shared fork NODE when the augmenting context executed at that node's own namespace, in addition to the dispatch span where it belongs. This violated the observability 3.4 MUST-NOT; the strict-ancestor arm already skipped shared parents. Per-instance / per-branch dispatch spans and lineage ancestors are unaffected. - The Langfuse per-instance fan-out dispatch synthesis and parent resolution are now prefix-general, so a fan-out nested below the top namespace level (inside a serial subgraph wrapper) gets its dispatch observation synthesized and its inner observations parented under it, matching the OTel observer. * Wire 039 case 3 conformance, defer cases 1+2 Activate the nested-lineage fixture (039) in the Langfuse conformance runner. Case 3 (a fan-out inside a serial subgraph wrapper) is built by a dedicated hand-built runner -- the generic adapter cannot construct nested fan-out graphs -- and asserted against the fixture's expected trace. A ContextVar-scoped negative check enforces proposal 0045's MUST-NOT: an augmented key absent from an observation's expected metadata must be absent in the actual, which the subset matcher alone cannot catch. Cases 1 and 2 are temporarily deferred via _DEFERRED_CASES. Both need a shared observer fix: dispatch keys do not encode the enclosing fan-out instance, so a dispatch inside an outer instance collides across instances. Tracked as a separate effort. * Keep detached dispatch synthesis top-level From CoPilot review of #193: the prefix-general change over-generalized the detached subgraph / fan-out synthesis arms to any depth, but _trace_id_for still routes detached events by namespace[:1]. A nested detached fan-out would partially detach -- its dispatch in the new Trace but inner nodes in the main one. Re-gate both detached arms to depth == 1; only the non-detached fan-out arm and the dedup need to be prefix-general (what case 3 exercises). A nested detached fan-out now gets no synthesis, consistent with the prior behavior, until the deferred nested-dispatch-keying fix generalizes _trace_id_for too.
1 parent b942d7d commit 3d29f00

4 files changed

Lines changed: 255 additions & 77 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
1010

1111
- **`current_fan_out_index()` inside fan-out instance middleware** now returns the executing instance's index (and `current_fan_out_index_chain()` its lineage) instead of `None`. The engine set the fan-out lineage ContextVars per-node, inside the inner subgraph, which left them unset in `instance_middleware` that wraps the subgraph from outside; they are now set around the instance-middleware chain. The documented `instance_middleware` use (`RetryMiddleware`) does not read the index, so no shipped behavior changes. This corrects the value seen by custom instance middleware that reads the index or calls `set_invocation_metadata`.
1212
- **Langfuse per-branch dispatch-span observation** (observability §4.3 / §8.4.2, proposals 0042 / 0044). The Langfuse observer now synthesizes a per-branch Span observation under a `parallel_branches` dispatcher node, so each branch's inner observations nest under their own branch span (a three-level dispatcher / per-branch-span / inner-nodes tree) instead of parenting directly under the dispatcher. The per-branch observation carries the OA-emitted `branch_name` alongside the caller baseline metadata and any per-branch augmentation, and the Generation observation now carries `branch_name` too. The OTel observer already produced this shape (proposal 0044 shipped OTel-only in v0.11.0); this brings the Langfuse mapping into line. Callable branches (proposal 0075) are unchanged.
13+
- **Augmentation no longer lands on a shared-parent fan-out / parallel-branches node** (observability §3.4, proposal 0045). A key set via `set_invocation_metadata` inside a fan-out instance or a parallel-branches branch was incorrectly applied to the shared fan-out / dispatcher NODE span (the fork point) when the augmenting context executed at that node's own namespace, in addition to the per-instance / per-branch dispatch span where it belongs. Both the OTel and Langfuse observers now skip the shared-parent node in that case, matching the behavior already applied to strict-ancestor shared parents. The per-instance / per-branch dispatch spans and the lineage ancestors that carry the augmentation are unaffected.
14+
- **Langfuse fan-out-instance dispatch nested below the top level** (observability §5.4, proposal 0013). The Langfuse observer's per-instance dispatch synthesis and parent resolution are now prefix-general, so a fan-out node sitting inside a serial subgraph wrapper (rather than at the top namespace level) gets its per-instance dispatch observation synthesized and its inner observations parented under it. This matches the OTel observer, which already resolved across every namespace prefix.
1315

1416
## [0.15.0] — 2026-06-22
1517

src/openarmature/observability/langfuse/observer.py

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -669,12 +669,14 @@ def _handle_metadata_augmentation(self, event: MetadataAugmentationEvent) -> Non
669669
# parent_node_name caches.
670670
for key, observation in inv_state.open_observations.items():
671671
ns, _ai, _fi, _bn = key
672-
if ns == aug_ns:
673-
if _observation_chain_on_path(observation, aug_fi_chain, aug_bn_chain):
674-
observation.handle.update(metadata=metadata_delta)
675-
continue
676-
if not is_strict_prefix(ns, aug_ns):
672+
if ns != aug_ns and not is_strict_prefix(ns, aug_ns):
677673
continue
674+
# A fan-out / pb NODE is a shared parent and MUST NOT carry an
675+
# instance's / branch's augmentation (proposal 0045 §3.4). This skip
676+
# applies whether the NODE sits strictly above the augmenter OR at
677+
# the augmenter's own namespace: an instance/branch executes AT the
678+
# fan-out/pb node's namespace, so ns == aug_ns also matches the shared
679+
# NODE (its per-instance dispatch is the one updated, separately above).
678680
if ns in inv_state.fan_out_parent_node_name or ns in inv_state.parallel_branches_parent_node_name:
679681
continue
680682
if _observation_chain_on_path(observation, aug_fi_chain, aug_bn_chain):
@@ -914,19 +916,24 @@ def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent)
914916
# 3. Leaf node observation at any matching ancestor prefix,
915917
# walked longest-first.
916918
# 4. None — the Trace itself becomes the implicit parent.
917-
# Per proposal 0044: an inner branch node parents under its per-branch
918-
# dispatch observation (longest-first; innermost branch wins).
919-
if event.branch_name is not None:
920-
for prefix_len in range(len(event.namespace) - 1, 0, -1):
921-
prefix = event.namespace[:prefix_len]
919+
# Per proposals 0044 / 0013: an inner node parents under the INNERMOST
920+
# dispatch on its lineage -- a per-branch dispatch
921+
# (parallel_branches_branch_spans, keyed prefix + (branch_name,)) or a
922+
# per-instance fan-out dispatch (fan_out_instance_observations, keyed
923+
# prefix + (str(fan_out_index),)). Walk prefixes longest-first so the
924+
# innermost wins; this resolves arbitrary nesting (fan-out in fan-out,
925+
# parallel-branches in fan-out, ...). Mirrors the OTel
926+
# _resolve_parent_context.
927+
for prefix_len in range(len(event.namespace), 0, -1):
928+
prefix = event.namespace[:prefix_len]
929+
if event.branch_name is not None:
922930
branch_dispatch = inv_state.parallel_branches_branch_spans.get(prefix + (event.branch_name,))
923931
if branch_dispatch is not None:
924932
return branch_dispatch.handle.id
925-
if event.fan_out_index is not None and event.namespace:
926-
instance_key = event.namespace[:1] + (str(event.fan_out_index),)
927-
dispatch = inv_state.fan_out_instance_observations.get(instance_key)
928-
if dispatch is not None:
929-
return dispatch.handle.id
933+
if event.fan_out_index is not None:
934+
dispatch = inv_state.fan_out_instance_observations.get(prefix + (str(event.fan_out_index),))
935+
if dispatch is not None:
936+
return dispatch.handle.id
930937
for prefix_len in range(len(event.namespace) - 1, 0, -1):
931938
prefix = event.namespace[:prefix_len]
932939
sg = inv_state.subgraph_observations.get(prefix)
@@ -1007,39 +1014,38 @@ def _sync_subgraph_observations(
10071014
prefix = namespace[:depth]
10081015
if prefix in inv_state.subgraph_observations:
10091016
continue
1010-
# Non-detached per-instance dispatch for the current
1011-
# event's own fan-out instance gets opened below; skip
1012-
# the regular subgraph path here so we don't double-open.
1017+
# The per-instance dispatch for this event's own instance is opened
1018+
# below; skip the regular subgraph path so we don't double-open. Runs
1019+
# at ANY depth -- the fan-out may sit inside another fan-out instance
1020+
# or a subgraph wrapper -- mirroring the OTel observer.
10131021
if (
1014-
depth == 1
1015-
and event.fan_out_index is not None
1022+
event.fan_out_index is not None
10161023
and (prefix + (str(event.fan_out_index),)) in inv_state.fan_out_instance_observations
10171024
):
10181025
continue
1019-
# Detached subgraph: the first segment matches a
1020-
# configured detached_subgraphs name → mint a fresh
1021-
# detached Trace + open the dispatch observation in it.
1026+
# Detached subgraph: kept top-level (depth == 1). _trace_id_for routes
1027+
# detached events by namespace[:1], so a nested detached unit would
1028+
# partially detach (its dispatch in the new Trace, inner nodes in the
1029+
# main one). Nested-detached support rides with the deferred
1030+
# nested-dispatch-keying fix that generalizes _trace_id_for too.
10221031
if depth == 1 and prefix[0] in self.detached_subgraphs:
10231032
self._open_detached_subgraph_trace(inv_state, correlation_id, prefix, event)
10241033
continue
1025-
# Detached fan-out: the fan-out instance gets its own
1026-
# Trace per spec §8.5. The fan-out node's Span observation
1027-
# in the parent Trace already exists (opened on the
1028-
# fan-out node's started event); the detached dispatch
1029-
# observation goes into the new Trace.
1034+
# Detached fan-out: the fan-out instance gets its own Trace per spec
1035+
# §8.5. The fan-out node's Span observation in the parent Trace
1036+
# already exists; the detached dispatch goes into the new Trace. Kept
1037+
# top-level for the same reason as detached subgraphs above.
10301038
if depth == 1 and event.fan_out_index is not None and prefix[0] in self.detached_fan_outs:
10311039
self._open_detached_fan_out_instance_trace(inv_state, correlation_id, prefix, event)
10321040
continue
1033-
# Non-detached fan-out: synthesize per-instance dispatch
1034-
# observation under the fan-out node observation (proposal
1035-
# 0013 v0.10.0). Only triggers when the inner event is
1036-
# inside a fan-out instance AND the fan-out node's
1037-
# parent_node_name has been cached (i.e., the fan-out
1038-
# node's own started event was seen).
1041+
# Non-detached fan-out: synthesize the per-instance dispatch
1042+
# observation under the fan-out node observation (proposal 0013
1043+
# v0.10.0). The fan_out_parent_node_name cache match self-gates to
1044+
# the fan-out node's namespace, so this runs at any depth (nested
1045+
# fan-out, or a fan-out inside a subgraph wrapper / branch).
10391046
if (
1040-
depth == 1
1041-
and event.fan_out_index is not None
1042-
and prefix[0] not in self.detached_fan_outs
1047+
event.fan_out_index is not None
1048+
and prefix[-1] not in self.detached_fan_outs
10431049
and prefix in inv_state.fan_out_parent_node_name
10441050
):
10451051
self._open_fan_out_instance_dispatch_observation(inv_state, correlation_id, prefix, event)
@@ -1440,14 +1446,16 @@ def _close_parallel_branches_branch_dispatch_observation(
14401446
def _find_node_observation(
14411447
self, inv_state: _InvState, prefix: tuple[str, ...]
14421448
) -> _OpenObservation | None:
1443-
# Find a NODE's own open leaf observation at the given prefix (the
1444-
# fan-out or parallel-branches NODE, whose per-instance / per-branch
1445-
# dispatches parent under it). Retry middleware wrapping the node bumps
1446-
# the attempt_index; this scans for any entry at ``prefix`` with
1447-
# ``fan_out_index is None``. Only one such entry is open at a time
1448-
# (retry opens and closes within an attempt's lifecycle).
1449+
# Find a NODE's own open leaf observation at ``prefix`` (the fan-out or
1450+
# parallel-branches NODE, whose per-instance / per-branch dispatches
1451+
# parent under it). Scan by namespace only: the NODE may itself carry an
1452+
# outer fan_out_index / branch_name when it is nested inside another
1453+
# fan-out instance or branch, so filtering on fan_out_index is None would
1454+
# miss it. Only one entry per namespace is open at a time (retry opens
1455+
# and closes attempts serially), so the scan is unambiguous. Mirrors the
1456+
# OTel _find_fan_out_node_span.
14491457
for key, observation in inv_state.open_observations.items():
1450-
if key[0] == prefix and key[2] is None:
1458+
if key[0] == prefix:
14511459
return observation
14521460
return None
14531461

src/openarmature/observability/otel/observer.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,20 +1190,17 @@ def _collect_augmentation_targets(
11901190
# parent_node_name caches)
11911191
for key, open_span in inv_state.open_spans.items():
11921192
ns, _ai, _fi, _bn = key
1193-
if ns == aug_ns:
1194-
# Same context — must have matching chain to be the
1195-
# augmenter's own attempt rather than a sibling
1196-
# instance's same-named node.
1197-
if _span_chain_on_path(open_span, aug_fi_chain, aug_bn_chain):
1198-
targets.append(open_span.span)
1193+
if ns != aug_ns and not is_strict_prefix(ns, aug_ns):
11991194
continue
1200-
if not is_strict_prefix(ns, aug_ns):
1201-
continue
1202-
# Shared-parent check: if this NODE is a fan-out node or
1203-
# a parallel-branches node (dispatcher), it's a shared
1204-
# parent and MUST NOT be updated regardless of cardinality
1205-
# (§3.4 — the structural classification governs, not the
1206-
# live sibling count).
1195+
# Shared-parent check: a fan-out NODE or parallel-branches NODE
1196+
# (dispatcher) is a shared parent and MUST NOT be updated regardless
1197+
# of cardinality (§3.4 — the structural classification governs, not
1198+
# the live sibling count). This applies whether the NODE is a strict
1199+
# ancestor OR sits at the augmenter's own namespace: an
1200+
# instance/branch executes AT the fan-out/pb node's namespace, so
1201+
# ns == aug_ns matches the shared NODE too (its per-instance dispatch
1202+
# span is the one updated, separately above). The chain check below
1203+
# still excludes sibling instances' same-named nodes.
12071204
if ns in inv_state.fan_out_parent_node_name or ns in inv_state.parallel_branches_parent_node_name:
12081205
continue
12091206
if _span_chain_on_path(open_span, aug_fi_chain, aug_bn_chain):

0 commit comments

Comments
 (0)