Skip to content

Commit 059f8a9

Browse files
bobakemamian and claude authored
feat(drawer): parallelism inside for_each — concurrent iteration (#155)
Adds a `Parallelism int` field to kind=for_each. 0 or 1 keeps the existing serial fail-fast semantics. N > 1 spawns a worker pool so N iterations run concurrently:

    {
      "kind": "for_each",
      "over": "${scrape-list.output.urls}",
      "as": "url",
      "parallelism": 10,
      "steps": [ ... ]
    }

## Correctness story

- **Ordering:** results[i] maps to items[i] regardless of completion order. Downstream refs like ${step.output.results.0.item} stay deterministic.
- **Stop-on-failure:** the parallel path allocates a sub-context (workerCtx). First failure cancels it so in-flight iterations abort promptly instead of finishing work after the step has already been declared failed. The recorded failure is the lowest-indexed one so traces are reproducible.
- **Continue-on-failure:** every scheduled iteration still runs to completion before wg.Wait() returns; failures are marked per-item; the overall step completes ok with partial failure markers.
- **Race-free:** each worker owns its slot in results[]/errs[] (preallocated by index); only first-failure bookkeeping is mutex-protected. `go test ./... -race` clean.
- **Resource bounds:** buffered semaphore of size Parallelism caps inflight goroutines + child processes. No implicit 1:1 mapping between iterations and processes running simultaneously.

## Implementation

- Serial path was inlined for its fast-return case (avoids channel/goroutine overhead when parallelism <= 1). Parallel path uses chan+WaitGroup (no x/sync dependency).
- Per-iter body extracted into runForEachIter so serial and parallel share one implementation of per-iteration semantics.

## `set` extended

New STEP.parallelism=N path addressable from the CLI.

## Tests

- TestForEach_ParallelismSpeedsUpRuns — wall-clock assertion: 4×400ms iterations with parallelism=4 complete in <1.2s (serial baseline is ~1.6s). Catches regressions to serial.
- TestForEach_ParallelismStopOnFirstFailure — cancels in-flight workers on first failure; step reports FOREACH_ITEM_FAILED.
- TestForEach_ParallelismContinueCollectsAll — all 4 iterations run; 2 odd-indexed failures marked per-item; overall step ok.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7fae61a commit 059f8a9

8 files changed

Lines changed: 400 additions & 55 deletions

File tree

cmd/drawer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Usage:
5252
buttons drawer NAME set STEP.args.FIELD=value literal or ${ref} into a step arg
5353
buttons drawer NAME set STEP.over=EXPR for_each: the array to iterate
5454
buttons drawer NAME set STEP.as=NAME for_each: loop variable name
55+
buttons drawer NAME set STEP.parallelism=N for_each: max concurrent iterations (0/1 = serial)
5556
buttons drawer NAME set STEP.from=EXPR aggregate: input array
5657
buttons drawer NAME set STEP.pluck=EXPR aggregate: per-item expression
5758
buttons drawer NAME set STEP.steps.N.args.F=value reach a nested step's arg

docs/cli/buttons_drawer.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Usage:
2424
buttons drawer NAME set STEP.args.FIELD=value literal or ${ref} into a step arg
2525
buttons drawer NAME set STEP.over=EXPR for_each: the array to iterate
2626
buttons drawer NAME set STEP.as=NAME for_each: loop variable name
27+
buttons drawer NAME set STEP.parallelism=N for_each: max concurrent iterations (0/1 = serial)
2728
buttons drawer NAME set STEP.from=EXPR aggregate: input array
2829
buttons drawer NAME set STEP.pluck=EXPR aggregate: per-item expression
2930
buttons drawer NAME set STEP.steps.N.args.F=value reach a nested step's arg

docs/schemas/drawer.schema.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,10 @@
208208
"description": "CEL expression producing the array to iterate over (kind=for_each)",
209209
"type": "string"
210210
},
211+
"parallelism": {
212+
"description": "Max concurrent iterations (kind=for_each); 0 or 1 = serial",
213+
"type": "integer"
214+
},
211215
"pluck": {
212216
"description": "CEL expression evaluated per item; 'item' is bound to the current entry (kind=aggregate)",
213217
"type": "string"

internal/drawer/entity.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,13 @@ type Step struct {
125125
// "fail-fast overall, but tolerate per-item errors here."
126126
OnItemFailure string `json:"on_item_failure,omitempty" jsonschema:"enum=stop,enum=continue,description=How to react to a per-item failure (kind=for_each)"`
127127

128+
// Parallelism bounds how many iterations run concurrently. 0 or
129+
// 1 means serial (same as before). N > 1 spawns a worker pool
130+
// of size N. Ordering of the results array still matches the
131+
// input order regardless of completion order — downstream refs
132+
// like ${step.output.results.0.item} stay deterministic.
133+
Parallelism int `json:"parallelism,omitempty" jsonschema:"description=Max concurrent iterations (kind=for_each); 0 or 1 = serial"`
134+
128135
// Steps are the nested steps executed per item (for_each) OR
129136
// the steps under a matching case's body (switch). Each child
130137
// context is layered on the outer one.

internal/drawer/executor.go

Lines changed: 142 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"os"
1010
"strings"
11+
"sync"
1112
"time"
1213

1314
"github.com/autonoco/buttons/internal/battery"
@@ -410,13 +411,12 @@ func (e *Executor) runDrawerStep(ctx context.Context, step *Step, ctxMap Context
410411
// its own child context with the outer context plus the loop
411412
// variable (step.As) bound to the current item.
412413
//
413-
// v1 is serial (no parallelism). Per-item failures stop the loop by
414-
// default; OnItemFailure="continue" records the error and moves on.
415-
// Output is {results: [...]} — one entry per iteration, each a map
416-
// of {step_id: step.Output} for the nested steps. Downstream refs
417-
// walk this via ${<for_each_id>.output.results.0.step_id.field}
418-
// (indexed access) but will typically be consumed by an aggregator
419-
// step in later stages.
414+
// Concurrency: Parallelism <= 1 runs serially (deterministic order,
415+
// fail-fast). Parallelism > 1 spawns a worker pool. Results are
416+
// still written in input order regardless of completion order so
417+
// downstream refs like ${step.output.results.0.item} stay stable.
418+
// OnItemFailure="stop" in parallel mode cancels in-flight workers
419+
// via a sub-context the moment the first failure is observed.
420420
func (e *Executor) runForEachStep(ctx context.Context, step *Step, ctxMap Context) (StepRun, error) {
421421
sr := StepRun{ID: step.ID}
422422

@@ -462,65 +462,153 @@ func (e *Executor) runForEachStep(ctx context.Context, step *Step, ctxMap Contex
462462
}
463463

464464
continueOnFail := step.OnItemFailure == "continue"
465-
// []any (not []map[string]any) so downstream ref resolvers and
466-
// aggregate steps can walk it without type gymnastics.
467-
results := make([]any, 0, len(items))
465+
parallelism := step.Parallelism
466+
if parallelism < 1 {
467+
parallelism = 1
468+
}
469+
470+
// Preallocated, index-addressable results so writers can drop
471+
// their output at position i without a mutex — each worker owns
472+
// exactly one index.
473+
results := make([]any, len(items))
474+
errs := make([]error, len(items))
475+
476+
// Parallelism == 1 keeps the serial, fail-fast semantics that
477+
// stage 3 shipped. Inlined to avoid the channel/goroutine cost
478+
// for small loops where serial was already fine.
479+
if parallelism == 1 {
480+
for i, item := range items {
481+
results[i], errs[i] = e.runForEachIter(ctx, step, asName, item, i, ctxMap)
482+
if errs[i] != nil && !continueOnFail {
483+
sr.Status = "failed"
484+
sr.Output = map[string]any{
485+
"results": trimNilTail(results, i+1),
486+
"completed": i + 1,
487+
"total": len(items),
488+
}
489+
sr.Error = &StepError{
490+
Code: "FOREACH_ITEM_FAILED",
491+
Message: fmt.Sprintf("iteration %d failed: %v", i, errs[i]),
492+
Remediation: "fix the failing nested step or set on_item_failure: continue to tolerate per-item errors",
493+
}
494+
return sr, errs[i]
495+
}
496+
}
497+
sr.Status = "ok"
498+
sr.Output = map[string]any{"results": results, "total": len(items)}
499+
return sr, nil
500+
}
501+
502+
// Parallel mode. workerCtx gets cancelled on first failure
503+
// when OnItemFailure != "continue" so in-flight iterations
504+
// abort promptly instead of finishing their work after we've
505+
// already decided to fail the step.
506+
workerCtx, cancel := context.WithCancel(ctx)
507+
defer cancel()
508+
509+
// Bounded semaphore + sync primitives. A plain buffered chan
510+
// doubles as the semaphore (acquire by send, release by recv)
511+
// without pulling in x/sync.
512+
sem := make(chan struct{}, parallelism)
513+
var wg sync.WaitGroup
514+
var failMu sync.Mutex
515+
var firstFailIdx int = -1
516+
var firstFailErr error
468517

469518
for i, item := range items {
470-
// Per-iteration context: start from the outer ctxMap and
471-
// overlay the loop variable so nested ${<as>.field} refs
472-
// resolve. Don't mutate the outer map — each iter gets its
473-
// own.
474-
iterCtx := Context{}
475-
for k, v := range ctxMap {
476-
iterCtx[k] = v
519+
// Respect early-cancel decisions made by previous workers.
520+
if workerCtx.Err() != nil {
521+
break
477522
}
478-
iterCtx[asName] = item
479-
480-
iterOut := map[string]any{}
481-
var iterErr error
482-
for _, nested := range step.Steps {
483-
nestedStep := nested
484-
stepRes, serr := e.runStep(ctx, nil, &nestedStep, iterCtx)
485-
iterOut[nestedStep.ID] = map[string]any{
486-
"status": stepRes.Status,
487-
"output": stepRes.Output,
488-
"error": stepRes.Error,
523+
wg.Add(1)
524+
sem <- struct{}{}
525+
go func(idx int, it any) {
526+
defer wg.Done()
527+
defer func() { <-sem }()
528+
out, err := e.runForEachIter(workerCtx, step, asName, it, idx, ctxMap)
529+
results[idx] = out
530+
errs[idx] = err
531+
if err != nil && !continueOnFail {
532+
failMu.Lock()
533+
if firstFailIdx == -1 || idx < firstFailIdx {
534+
firstFailIdx = idx
535+
firstFailErr = err
536+
}
537+
failMu.Unlock()
538+
cancel()
489539
}
490-
// Expose the just-completed nested step's output so
491-
// later nested steps in the same iteration can chain.
492-
iterCtx[nestedStep.ID] = map[string]any{"output": stepRes.Output}
493-
if serr != nil {
494-
iterErr = serr
495-
break
540+
}(i, item)
541+
}
542+
wg.Wait()
543+
544+
if firstFailIdx != -1 {
545+
completed := 0
546+
for i := range results {
547+
if results[i] != nil {
548+
completed++
496549
}
497550
}
551+
sr.Status = "failed"
552+
sr.Output = map[string]any{
553+
"results": results,
554+
"completed": completed,
555+
"total": len(items),
556+
}
557+
sr.Error = &StepError{
558+
Code: "FOREACH_ITEM_FAILED",
559+
Message: fmt.Sprintf("iteration %d failed: %v", firstFailIdx, firstFailErr),
560+
Remediation: "fix the failing nested step or set on_item_failure: continue to tolerate per-item errors",
561+
}
562+
return sr, firstFailErr
563+
}
498564

499-
results = append(results, map[string]any{
500-
"index": i,
501-
"item": item,
502-
"steps": iterOut,
503-
"failed": iterErr != nil,
504-
})
565+
sr.Status = "ok"
566+
sr.Output = map[string]any{"results": results, "total": len(items)}
567+
return sr, nil
568+
}
505569

506-
if iterErr != nil && !continueOnFail {
507-
sr.Status = "failed"
508-
sr.Output = map[string]any{"results": results, "completed": i + 1, "total": len(items)}
509-
sr.Error = &StepError{
510-
Code: "FOREACH_ITEM_FAILED",
511-
Message: fmt.Sprintf("iteration %d failed: %v", i, iterErr),
512-
Remediation: "fix the failing nested step or set on_item_failure: continue to tolerate per-item errors",
513-
}
514-
return sr, iterErr
570+
// runForEachIter executes the nested Steps for one iteration against
571+
// a per-iteration context. Extracted so both the serial and parallel
572+
// paths share exactly one implementation of the per-iter semantics.
573+
func (e *Executor) runForEachIter(ctx context.Context, step *Step, asName string, item any, idx int, outerCtx Context) (map[string]any, error) {
574+
iterCtx := Context{}
575+
for k, v := range outerCtx {
576+
iterCtx[k] = v
577+
}
578+
iterCtx[asName] = item
579+
580+
iterOut := map[string]any{}
581+
var iterErr error
582+
for _, nested := range step.Steps {
583+
nestedStep := nested
584+
stepRes, serr := e.runStep(ctx, nil, &nestedStep, iterCtx)
585+
iterOut[nestedStep.ID] = map[string]any{
586+
"status": stepRes.Status,
587+
"output": stepRes.Output,
588+
"error": stepRes.Error,
589+
}
590+
iterCtx[nestedStep.ID] = map[string]any{"output": stepRes.Output}
591+
if serr != nil {
592+
iterErr = serr
593+
break
515594
}
516595
}
596+
return map[string]any{
597+
"index": idx,
598+
"item": item,
599+
"steps": iterOut,
600+
"failed": iterErr != nil,
601+
}, iterErr
602+
}
517603

518-
sr.Status = "ok"
519-
sr.Output = map[string]any{
520-
"results": results,
521-
"total": len(items),
604+
// trimNilTail returns the first upTo entries of xs. The serial
// for_each path calls it on failure so the reported results contain
// only the iterations that actually ran, not the preallocated nil
// slots after them.
//
// Despite the name, no nil filtering is performed: the slice is simply
// truncated. upTo is clamped to [0, len(xs)], so an out-of-range value
// can never panic. The result shares xs's backing array but has its
// capacity capped at upTo, so an append by a consumer reallocates
// instead of overwriting the caller's remaining slots.
func trimNilTail(xs []any, upTo int) []any {
	if upTo < 0 {
		upTo = 0
	}
	if upTo > len(xs) {
		upTo = len(xs)
	}
	// Full slice expression: len == cap == upTo.
	return xs[:upTo:upTo]
}
525613

526614
// runSwitchStep handles kind=switch — an if/elif/else chain. Walks

internal/drawer/schema_embedded.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,10 @@
208208
"description": "CEL expression producing the array to iterate over (kind=for_each)",
209209
"type": "string"
210210
},
211+
"parallelism": {
212+
"description": "Max concurrent iterations (kind=for_each); 0 or 1 = serial",
213+
"type": "integer"
214+
},
211215
"pluck": {
212216
"description": "CEL expression evaluated per item; 'item' is bound to the current entry (kind=aggregate)",
213217
"type": "string"

internal/drawer/service.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,19 @@ func (s *Service) SetField(drawerName, stepID, field, value string) (*Drawer, er
351351
d.Steps[idx].Duration = value
352352
case "until":
353353
d.Steps[idx].Until = value
354+
case "parallelism":
355+
var n int
356+
if _, err := fmt.Sscanf(value, "%d", &n); err != nil {
357+
return nil, &ServiceError{Code: "VALIDATION_ERROR", Message: fmt.Sprintf("parallelism must be an integer, got %q", value)}
358+
}
359+
if n < 0 {
360+
return nil, &ServiceError{Code: "VALIDATION_ERROR", Message: "parallelism must be >= 0"}
361+
}
362+
d.Steps[idx].Parallelism = n
354363
default:
355364
return nil, &ServiceError{
356365
Code: "STEP_FIELD_UNKNOWN",
357-
Message: fmt.Sprintf("unknown step field %q — allowed: over, as, on_item_failure, from, pluck, button, drawer, duration, until", field),
366+
Message: fmt.Sprintf("unknown step field %q — allowed: over, as, on_item_failure, parallelism, from, pluck, button, drawer, duration, until", field),
358367
}
359368
}
360369
d.UpdatedAt = time.Now().UTC()

0 commit comments

Comments
 (0)