Commit e410bba

test(unit): parallel_branches + docs + CHANGELOG
Adds unit-test coverage and doc surface for proposal 0011.

tests/unit/test_parallel_branches.py:
- Compile-time validation: empty branches mapping raises ParallelBranchesNoBranches; empty branch_name raises ValueError; inputs / outputs / errors_field projection-field-existence checks raise MappingReferencesUndeclaredField on the correct side.
- Runtime happy path: three heterogeneous branches merge their projected outputs into the parent.
- fail_fast: ParallelBranchesBranchFailed surfaces branch_name and the original cause via __cause__; recoverable_state holds the parent's pre-dispatch snapshot with no buffered contributions applied.
- collect: per-branch failures land in errors_field with branch_name + category; failed branches' outputs do not fire.
- Determinism: when two branches write the same merge-reducer parent field, insertion order determines fan-in order regardless of inner-node completion timing.
- Cancellation drain: a second branch racing past cancellation has its exception absorbed silently by gather(*, return_exceptions=True).

docs/concepts/parallel-branches.md: new concepts page covering when to reach for parallel branches vs. fan-out, the BranchSpec shape, per-branch state / inputs / outputs, the two error policies (fail_fast buffer-and-apply semantics, collect with errors_field), branch middleware (with the v0.16.1 attempt_index propagation note), composition with other constructs, resume semantics, and when parallel branches is not the right shape.

docs/concepts/index.md: link to the new page.

docs/concepts/observability.md: NodeEvent shape gets branch_name; attempt_index description notes the v0.16.1 transitive-retry propagation; new branch_name bullet describes independence from fan_out_index and the openarmature.branch_name OTel attribute.

CHANGELOG: adds the parallel-branches Unreleased entry plus the attempt-index ContextVar propagation entry; bumps the spec pin note from v0.16.0 to v0.16.1; flips the release-gate note since all five proposals in the batch are now landed.
1 parent 97cc787 commit e410bba

5 files changed

Lines changed: 657 additions & 3 deletions

CHANGELOG.md

Lines changed: 6 additions & 2 deletions
@@ -8,6 +8,10 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
88

99
### Added
1010

11+
- **Parallel branches (proposal 0011, introduced in spec v0.11.0; attempt-index propagation clarified in spec v0.16.1).** New `GraphBuilder.add_parallel_branches_node(name, *, branches, error_policy, errors_field, middleware)` surface dispatches M heterogeneous compiled subgraphs concurrently per pipeline-utilities §11. `BranchSpec` (subgraph + inputs/outputs projection + branch middleware) and `ParallelBranchesNode` types exported from `openarmature.graph`. Branch insertion order determines fan-in merge order regardless of completion timing (§11.8). Two error policies: `"fail_fast"` raises `ParallelBranchesBranchFailed` (a `NodeException` subtype) with `branch_name`, original cause as `__cause__`, and `recoverable_state` carrying the parent's pre-dispatch snapshot — no buffered branch contributions are visible (§11.5 buffer-and-apply). `"collect"` records per-branch failures in an optional `errors_field` (each record carries `branch_name` + `category` + implementation-defined extras) and continues. Two new error categories: `ParallelBranchesNoBranches` (compile time, empty branches map) and `ParallelBranchesBranchFailed` (runtime, fail_fast branch raise).
12+
- **`NodeEvent.branch_name: str | None` (proposal 0011 / graph-engine §6).** Populated on events from nodes inside a parallel-branches branch; `None` outside. Independent of `fan_out_index`: both may be populated simultaneously when a branch contains a fan-out (or a fan-out instance contains a parallel-branches node). The combined `(namespace, branch_name, fan_out_index, attempt_index, phase)` tuple is the event-source uniqueness key.
13+
- **`openarmature.branch_name` OTel span attribute.** Mirrors the existing `openarmature.node.fan_out_index`. Emitted on synthesized inner-node spans when `branch_name` is populated on the event. The two attributes coexist on inner nodes of a fan-out-inside-a-branch composition.
14+
- **Attempt-index ContextVar propagation through transitive retry (graph-engine §6 v0.16.1).** Retry middleware now sets the `attempt_index` ContextVar before each `next` call; the engine reads `current_attempt_index()` when emitting events. This makes retry semantics symmetric across direct (per-node middleware) and transitive (instance / branch / fan-out instance_middleware) wrapping — events from inner nodes of a subgraph the retry re-invokes carry the wrapping retry's counter, not a freshly-zeroed inner counter. Innermost-wins precedence falls out of Python's ContextVar set/reset token stack. Pre-existing node-level retry behavior is unchanged.
1115
- **State migration for checkpointed graphs (proposal 0014, introduced in spec v0.15.0; refined by proposal 0018 in spec v0.16.0).** Saved checkpoints whose `schema_version` doesn't match the current state class now route through a registered migration chain instead of failing on resume. Surface: `State.schema_version: ClassVar[str] = ""` (declare a non-empty value to opt in), `GraphBuilder.with_state_migration(from_version, to_version, migrate)` and `with_state_migrations(*migrations)` for registration, `StateMigration` and `MigrationRegistry` types exported from `openarmature.checkpoint`. Chain resolution is BFS over the registered edges; the shortest path wins. Three new error categories: `CheckpointStateMigrationChainAmbiguous` (proposal 0018: duplicate `(from, to)` pair at registration time, or multiple distinct shortest paths between the saved and current versions at resume time), `CheckpointStateMigrationMissing` (no chain bridges the versions), and `CheckpointStateMigrationFailed` (a migration function raised). All non-transient. Post-migration deserialization failures still route to `CheckpointRecordInvalid` per §10.12.4. The same chain applies to each entry in `parent_states` in lockstep with the outer state per §10.12.2. Routing precedence per §10.10 (v0.16.0): chain-ambiguous → missing → failed → record-invalid.
1216
- **`Checkpointer.supports_state_migration` Protocol attribute.** Marks whether a backend can expose the structural intermediate form (a plain dict, JSON tree) the migration registry consumes. `SQLiteCheckpointer(serialization="json")` opts in; `SQLiteCheckpointer(serialization="pickle")` and `InMemoryCheckpointer` opt out. On version mismatch against a non-migration-eligible backend the engine raises `CheckpointRecordInvalid` per spec §10.12.1.
1317
- **`openarmature.checkpoint.migrate` OTel span (proposal 0014 §6 cross-ref).** Versioned resumes whose migration chain runs emit a zero-duration `openarmature.checkpoint.migrate` span on the OTel observer, parented under the invocation root span. Attributes: `openarmature.checkpoint.migrate.from_version`, `openarmature.checkpoint.migrate.to_version` (the final target), `openarmature.checkpoint.migrate.chain_length`. The §10.12.3 fast path (versions match, registry not consulted) emits no span. Engine-side: a synthetic `checkpoint_migrated` observer phase carries a `_MigrationSummary` payload from `_migrate_record` through to the OTel observer; the new phase is gated off default subscriptions (observers opt in explicitly via `phases={..., "checkpoint_migrated"}`).
@@ -25,13 +29,13 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
2529

2630
### Changed
2731

28-
- **Pinned spec version: 0.10.0 → 0.16.0.** Adopts the skip-ahead governance principle: the submodule jumps across v0.11.0–v0.16.0 (proposals 0009, 0011, 0014, 0015, 0016, 0017, 0018) in one bump. Only the surfaces introduced by proposals 0014–0017 are implemented in the batch's release; fixtures from 0011 are deferred-skip in the conformance suite and unmark with PR-5.
32+
- **Pinned spec version: 0.10.0 → 0.16.1.** Adopts the skip-ahead governance principle: the submodule jumps across v0.11.0–v0.16.1 (proposals 0009, 0011, 0014, 0015, 0016, 0017, 0018) in one bump. All five proposals (0011, 0014, 0015, 0016, 0017) are implemented in the batch's release; the v0.16.1 clarification of attempt-index propagation through transitive retry middleware lands with the proposal 0011 implementation.
2933
- **`CheckpointRecord.schema_version` semantic shift (proposal 0014).** Previously a backend-internal record-shape version (`CHECKPOINT_SCHEMA_VERSION = "1"` constant), now the user-facing state-schema version per spec §10.2. The framework reads `type(state).schema_version` at save time. Pre-PR-4 records carrying `"1"` are reinterpreted as user-facing v1 identifiers; users with such records either declare `schema_version="1"` on their state class or discard the pre-PR-4 records. `SQLiteCheckpointer` no longer rejects records with non-default `schema_version` at the backend boundary; version-mismatch routing is now an engine concern at resume time. The `CHECKPOINT_SCHEMA_VERSION` module constant is removed; future record-shape evolution can add backend-private metadata fields if needed.
3034
- **`NodeEvent.pre_state` typed `Any` (was `State`).** Required by the new `checkpoint_migrated` phase which carries a `_MigrationSummary` payload rather than a `State` instance. Observer authors who type-narrowed `pre_state` to `State` should treat it as `Any` and narrow per-phase (e.g., `if event.phase == "completed": ...`). The `checkpoint_saved` phase already carried a State-flavored shape (not necessarily a typed `State` subclass instance), so this widens the declared type to match runtime reality rather than introducing a new constraint.
3135

3236
### Notes
3337

34-
- **Release gate: do not tag until all of {0011, 0014, 0015, 0016, 0017} are merged.** This batch implements one proposal per PR and lands a consolidated release after the fifth PR. Cutting a release tag before the batch is complete would ship a partial spec implementation against the v0.15.0 pin.
38+
- **Release gate cleared with PR-5 (proposal 0011).** All five proposals in the batch ({0011, 0014, 0015, 0016, 0017}) are now implemented. Tag the consolidated release once this PR merges.
3539
- **Pre-1.0 MINOR.** Existing free-form callers (no `response_schema`) see no behavior change — the new field defaults to `None`, the wire body omits `response_format`, and `Response.parsed` remains absent.
3640

3741
## [0.5.0] — 2026-05-10

docs/concepts/index.md

Lines changed: 3 additions & 0 deletions
@@ -12,6 +12,9 @@ the framework, or jump to whichever concept you need.
1212
data seam.
1313
- [Fan-out](fan-out.md): running the same subgraph many times in
1414
parallel, results merged back deterministically.
15+
- [Parallel branches](parallel-branches.md): dispatching M
16+
heterogeneous subgraphs concurrently with per-branch state schemas
17+
and middleware.
1518
- [LLMs](llms.md): how LLM calls fit into nodes, structured output,
1619
routing on parsed fields, errors at the LLM boundary.
1720
- [Observability](observability.md): node-boundary hooks, OTel mapping,

docs/concepts/observability.md

Lines changed: 17 additions & 1 deletion
@@ -78,6 +78,7 @@ class NodeEvent:
7878
attempt_index: int = 0
7979
fan_out_index: int | None = None
8080
fan_out_config: FanOutEventConfig | None = None
81+
branch_name: str | None = None
8182
```
8283

8384
A walk-through:
@@ -130,7 +131,11 @@ A walk-through:
130131
`len(parent_states) == len(namespace) - 1`.
131132

132133
- **`attempt_index`**: 0-based retry attempt counter. `0` for nodes
133-
not wrapped by retry middleware; `1+` for retries.
134+
not wrapped by retry middleware; `1+` for retries. Retry middleware
135+
may wrap transitively — a retry on a [parallel-branches
136+
branch](parallel-branches.md) or fan-out `instance_middleware`
137+
re-runs the whole subgraph; events from inner nodes carry the
138+
wrapping retry's attempt counter.
134139

135140
- **`fan_out_index`**: 0-based per-instance index for events inside
136141
a fan-out instance; `None` outside.
@@ -140,6 +145,17 @@ A walk-through:
140145
`item_count` / `concurrency` / `error_policy` / `parent_node_name`.
141146
`None` on every other event.
142147

148+
- **`branch_name`**: populated on events from nodes inside a
149+
[parallel-branches branch](parallel-branches.md), carrying the
150+
branch's name as declared on the dispatcher. `None` outside.
151+
Independent of `fan_out_index` — both may be present simultaneously
152+
when a parallel-branches branch contains a fan-out (or a fan-out
153+
instance contains a parallel-branches node). The combination
154+
`(namespace, branch_name, fan_out_index, attempt_index, phase)`
155+
uniquely identifies each event source. On the OTel mapping
156+
side, an `openarmature.branch_name` span attribute is added in
157+
parallel to the existing `openarmature.node.fan_out_index`.
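
To illustrate the uniqueness claim, here is a minimal sketch using a hypothetical, trimmed-down `Event` dataclass (not the real `NodeEvent`, whose full field list is longer). The namespace values are made up for the example; the point is only that the five-part tuple tells apart events that share a node name.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in carrying only the fields that form the key."""
    namespace: Tuple[str, ...]
    phase: str
    attempt_index: int = 0
    fan_out_index: Optional[int] = None
    branch_name: Optional[str] = None


def source_key(event: Event) -> tuple:
    # Same ordering as the tuple described in the bullet above.
    return (
        event.namespace,
        event.branch_name,
        event.fan_out_index,
        event.attempt_index,
        event.phase,
    )


ns = ("dispatcher", "score")  # illustrative namespace, an assumption
events = [
    Event(ns, "started", branch_name="research", fan_out_index=0),
    Event(ns, "started", branch_name="research", fan_out_index=1),
    Event(ns, "started", branch_name="translate"),
    Event(ns, "started", branch_name="research", fan_out_index=0, attempt_index=1),
]

keys = {source_key(e) for e in events}
```

All four events share a namespace and phase, yet `keys` has four distinct entries because `branch_name`, `fan_out_index`, and `attempt_index` each contribute independently.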
158+
143159
## Routing errors and the completed event
144160

145161
When a conditional edge raises or returns an invalid target:

docs/concepts/parallel-branches.md

Lines changed: 151 additions & 0 deletions
@@ -0,0 +1,151 @@
1+
# Parallel branches
2+
3+
Dispatch M heterogeneous subgraphs concurrently, then merge their
4+
projected outputs back into the parent via the parent's reducers in
5+
branch insertion order.
6+
7+
Sibling to [fan-out](fan-out.md) (same `for each thing, do work in
8+
parallel` shape), but the *thing* is different per branch: a research
9+
subgraph, a categorize subgraph, a sentiment subgraph — each with its
10+
own state schema, its own middleware, its own observer events —
11+
running in parallel and joining their results into one parent state.
12+
13+
## When to reach for parallel branches
14+
15+
The signal: a fixed set of named operations, each with its own
16+
behavior and state schema, that don't depend on each other. Three
17+
classifiers running independently against the same input. A research
18+
step, a translate step, and a fact-check step that all want the
19+
parent's prompt. M is known at build time and small (typically 2–6),
20+
and each branch is its own subgraph because each has its own
21+
internal pipeline worth modelling separately.
22+
23+
Fan-out is the right pick when you have N similar pieces of work,
24+
N depends on runtime state, and the work is the same across instances.
25+
Parallel branches is the right pick when M is a small fixed set of
26+
different operations that happen to run concurrently.
27+
28+
## The shape
29+
30+
```python
31+
from openarmature.graph import BranchSpec, GraphBuilder
32+
33+
builder.add_parallel_branches_node(
34+
"dispatcher",
35+
branches={
36+
"research": BranchSpec(
37+
subgraph=research_subgraph, # CompiledGraph[ResearchState]
38+
inputs={"question": "prompt"}, # subgraph_field -> parent_field
39+
outputs={"facts": "facts"}, # parent_field -> subgraph_field
40+
),
41+
"translate": BranchSpec(
42+
subgraph=translate_subgraph, # CompiledGraph[TranslateState]
43+
inputs={"source": "prompt"},
44+
outputs={"translation": "translated"},
45+
),
46+
"fact_check": BranchSpec(
47+
subgraph=fact_check_subgraph, # CompiledGraph[FactCheckState]
48+
inputs={"claim": "prompt"},
49+
outputs={"verdict": "verdict"},
50+
),
51+
},
52+
error_policy="fail_fast", # or "collect"
53+
)
54+
```
55+
56+
Each branch's `subgraph` is a compiled graph; `inputs` and `outputs`
57+
mirror the explicit projection shape from
58+
[composition](composition.md#explicitmapping-declarative). The
59+
branches dict's key is the branch name — used as the branch identity
60+
on observer events (see [observability](observability.md)) and in
61+
the per-branch error records that `error_policy="collect"`
62+
produces.
63+
64+
## Per-branch state, inputs and outputs
65+
66+
Each branch runs its own subgraph against its own state — heterogeneous
67+
schemas are explicit. Subgraph fields named in `inputs` are seeded
68+
from the parent's corresponding field at branch entry; other subgraph
69+
fields take their schema defaults. At branch exit, only the parent
70+
fields named in `outputs` receive contributions; the rest of the
71+
branch's final state is discarded.
72+
73+
When two branches contribute to the same parent field, the parent's
74+
reducer for that field applies both values in **branch insertion
75+
order** — first the branch declared first in the `branches` dict,
76+
then the next, and so on. This is deterministic regardless of which
77+
branch's inner work finishes first.
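
The ordering guarantee can be sketched with plain `asyncio`, independent of the engine. This is an illustration under assumptions, not the engine's implementation: `run_branch` and `dispatch` are hypothetical helpers, each "branch" is just a sleep, and the parent reducer is modelled as list concatenation.

```python
import asyncio

completion_order: list[str] = []


async def run_branch(name: str, delay: float) -> list[str]:
    """Stand-in for a branch subgraph: sleep, then return a projected output."""
    await asyncio.sleep(delay)
    completion_order.append(name)
    return [name]


async def dispatch(branches: dict[str, float]) -> list[str]:
    # asyncio.gather returns results in the order the awaitables were passed,
    # which here is the dict's insertion order, not the completion order.
    results = await asyncio.gather(
        *(run_branch(name, delay) for name, delay in branches.items())
    )
    merged: list[str] = []
    for contribution in results:
        merged = merged + contribution  # append-style reducer on the parent field
    return merged


merged = asyncio.run(
    dispatch({"research": 0.03, "translate": 0.01, "fact_check": 0.02})
)
print(merged)  # ['research', 'translate', 'fact_check']
```

The delays make `translate` finish first, so `completion_order` differs from `merged`, but the merged list still follows the declaration order of the mapping.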
78+
79+
## Error policy
80+
81+
- **`"fail_fast"`** (default): the first branch failure cancels
82+
the in-flight siblings and propagates as
83+
`ParallelBranchesBranchFailed` (a `NodeException` subtype) carrying
84+
the failing `branch_name` and the original cause as `__cause__`.
85+
`recoverable_state` is the parent's snapshot at the moment the
86+
dispatcher entered — **no buffered branch contributions are
87+
applied**, including those of branches that successfully completed
88+
before the failure. Buffer-and-apply semantics: contributions are
89+
held until every branch finishes, then either all apply (success)
90+
or none apply (fail_fast failure).
91+
- **`"collect"`**: every branch runs to completion. Successful
92+
branches' contributions merge in insertion order; failed branches'
93+
`outputs` projections do NOT fire (their named parent fields stay
94+
at their defaults). If you declare `errors_field` on the dispatcher,
95+
each failed branch produces a record with at minimum
96+
`{"branch_name": <name>, "category": <category>}` appended to that
97+
parent list field; the implementation may include additional keys
98+
(message, cause_type) and tests should match by the spec-mandated
99+
keys rather than strict equality.
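
The two policies can be modelled in a few lines of `asyncio`. The sketch below is a simplified illustration, not the engine's code: `BranchFailed` is a hypothetical stand-in for `ParallelBranchesBranchFailed`, state is a plain dict with list-append reducers, and the exception's class name is used as a placeholder for the spec's `category` value.

```python
import asyncio
from typing import Optional


class BranchFailed(Exception):
    """Hypothetical stand-in for ParallelBranchesBranchFailed."""

    def __init__(self, branch_name: str, recoverable_state: dict) -> None:
        super().__init__(f"branch {branch_name!r} failed")
        self.branch_name = branch_name
        self.recoverable_state = recoverable_state


async def dispatch(parent: dict, branches: dict, error_policy: str = "fail_fast",
                   errors_field: Optional[str] = None) -> dict:
    snapshot = dict(parent)  # the parent's pre-dispatch snapshot
    tasks = {name: asyncio.create_task(run()) for name, run in branches.items()}
    stop_on = (asyncio.FIRST_EXCEPTION if error_policy == "fail_fast"
               else asyncio.ALL_COMPLETED)
    _, pending = await asyncio.wait(list(tasks.values()), return_when=stop_on)
    for task in pending:
        task.cancel()  # fail_fast: cancel in-flight siblings
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)  # drain them

    buffered, errors = [], []
    for name, task in tasks.items():  # insertion order
        if task.cancelled():
            continue
        exc = task.exception()
        if exc is None:
            buffered.append(task.result())  # held back, not yet applied
        elif error_policy == "fail_fast":
            raise BranchFailed(name, snapshot) from exc  # nothing was applied
        else:
            # Placeholder category; the real value is defined by the spec.
            errors.append({"branch_name": name, "category": type(exc).__name__})

    merged = dict(snapshot)
    for contribution in buffered:  # apply only after every branch has finished
        for field, values in contribution.items():
            merged[field] = merged.get(field, []) + values
    if errors_field is not None:
        merged[errors_field] = merged.get(errors_field, []) + errors
    return merged


async def research() -> dict:
    return {"facts": ["sky is blue"]}


async def translate() -> dict:
    raise RuntimeError("upstream unavailable")


parent = {"facts": [], "errors": []}
branches = {"research": research, "translate": translate}

collected = asyncio.run(dispatch(parent, branches, "collect", errors_field="errors"))

failed = None
try:
    asyncio.run(dispatch(parent, branches, "fail_fast"))
except BranchFailed as failure:
    failed = failure
```

Under `"collect"`, `research` contributes its facts and `translate` produces one error record. Under `"fail_fast"`, the raised exception's `recoverable_state` equals the untouched parent even though `research` had already completed. A test against the record would check `branch_name` and `category` individually, leaving room for extra keys.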
100+
101+
## Branch middleware
102+
103+
Each `BranchSpec` accepts a `middleware` tuple — middlewares that
104+
wrap that branch's whole subgraph invocation as a unit. Retry
105+
middleware on a branch retries the **whole branch**: a fresh
106+
subgraph invocation each time, fresh inner-node execution. The
107+
wrapping retry's attempt counter propagates to events emitted from
108+
inner nodes (per graph-engine §6 v0.16.1), so observer events
109+
inside the branch correctly show `attempt_index` ticking across
110+
retries.
111+
112+
Branch middleware is independent across branches — branch A may
113+
have `(retry, timing)`; branch B may have `()`; branch C may have
114+
some custom breaker. Each branch's chain composes in isolation.
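
The attempt-counter propagation can be sketched with the standard-library `contextvars` module. This is a synchronous toy model under assumptions: the `retry` wrapper, `emit`, and the branch body are hypothetical, and only `current_attempt_index()` borrows its name from the changelog entry. The real engine is asynchronous and its middleware signature may differ.

```python
import contextvars

_attempt_index: contextvars.ContextVar[int] = contextvars.ContextVar(
    "attempt_index", default=0
)
events: list[tuple[str, int]] = []


def current_attempt_index() -> int:
    return _attempt_index.get()


def emit(node: str) -> None:
    # The "engine" reads the ContextVar at emission time.
    events.append((node, current_attempt_index()))


def retry(fn, max_attempts: int):
    """Toy retry wrapper: sets the counter before each call, resets after."""
    def wrapped():
        for attempt in range(max_attempts):
            token = _attempt_index.set(attempt)
            try:
                return fn()
            except RuntimeError:
                if attempt == max_attempts - 1:
                    raise
            finally:
                _attempt_index.reset(token)  # restores the enclosing value
    return wrapped


calls = {"n": 0}


def branch() -> str:
    emit("fetch")
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure on the first attempt")
    # An inner node with its own retry: the innermost counter wins here.
    retry(lambda: emit("score"), 2)()
    emit("summarize")  # back to the wrapping retry's counter
    return "done"


result = retry(branch, 3)()
print(events)
# [('fetch', 0), ('fetch', 1), ('score', 0), ('summarize', 1)]
```

The second `fetch` event carries `attempt_index` 1 because the branch-level retry re-ran the whole branch. The nested retry around `score` temporarily overrides the counter, and `reset(token)` restores the outer value for `summarize`.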
115+
116+
## Composition with other constructs
117+
118+
Parallel branches compose with the rest of the engine the way
119+
subgraphs and fan-outs do:
120+
121+
- A branch's subgraph can itself contain a fan-out node — inner-node
122+
events inside that fan-out carry **both** `branch_name` (this
123+
branch) and `fan_out_index` (the instance within this branch).
124+
The two fields are independent.
125+
- The parallel-branches node itself can be invoked from inside a
126+
fan-out instance — inner events then carry the outer fan-out's
127+
`fan_out_index` and the inner branch's `branch_name`.
128+
- Per-graph and per-node middleware on the parallel-branches node
129+
wrap the dispatcher as a single unit — one `started` event before
130+
dispatch begins, one `completed` event after all branches finish
131+
and fan-in lands. The parent's retry middleware retries the **whole
132+
parallel-branches node**, not individual branches.
133+
134+
## Resume semantics
135+
136+
Parallel-branches nodes use the same **atomic restart** model as
137+
fan-out (per spec §10.7): if a checkpoint resume lands on a
138+
parallel-branches node, all branches re-dispatch from scratch.
139+
Per-branch progress is not individually persisted in v1.
140+
141+
## When parallel branches is NOT the right shape
142+
143+
- **Not the same as N copies of one subgraph.** If you want "run
144+
this subgraph for each item in a list," reach for
145+
[fan-out](fan-out.md).
146+
- **Not a router.** A router is a conditional-edge pattern — pick
147+
one branch based on state. Parallel branches runs *all* branches
148+
concurrently.
149+
- **Not a coordinator.** Branches don't communicate with each other
150+
during execution; if branch B's work depends on branch A's
151+
output, you want a linear pipeline (A → B), not parallel branches.
