Skip to content

Commit fc69aaa

Browse files
Fix parallel-branches branch middleware projection (#157)
Branch middleware wraps the branch's subgraph invocation (per pipeline-utilities 11.7), so it must operate in the branch subgraph's state space. The 11.4 outputs projection ran inside the middleware chain, so a middleware that short-circuits with a subgraph-space partial update (notably FailureIsolation's degraded_update) had it misapplied to the parent state and tripped extra-field validation. Move the projection outside the chain: the middleware now sees the subgraph-space partial, and the parent projection runs on whatever the chain produced (the real result on success, a degraded_update on isolation). A degraded_update that omits a projected field skips that field rather than raising, per the 11.4 partial-contribution model. The bug was latent because RetryMiddleware, the only branch middleware exercised before now, re-invokes rather than returning a cross-space update. The success path, fan-out instance middleware, and node-level placement are unchanged.
1 parent 447c06a commit fc69aaa

3 files changed

Lines changed: 187 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
1717
- **Failure-isolation events report the originating cause's category at non-node placements** (proposal 0065, pipeline-utilities §6.3). When `FailureIsolationMiddleware` runs as instance middleware (§9.7), branch middleware (§11.7), or parent-node middleware on a fan-out / parallel-branches node, the graph engine has already wrapped the originating error as a `node_exception` carrier before the middleware catches it. `FailureIsolatedEvent.caught_exception.category` now resolves through that carrier (and any nested carriers) to the nearest categorized originating cause and reports its category instead of the masking `node_exception`, so the reported category agrees with what the §6.1 retry classifier acted on. For example, an instance whose retries exhaust on `provider_unavailable` now surfaces `provider_unavailable` rather than `node_exception`. The `message` tracks the resolved cause for category/message coherence. Node-level placement was already faithful and is unchanged, and catch/degrade behavior is unchanged at every site (only the event's reported cause changes). The wrapped-instance/branch lineage SHOULD (`fan_out_index` / `branch_name`) is deferred to a follow-up, since it needs the engine to surface per-instance identity to the wrapping-site middleware.
1818
- **Observer privacy flag `disable_llm_payload` renamed to `disable_provider_payload`** (proposal 0059, observability §5.5.4, spec v0.54.0). The observer-level flag on both bundled observers (`OTelObserver` and `LangfuseObserver`) is renamed, and its scope broadens from LLM-completion payload to any provider-call payload (LLM completion today; embedding and rerank when those land). This is a breaking change to both observer constructors: config passing `disable_llm_payload=True` (or `False`) updates to `disable_provider_payload=...` with no other change. The default stays `True` (payload suppressed), and the gating behavior for `LlmCompletionEvent` / `LlmFailedEvent` rendering is unchanged at every existing site. The rename is the only part of proposal 0059 adopted this cycle: the retrieval-provider capability itself (the `EmbeddingProvider` protocol, the `EmbeddingEvent` / `EmbeddingFailedEvent` typed variants, and the embedding span / observation mapping) is not yet implemented and rides as `not-yet` in `conformance.toml`. The §5.5.4 rename touches existing LLM-payload gating, so it lands with the pin. Pinned spec advances v0.53.0 → v0.54.0.
1919

20+
### Fixed
21+
22+
- **Parallel-branches branch middleware now runs in the branch subgraph's state space** (pipeline-utilities §11.7). Branch middleware wraps the branch's subgraph invocation, so a middleware that short-circuits with a subgraph-space partial update (notably `FailureIsolationMiddleware`'s `degraded_update`) is now projected to the parent through the branch's `outputs` mapping, exactly like a real subgraph result. Previously the `outputs` projection ran inside the middleware chain, so a branch-level `degraded_update` written in the subgraph's fields reached the parent state unprojected and tripped extra-field validation. The bug was latent because the only bundled branch middleware exercised until now was `RetryMiddleware`, which re-invokes the chain rather than returning a cross-space update; it surfaces with failure isolation at a branch placement. A `degraded_update` that does not cover a projected `outputs` field contributes nothing for that field (the parent keeps its prior value) rather than raising, consistent with the §11.4 buffer-then-merge model for partial contributions. The success path, fan-out instance middleware (which already operated in subgraph space), and node-level placement are unchanged.
23+
2024
## [0.13.0] — 2026-06-09
2125

2226
LLM provider hardening release. The pinned spec advances from v0.46.0 to v0.53.0, absorbing four implemented proposals. Proposal 0049 introduces the first spec-normatively-typed observer event variant, `LlmCompletionEvent`, dispatched on every successful LLM provider call; proposal 0058 adds the failure-side counterpart, `LlmFailedEvent`; proposal 0057 extends the completion variant with eight request-side fields. The bundled `OpenAIProvider` retires its sentinel-namespace `NodeEvent` emission for LLM calls entirely, and the OTel and Langfuse observers now drive their LLM span / Generation from the typed events with back-dated timestamps so durations reflect the adapter boundary. Proposal 0047 closes implicit prefix-cache wire-byte stability: `Response.usage` gains cache-stat fields, the OTel observer emits `openarmature.llm.cache_read` attributes, and the OpenAI Chat Completions request body is byte-stable across equivalent inputs regardless of dict insertion order. Custom observers that filtered LLM calls by sentinel namespace MUST migrate to `isinstance` discrimination; `LLM_NAMESPACE` and `LlmEventPayload` remain as a documented compatibility surface.

src/openarmature/graph/parallel_branches.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -166,16 +166,42 @@ async def run_branch(branch_name: str, spec: BranchSpec[Any]) -> Mapping[str, An
166166

167167
async def innermost(s: Any) -> Mapping[str, Any]:
168168
final_branch_state = await spec.subgraph._invoke(s, child_context) # noqa: SLF001
169-
# Per §11.4 projection out: only fields named
170-
# in ``outputs`` contribute back to parent
171-
# state; unnamed subgraph fields are discarded.
169+
# Branch middleware wraps the subgraph invocation
170+
# (§11.7), so the chain operates in the branch
171+
# subgraph's state space. Surface the ``outputs``
172+
# source fields keyed by their subgraph names (via
173+
# getattr, preserving field-value identity) so a
174+
# middleware that short-circuits with a subgraph-space
175+
# partial update — FailureIsolation's degraded_update —
176+
# composes in the same space. The §11.4 projection to
177+
# parent fields runs below, OUTSIDE the chain.
172178
return {
173-
parent_field: getattr(final_branch_state, sub_field)
174-
for parent_field, sub_field in spec.outputs.items()
179+
sub_field: getattr(final_branch_state, sub_field)
180+
for sub_field in spec.outputs.values()
175181
}
176182

177183
chain: ChainCall = compose_chain(spec.middleware, innermost)
178-
return await chain(initial)
184+
branch_partial = await chain(initial)
185+
# Per §11.4 projection out: map each ``outputs`` sub-field
186+
# (read from the subgraph-space partial the chain produced
187+
# — the real subgraph result on success, or a
188+
# degraded_update on isolation) to its parent field.
189+
# Unnamed subgraph fields are discarded.
190+
#
191+
# Skip a sub-field the partial doesn't carry: a branch
192+
# contributes only the parent fields it supplies and the
193+
# §11.4 buffer-then-merge model already merges heterogeneous
194+
# partial contributions, so an omitted field leaves the
195+
# parent to its prior / sibling-branch value. On the success
196+
# path ``innermost`` always supplies every sub-field; the
197+
# subset case is a degraded_update that doesn't cover a
198+
# projected field, where a hard miss would defeat the point
199+
# of failure isolation.
200+
return {
201+
parent_field: branch_partial[sub_field]
202+
for parent_field, sub_field in spec.outputs.items()
203+
if sub_field in branch_partial
204+
}
179205
finally:
180206
_reset_branch_name(token)
181207

tests/unit/test_parallel_branches.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@
3939
append,
4040
merge,
4141
)
42+
from openarmature.graph.middleware import (
43+
FailureIsolationMiddleware,
44+
RetryConfig,
45+
RetryMiddleware,
46+
deterministic_backoff,
47+
)
4248

4349
# ---------------------------------------------------------------------------
4450
# Shared schemas + helpers
@@ -217,6 +223,151 @@ async def test_three_heterogeneous_branches_merge_to_parent() -> None:
217223
assert final.gamma_result == 3
218224

219225

226+
# ---------------------------------------------------------------------------
227+
# Branch middleware — state space (§11.7)
228+
# ---------------------------------------------------------------------------
229+
230+
231+
async def test_branch_middleware_degraded_update_projects_through_outputs() -> None:
232+
# Regression: branch middleware wraps the subgraph invocation (§11.7),
233+
# so the chain operates in the branch subgraph's state space. A
234+
# middleware that short-circuits with a subgraph-space partial update —
235+
# here FailureIsolation's degraded_update writing the subgraph field
236+
# ``b_out`` — MUST project to the parent through the branch's
237+
# ``outputs`` mapping, exactly like a real subgraph result. Before the
238+
# fix the ``outputs`` projection ran INSIDE the middleware chain, so the
239+
# degraded_update reached the parent as ``b_out`` and tripped
240+
# extra-field validation (ParentState has no ``b_out``).
241+
isolation = FailureIsolationMiddleware(
242+
degraded_update={"b_out": 99},
243+
event_name="beta_isolated",
244+
)
245+
compiled = (
246+
GraphBuilder(ParentState)
247+
.set_entry("dispatcher")
248+
.add_parallel_branches_node(
249+
"dispatcher",
250+
branches={
251+
"beta": BranchSpec(
252+
subgraph=_build_beta_raises("boom"),
253+
outputs={"beta_result": "b_out"},
254+
middleware=(isolation,),
255+
),
256+
},
257+
)
258+
.add_edge("dispatcher", END)
259+
.compile()
260+
)
261+
final = await compiled.invoke(ParentState())
262+
await compiled.drain()
263+
# The branch failed; FailureIsolation degraded it in subgraph space
264+
# (b_out=99); ``outputs`` projected b_out -> parent beta_result.
265+
assert final.beta_result == 99
266+
267+
268+
async def test_branch_middleware_success_path_projects_subgraph_output() -> None:
269+
# Guards the other side of the fix: with branch middleware present but
270+
# the branch SUCCEEDING, the real subgraph output (not the degraded
271+
# value) must still project through ``outputs``. Confirms moving the
272+
# projection outside the middleware chain left the success path intact.
273+
isolation = FailureIsolationMiddleware(
274+
degraded_update={"a_out": 99},
275+
event_name="alpha_isolated",
276+
)
277+
compiled = (
278+
GraphBuilder(ParentState)
279+
.set_entry("dispatcher")
280+
.add_parallel_branches_node(
281+
"dispatcher",
282+
branches={
283+
"alpha": BranchSpec(
284+
subgraph=_build_alpha_succeeds(), # returns a_out=1
285+
outputs={"alpha_result": "a_out"},
286+
middleware=(isolation,),
287+
),
288+
},
289+
)
290+
.add_edge("dispatcher", END)
291+
.compile()
292+
)
293+
final = await compiled.invoke(ParentState())
294+
await compiled.drain()
295+
assert final.alpha_result == 1 # real subgraph output, not the degraded 99
296+
297+
298+
async def test_branch_middleware_degraded_update_omitting_field_skips_contribution() -> None:
299+
# Leniency: a degraded_update that does not cover a projected
300+
# ``outputs`` sub-field contributes nothing for that field rather than
301+
# raising. The §11.4 buffer-then-merge model already merges partial
302+
# contributions, so the parent field keeps its prior value. Here the
303+
# branch degrades with an EMPTY update, so ``beta_result`` is never
304+
# contributed and stays at its ParentState default (0). A hard miss
305+
# would defeat the point of failure isolation (the resilience primitive
306+
# would itself crash the invocation).
307+
isolation = FailureIsolationMiddleware(
308+
degraded_update={},
309+
event_name="beta_isolated",
310+
)
311+
compiled = (
312+
GraphBuilder(ParentState)
313+
.set_entry("dispatcher")
314+
.add_parallel_branches_node(
315+
"dispatcher",
316+
branches={
317+
"beta": BranchSpec(
318+
subgraph=_build_beta_raises("boom"),
319+
outputs={"beta_result": "b_out"},
320+
middleware=(isolation,),
321+
),
322+
},
323+
)
324+
.add_edge("dispatcher", END)
325+
.compile()
326+
)
327+
final = await compiled.invoke(ParentState())
328+
await compiled.drain()
329+
assert final.beta_result == 0 # never contributed; parent default retained
330+
331+
332+
async def test_branch_middleware_isolation_wraps_retry_degrades_after_exhaustion() -> None:
333+
# Fixture-064-Case-1-shaped at a branch: middleware [failure_isolation,
334+
# retry] (outer-to-inner). The branch's node fails on every attempt;
335+
# retry exhausts its two attempts and re-raises; failure_isolation
336+
# catches the exhausted exception and degrades in subgraph space, which
337+
# projects to the parent. Exercises the state-space fix through a real
338+
# multi-middleware chain rather than a single frame.
339+
isolation = FailureIsolationMiddleware(
340+
degraded_update={"b_out": 99},
341+
event_name="beta_isolated",
342+
)
343+
retry = RetryMiddleware(
344+
RetryConfig(
345+
max_attempts=2,
346+
classifier=lambda _exc, _state: True,
347+
backoff=deterministic_backoff(0),
348+
)
349+
)
350+
compiled = (
351+
GraphBuilder(ParentState)
352+
.set_entry("dispatcher")
353+
.add_parallel_branches_node(
354+
"dispatcher",
355+
branches={
356+
"beta": BranchSpec(
357+
subgraph=_build_beta_raises("boom"),
358+
outputs={"beta_result": "b_out"},
359+
middleware=(isolation, retry),
360+
),
361+
},
362+
)
363+
.add_edge("dispatcher", END)
364+
.compile()
365+
)
366+
final = await compiled.invoke(ParentState())
367+
await compiled.drain()
368+
assert final.beta_result == 99
369+
370+
220371
# ---------------------------------------------------------------------------
221372
# fail_fast policy
222373
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)