Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions internal/runner/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (rs ResultSummary) String() string {
type Exec struct {
ID string
Function func() error
Condition func() (bool, error)
MaxRetries int
}

Expand Down Expand Up @@ -127,6 +128,25 @@ func (e *execEngine) executeParallel(ctx context.Context, execs []Exec, opts Opt

for i, exec := range execs {
runExec := func() error {
if exec.Condition != nil {
shouldRun, err := exec.Condition()
if err != nil {
results[i] = Result{
ID: exec.ID,
Error: fmt.Errorf("condition evaluation failed: %w", err),
}
ff := opts.FailFast == nil || *opts.FailFast
if ff {
return err
}
return nil
}
if !shouldRun {
// Skip this execution - leave result empty/zero value
return nil
}
}

rh := retry.NewRetryHandler(exec.MaxRetries, 0)
err := rh.Execute(exec.Function)
results[i] = Result{
Expand All @@ -153,27 +173,45 @@ func (e *execEngine) executeParallel(ctx context.Context, execs []Exec, opts Opt
}

func (e *execEngine) executeSerial(ctx context.Context, execs []Exec, opts Options) []Result {
results := make([]Result, len(execs))
for i, exec := range execs {
results := make([]Result, 0, len(execs))
for _, exec := range execs {
select {
case <-ctx.Done():
results[i] = Result{
results = append(results, Result{
ID: exec.ID,
Error: ctx.Err(),
}
})
return results
default:
if exec.Condition != nil {
shouldRun, err := exec.Condition()
if err != nil {
results = append(results, Result{
ID: exec.ID,
Error: fmt.Errorf("condition evaluation failed: %w", err),
})
ff := opts.FailFast == nil || *opts.FailFast
if ff {
return results
}
continue
}
if !shouldRun {
continue
}
}

rh := retry.NewRetryHandler(exec.MaxRetries, 0)
err := rh.Execute(exec.Function)
results[i] = Result{
results = append(results, Result{
ID: exec.ID,
Error: err,
Retries: rh.GetStats().Attempts - 1,
}
})

ff := opts.FailFast == nil || *opts.FailFast
if err != nil && ff {
return results[:i+1]
return results
}
}
}
Expand Down
227 changes: 227 additions & 0 deletions internal/runner/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,98 @@ var _ = Describe("e.Execute", func() {
Expect(summary.Results[4].Error).NotTo(HaveOccurred())
Expect(duration).To(BeNumerically(">=", 250*time.Millisecond))
})

It("should skip exec when condition returns false", func() {
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { return nil },
},
{
ID: "exec2",
Function: func() error { return nil },
Condition: func() (bool, error) { return false, nil },
},
{
ID: "exec3",
Function: func() error { return nil },
},
}

summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel))

// Results array has fixed size, but skipped execs have zero-value Result
Expect(summary.Results).To(HaveLen(3))
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
Expect(summary.Results[0].ID).To(Equal("exec1"))
// exec2 was skipped but still has entry (zero-value)
Expect(summary.Results[1].ID).To(Equal(""))
Expect(summary.Results[2].Error).NotTo(HaveOccurred())
Expect(summary.Results[2].ID).To(Equal("exec3"))
})

It("should execute when condition returns true", func() {
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { return nil },
Condition: func() (bool, error) { return true, nil },
},
{
ID: "exec2",
Function: func() error { return nil },
Condition: func() (bool, error) { return true, nil },
},
}

summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel))

Expect(summary.Results).To(HaveLen(2))
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
Expect(summary.Results[1].Error).NotTo(HaveOccurred())
})

It("should fail when condition evaluation fails", func() {
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { return nil },
},
{
ID: "exec2",
Function: func() error { return nil },
Condition: func() (bool, error) { return false, errors.New("condition error") },
},
}

summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel))

Expect(summary.Results).To(HaveLen(2))
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
Expect(summary.Results[1].Error).To(HaveOccurred())
Expect(summary.Results[1].Error.Error()).To(ContainSubstring("condition evaluation failed"))
Expect(summary.HasErrors()).To(BeTrue())
})

It("should stop on condition error with fail fast", func() {
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { time.Sleep(100 * time.Millisecond); return nil },
Condition: func() (bool, error) { return false, errors.New("condition error") },
},
{
ID: "exec2",
Function: func() error { time.Sleep(100 * time.Millisecond); return nil },
},
}

ff := true
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel), engine.WithFailFast(&ff))

Expect(summary.Results).To(HaveLen(2))
Expect(summary.HasErrors()).To(BeTrue())
})
})

Context("Serial execution", func() {
Expand Down Expand Up @@ -122,5 +214,140 @@ var _ = Describe("e.Execute", func() {
Expect(summary.Results[0].Error).To(HaveOccurred())
Expect(summary.HasErrors()).To(BeTrue())
})

It("should skip exec when condition returns false", func() {
executed := []string{}
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { executed = append(executed, "exec1"); return nil },
},
{
ID: "exec2",
Function: func() error { executed = append(executed, "exec2"); return nil },
Condition: func() (bool, error) { return false, nil },
},
{
ID: "exec3",
Function: func() error { executed = append(executed, "exec3"); return nil },
},
}

summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial))

Expect(summary.Results).To(HaveLen(2)) // Only exec1 and exec3
Expect(executed).To(Equal([]string{"exec1", "exec3"}))
Expect(summary.HasErrors()).To(BeFalse())
})

It("should execute when condition returns true", func() {
executed := []string{}
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { executed = append(executed, "exec1"); return nil },
Condition: func() (bool, error) { return true, nil },
},
{
ID: "exec2",
Function: func() error { executed = append(executed, "exec2"); return nil },
Condition: func() (bool, error) { return true, nil },
},
}

summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial))

Expect(summary.Results).To(HaveLen(2))
Expect(executed).To(Equal([]string{"exec1", "exec2"}))
Expect(summary.HasErrors()).To(BeFalse())
})

It("should evaluate conditions between executions", func() {
sharedState := ""
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { sharedState = "updated"; return nil },
},
{
ID: "exec2",
Function: func() error { return nil },
Condition: func() (bool, error) {
// Condition can see update from exec1
return sharedState == "updated", nil
},
},
}

summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial))

Expect(summary.Results).To(HaveLen(2))
Expect(summary.HasErrors()).To(BeFalse())
})

It("should fail when condition evaluation fails", func() {
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { return nil },
},
{
ID: "exec2",
Function: func() error { return nil },
Condition: func() (bool, error) { return false, errors.New("condition error") },
},
}

summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial))

Expect(summary.Results).To(HaveLen(2))
Expect(summary.Results[0].Error).NotTo(HaveOccurred())
Expect(summary.Results[1].Error).To(HaveOccurred())
Expect(summary.Results[1].Error.Error()).To(ContainSubstring("condition evaluation failed"))
Expect(summary.HasErrors()).To(BeTrue())
})

It("should stop on condition error with fail fast", func() {
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { return nil },
Condition: func() (bool, error) { return false, errors.New("condition error") },
},
{
ID: "exec2",
Function: func() error { return nil },
},
}

ff := true
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(&ff))

Expect(summary.Results).To(HaveLen(1))
Expect(summary.Results[0].Error).To(HaveOccurred())
Expect(summary.HasErrors()).To(BeTrue())
})

It("should continue on condition error without fail fast", func() {
execs := []engine.Exec{
{
ID: "exec1",
Function: func() error { return nil },
Condition: func() (bool, error) { return false, errors.New("condition error") },
},
{
ID: "exec2",
Function: func() error { return nil },
},
}

ff := false
summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(&ff))

Expect(summary.Results).To(HaveLen(2))
Expect(summary.Results[0].Error).To(HaveOccurred())
Expect(summary.Results[1].Error).NotTo(HaveOccurred())
Expect(summary.HasErrors()).To(BeTrue())
})
})
})
Loading