diff --git a/internal/runner/engine/engine.go b/internal/runner/engine/engine.go index edced17b..b4b87450 100644 --- a/internal/runner/engine/engine.go +++ b/internal/runner/engine/engine.go @@ -50,6 +50,7 @@ func (rs ResultSummary) String() string { type Exec struct { ID string Function func() error + Condition func() (bool, error) MaxRetries int } @@ -127,6 +128,25 @@ func (e *execEngine) executeParallel(ctx context.Context, execs []Exec, opts Opt for i, exec := range execs { runExec := func() error { + if exec.Condition != nil { + shouldRun, err := exec.Condition() + if err != nil { + results[i] = Result{ + ID: exec.ID, + Error: fmt.Errorf("condition evaluation failed: %w", err), + } + ff := opts.FailFast == nil || *opts.FailFast + if ff { + return err + } + return nil + } + if !shouldRun { + // Skip this execution - leave result empty/zero value + return nil + } + } + rh := retry.NewRetryHandler(exec.MaxRetries, 0) err := rh.Execute(exec.Function) results[i] = Result{ @@ -153,27 +173,45 @@ func (e *execEngine) executeParallel(ctx context.Context, execs []Exec, opts Opt } func (e *execEngine) executeSerial(ctx context.Context, execs []Exec, opts Options) []Result { - results := make([]Result, len(execs)) - for i, exec := range execs { + results := make([]Result, 0, len(execs)) + for _, exec := range execs { select { case <-ctx.Done(): - results[i] = Result{ + results = append(results, Result{ ID: exec.ID, Error: ctx.Err(), - } + }) return results default: + if exec.Condition != nil { + shouldRun, err := exec.Condition() + if err != nil { + results = append(results, Result{ + ID: exec.ID, + Error: fmt.Errorf("condition evaluation failed: %w", err), + }) + ff := opts.FailFast == nil || *opts.FailFast + if ff { + return results + } + continue + } + if !shouldRun { + continue + } + } + rh := retry.NewRetryHandler(exec.MaxRetries, 0) err := rh.Execute(exec.Function) - results[i] = Result{ + results = append(results, Result{ ID: exec.ID, Error: err, Retries: rh.GetStats().Attempts - 1, - } + }) ff := opts.FailFast == nil || *opts.FailFast if err != nil && ff { - return results[:i+1] + return results } } } diff --git a/internal/runner/engine/engine_test.go b/internal/runner/engine/engine_test.go index 5a168f84..4962fe0a 100644 --- a/internal/runner/engine/engine_test.go +++ b/internal/runner/engine/engine_test.go @@ -89,6 +89,98 @@ var _ = Describe("e.Execute", func() { Expect(summary.Results[4].Error).NotTo(HaveOccurred()) Expect(duration).To(BeNumerically(">=", 250*time.Millisecond)) }) + + It("should skip exec when condition returns false", func() { + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { return nil }, + }, + { + ID: "exec2", + Function: func() error { return nil }, + Condition: func() (bool, error) { return false, nil }, + }, + { + ID: "exec3", + Function: func() error { return nil }, + }, + } + + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel)) + + // Results array has fixed size, but skipped execs have zero-value Result + Expect(summary.Results).To(HaveLen(3)) + Expect(summary.Results[0].Error).NotTo(HaveOccurred()) + Expect(summary.Results[0].ID).To(Equal("exec1")) + // exec2 was skipped but still has entry (zero-value) + Expect(summary.Results[1].ID).To(Equal("")) + Expect(summary.Results[2].Error).NotTo(HaveOccurred()) + Expect(summary.Results[2].ID).To(Equal("exec3")) + }) + + It("should execute when condition returns true", func() { + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { return nil }, + Condition: func() (bool, error) { return true, nil }, + }, + { + ID: "exec2", + Function: func() error { return nil }, + Condition: func() (bool, error) { return true, nil }, + }, + } + + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel)) + + Expect(summary.Results).To(HaveLen(2)) + Expect(summary.Results[0].Error).NotTo(HaveOccurred()) + Expect(summary.Results[1].Error).NotTo(HaveOccurred()) + }) + + It("should fail when condition evaluation fails", func() { + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { return nil }, + }, + { + ID: "exec2", + Function: func() error { return nil }, + Condition: func() (bool, error) { return false, errors.New("condition error") }, + }, + } + + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel)) + + Expect(summary.Results).To(HaveLen(2)) + Expect(summary.Results[0].Error).NotTo(HaveOccurred()) + Expect(summary.Results[1].Error).To(HaveOccurred()) + Expect(summary.Results[1].Error.Error()).To(ContainSubstring("condition evaluation failed")) + Expect(summary.HasErrors()).To(BeTrue()) + }) + + It("should stop on condition error with fail fast", func() { + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { time.Sleep(100 * time.Millisecond); return nil }, + Condition: func() (bool, error) { return false, errors.New("condition error") }, + }, + { + ID: "exec2", + Function: func() error { time.Sleep(100 * time.Millisecond); return nil }, + }, + } + + ff := true + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Parallel), engine.WithFailFast(&ff)) + + Expect(summary.Results).To(HaveLen(2)) + Expect(summary.HasErrors()).To(BeTrue()) + }) }) Context("Serial execution", func() { @@ -122,5 +214,140 @@ var _ = Describe("e.Execute", func() { Expect(summary.Results[0].Error).To(HaveOccurred()) Expect(summary.HasErrors()).To(BeTrue()) }) + + It("should skip exec when condition returns false", func() { + executed := []string{} + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { executed = append(executed, "exec1"); return nil }, + }, + { + ID: "exec2", + Function: func() error { executed = append(executed, "exec2"); return nil }, + Condition: func() (bool, error) { return false, nil }, + }, + { + ID: "exec3", + Function: func() error { executed = append(executed, "exec3"); return nil }, + }, + } + + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial)) + + Expect(summary.Results).To(HaveLen(2)) // Only exec1 and exec3 + Expect(executed).To(Equal([]string{"exec1", "exec3"})) + Expect(summary.HasErrors()).To(BeFalse()) + }) + + It("should execute when condition returns true", func() { + executed := []string{} + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { executed = append(executed, "exec1"); return nil }, + Condition: func() (bool, error) { return true, nil }, + }, + { + ID: "exec2", + Function: func() error { executed = append(executed, "exec2"); return nil }, + Condition: func() (bool, error) { return true, nil }, + }, + } + + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial)) + + Expect(summary.Results).To(HaveLen(2)) + Expect(executed).To(Equal([]string{"exec1", "exec2"})) + Expect(summary.HasErrors()).To(BeFalse()) + }) + + It("should evaluate conditions between executions", func() { + sharedState := "" + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { sharedState = "updated"; return nil }, + }, + { + ID: "exec2", + Function: func() error { return nil }, + Condition: func() (bool, error) { + // Condition can see update from exec1 + return sharedState == "updated", nil + }, + }, + } + + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial)) + + Expect(summary.Results).To(HaveLen(2)) + Expect(summary.HasErrors()).To(BeFalse()) + }) + + It("should fail when condition evaluation fails", func() { + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { return nil }, + }, + { + ID: "exec2", + Function: func() error { return nil }, + Condition: func() (bool, error) { return false, errors.New("condition error") }, + }, + } + + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial)) + + Expect(summary.Results).To(HaveLen(2)) + Expect(summary.Results[0].Error).NotTo(HaveOccurred()) + Expect(summary.Results[1].Error).To(HaveOccurred()) + Expect(summary.Results[1].Error.Error()).To(ContainSubstring("condition evaluation failed")) + Expect(summary.HasErrors()).To(BeTrue()) + }) + + It("should stop on condition error with fail fast", func() { + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { return nil }, + Condition: func() (bool, error) { return false, errors.New("condition error") }, + }, + { + ID: "exec2", + Function: func() error { return nil }, + }, + } + + ff := true + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(&ff)) + + Expect(summary.Results).To(HaveLen(1)) + Expect(summary.Results[0].Error).To(HaveOccurred()) + Expect(summary.HasErrors()).To(BeTrue()) + }) + + It("should continue on condition error without fail fast", func() { + execs := []engine.Exec{ + { + ID: "exec1", + Function: func() error { return nil }, + Condition: func() (bool, error) { return false, errors.New("condition error") }, + }, + { + ID: "exec2", + Function: func() error { return nil }, + }, + } + + ff := false + summary := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(&ff)) + + Expect(summary.Results).To(HaveLen(2)) + Expect(summary.Results[0].Error).To(HaveOccurred()) + Expect(summary.Results[1].Error).NotTo(HaveOccurred()) + Expect(summary.HasErrors()).To(BeTrue()) + }) }) }) diff --git a/internal/runner/parallel/parallel.go b/internal/runner/parallel/parallel.go index d6f9c7d6..c72a116c 100644 --- a/internal/runner/parallel/parallel.go +++ b/internal/runner/parallel/parallel.go @@ -66,22 +66,7 @@ func (r *parallelRunner) Exec( } if len(parallelSpec.Execs) > 0 { - str, err := store.NewStore(store.Path()) - if err != nil { - return err - } - if err := str.CreateBucket(store.EnvironmentBucket()); err != nil { - return err - } - cacheData, err := str.GetAll() - if err != nil { - return err - } - if err := str.Close(); err != nil { - logger.Log().Error(err, "unable to close store") - } - - return handleExec(ctx, e, eng, parallelSpec, inputEnv, cacheData) + return handleExec(ctx, e, eng, parallelSpec, inputEnv) } return fmt.Errorf("no parallel executables to run") @@ -92,7 +77,6 @@ func handleExec( eng engine.Engine, parallelSpec *executable.ParallelExecutableType, inputEnv map[string]string, - cacheData map[string]string, ) error { groupCtx, cancel := stdCtx.WithCancel(ctx) defer cancel() @@ -127,19 +111,8 @@ func handleExec( // Build the list of steps to execute var execs []engine.Exec - conditionalData := runner.ExpressionEnv(ctx, parent, cacheData, inputEnv) for i, refConfig := range parallelSpec.Execs { - // Skip over steps that do not match the condition - if refConfig.If != "" { - if truthy, err := expression.IsTruthy(refConfig.If, conditionalData); err != nil { - return err - } else if !truthy { - logger.Log().Debugf("skipping execution %d/%d", i+1, len(parallelSpec.Execs)) - continue - } - } - // Get the executable for the step var exec *executable.Executable switch { @@ -225,7 +198,50 @@ func handleExec( return nil } - execs = append(execs, engine.Exec{ID: exec.Ref().String(), Function: runExec, MaxRetries: refConfig.Retries}) + // Create condition function if needed + var conditionFunc func() (bool, error) + if refConfig.If != "" { + ifCondition := refConfig.If + stepNum := i + 1 + totalSteps := len(parallelSpec.Execs) + conditionFunc = func() (bool, error) { + str, err := store.NewStore(store.Path()) + if err != nil { + return false, err + } + if _, err := str.CreateAndSetBucket(store.EnvironmentBucket()); err != nil { + _ = str.Close() + return false, err + } + cacheData, err := str.GetAll() + if err != nil { + _ = str.Close() + return false, err + } + if err := str.Close(); err != nil { + logger.Log().Error(err, "unable to close store") + } + + conditionalData := runner.ExpressionEnv(ctx, parent, cacheData, inputEnv) + truthy, err := expression.IsTruthy(ifCondition, conditionalData) + if err != nil { + return false, err + } + if !truthy { + logger.Log().Debugf("skipping execution %d/%d", stepNum, totalSteps) + } else { + logger.Log().Debugf("condition %s is true", ifCondition) + } + return truthy, nil + } + } + + execs = append(execs, engine.Exec{ + ID: exec.Ref().String(), + Function: runExec, + Condition: conditionFunc, + MaxRetries: refConfig.Retries, + }) } results := eng.Execute( diff --git a/internal/runner/parallel/parallel_test.go b/internal/runner/parallel/parallel_test.go index a8565509..43b33343 100644 --- a/internal/runner/parallel/parallel_test.go +++ b/internal/runner/parallel/parallel_test.go @@ -137,15 +137,23 @@ var _ = Describe("ParallelRunner", func() { parallelSpec.Execs[0].If = "false" parallelSpec.Execs[1].If = "true" mockCache := ctx.ExecutableCache + // Only the first two execs use Ref (third uses Cmd, so no cache call) for i, e := range subExecs { - if i == 1 { + if i < 2 { mockCache.EXPECT().GetExecutableByRef(e.Ref()).Return(e, nil).Times(1) } } + results := engine.ResultSummary{Results: []engine.Result{{}}} mockEngine.EXPECT(). Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(results).Times(1) + DoAndReturn(func(_ stdCtx.Context, execs []engine.Exec, _ ...engine.OptionFunc) engine.ResultSummary { + Expect(execs).To(HaveLen(len(parallelSpec.Execs))) + Expect(execs[0].Condition).ToNot(BeNil()) + Expect(execs[1].Condition).ToNot(BeNil()) + Expect(execs[2].Condition).To(BeNil()) + return results + }).Times(1) Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string), nil)).To(Succeed()) }) diff --git a/internal/runner/serial/serial.go b/internal/runner/serial/serial.go index 458bc34d..c900918e 100644 --- a/internal/runner/serial/serial.go +++ b/internal/runner/serial/serial.go @@ -64,22 +64,7 @@ func (r *serialRunner) Exec( } if len(serialSpec.Execs) > 0 { - str, err := store.NewStore(store.Path()) - if err != nil { - return err - } - if err := str.CreateBucket(store.EnvironmentBucket()); err != nil { - return err - } - cacheData, err := str.GetAll() - if err != nil { - return err - } - if err := str.Close(); err != nil { - logger.Log().Error(err, "unable to close store") - } - - return handleExec(ctx, e, eng, serialSpec, inputEnv, cacheData) + return handleExec(ctx, e, eng, serialSpec, inputEnv) } return fmt.Errorf("no serial executables to run") } @@ -90,7 +75,6 @@ func handleExec( eng engine.Engine, serialSpec *executable.SerialExecutableType, inputEnv map[string]string, - cacheData map[string]string, ) error { // Expand the directory of the serial execution. The root / parent's directory is used if one is not specified. var root *executable.Executable @@ -116,22 +100,8 @@ func handleExec( // Build the list of steps to execute var execs []engine.Exec - conditionalData := runner.ExpressionEnv(ctx, parent, cacheData, inputEnv) for i, refConfig := range serialSpec.Execs { - // Skip over steps that do not match the condition - if refConfig.If != "" { - truthy, err := expression.IsTruthy(refConfig.If, conditionalData) - if err != nil { - return err - } - if !truthy { - logger.Log().Debugf("skipping execution %d/%d", i+1, len(serialSpec.Execs)) - continue - } - logger.Log().Debugf("condition %s is true", refConfig.If) - } - // Get the executable for the step var exec *executable.Executable switch { @@ -146,7 +116,6 @@ func handleExec( default: return errors.New("serial executable must have a ref or cmd") } - logger.Log().Debugf("executing %s (%d/%d)", exec.Ref(), i+1, len(serialSpec.Execs)) // Prepare the environment and arguments for the child executable childEnv := make(map[string]string) @@ -214,7 +183,50 @@ func handleExec( return runSerialExecFunc(ctx, i, refConfig, exec, eng, childEnv, childArgs, serialSpec) } - execs = append(execs, engine.Exec{ID: exec.Ref().String(), Function: runExec, MaxRetries: refConfig.Retries}) + // Create condition function if needed + var conditionFunc func() (bool, error) + if refConfig.If != "" { + ifCondition := refConfig.If + stepNum := i + 1 + totalSteps := len(serialSpec.Execs) + conditionFunc = func() (bool, error) { + str, err := store.NewStore(store.Path()) + if err != nil { + return false, err + } + if _, err := str.CreateAndSetBucket(store.EnvironmentBucket()); err != nil { + _ = str.Close() + return false, err + } + cacheData, err := str.GetAll() + if err != nil { + _ = str.Close() + return false, err + } + if err := str.Close(); err != nil { + logger.Log().Error(err, "unable to close store") + } + + conditionalData := runner.ExpressionEnv(ctx, parent, cacheData, inputEnv) + truthy, err := expression.IsTruthy(ifCondition, conditionalData) + if err != nil { + return false, err + } + if !truthy { + logger.Log().Debugf("skipping execution %d/%d", stepNum, totalSteps) + } else { + logger.Log().Debugf("condition %s is true", ifCondition) + } + return truthy, nil + } + } + + execs = append(execs, engine.Exec{ + ID: exec.Ref().String(), + Function: runExec, + Condition: conditionFunc, + MaxRetries: refConfig.Retries, + }) } results := eng.Execute(ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(parent.Serial.FailFast)) diff --git a/internal/runner/serial/serial_test.go b/internal/runner/serial/serial_test.go index 8a2a28b9..fe6d22a2 100644 --- a/internal/runner/serial/serial_test.go +++ b/internal/runner/serial/serial_test.go @@ -3,6 +3,7 @@ package serial_test import ( stdCtx "context" "errors" + "fmt" "testing" . "github.com/onsi/ginkgo/v2" @@ -129,14 +130,24 @@ var _ = Describe("SerialRunner", func() { serialSpec.Execs[0].If = "false" serialSpec.Execs[1].If = "true" mockCache := ctx.ExecutableCache + // Only the first two execs use Ref (third uses Cmd, so no cache call) for i, e := range subExecs { - if i == 1 { + if i < 2 { mockCache.EXPECT().GetExecutableByRef(e.Ref()).Return(e, nil).Times(1) } } + results := engine.ResultSummary{Results: []engine.Result{{}}} mockEngine.EXPECT().Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(results).Times(1) + DoAndReturn(func(_ stdCtx.Context, execs []engine.Exec, _ ...engine.OptionFunc) engine.ResultSummary { + // Verify all execs from rootExec are in the list (conditions included) + Expect(execs).To(HaveLen(len(serialSpec.Execs))) + // Verify first two have conditions, third doesn't + Expect(execs[0].Condition).ToNot(BeNil()) + Expect(execs[1].Condition).ToNot(BeNil()) + Expect(execs[2].Condition).To(BeNil()) + return results + }).Times(1) Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string), nil)).To(Succeed()) }) @@ -193,5 +204,64 @@ var _ = Describe("SerialRunner", func() { Expect(serialRnr.Exec(ctx.Ctx, parentExec, mockEngine, make(map[string]string), []string{"test_value"})). To(Succeed()) }) + + It("refreshes cache between steps so later conditions see updates", func() { + ns := "examples" + parentExec := &executable.Executable{ + Serial: &executable.SerialExecutableType{ + Execs: []executable.SerialRefConfig{ + {Ref: executable.Ref(fmt.Sprintf("exec %s:first", ns))}, + {Ref: executable.Ref(fmt.Sprintf("exec %s:second", ns)), If: `store["flow-test_serial_updated"] == 'true'`}, + }, + }, + } + parentExec.SetContext( + ctx.Ctx.CurrentWorkspace.AssignedName(), ctx.Ctx.CurrentWorkspace.Location(), + ns, "/test/parent.flow", + ) + + firstChild := &executable.Executable{ + Verb: "exec", Name: "first", + Exec: &executable.ExecExecutableType{Cmd: "echo first"}, + } + firstChild.SetContext( + ctx.Ctx.CurrentWorkspace.AssignedName(), ctx.Ctx.CurrentWorkspace.Location(), + ns, "/test/first.flow") + + secondChild := &executable.Executable{ + Verb: "exec", Name: "second", + Exec: &executable.ExecExecutableType{Cmd: "echo second"}, + } + secondChild.SetContext( + ctx.Ctx.CurrentWorkspace.AssignedName(), ctx.Ctx.CurrentWorkspace.Location(), + ns, "/test/second.flow") + + mockCache := ctx.ExecutableCache + mockCache.EXPECT().GetExecutableByRef(firstChild.Ref()).Return(firstChild, nil).Times(1) + mockCache.EXPECT().GetExecutableByRef(secondChild.Ref()).Return(secondChild, nil).Times(1) + + ctx.RunnerMock.EXPECT().IsCompatible(gomock.Any()).Return(true).AnyTimes() + ctx.RunnerMock.EXPECT().Exec(gomock.Any(), firstChild, gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ *context.Context, _ *executable.Executable, _ engine.Engine, _ map[string]string, _ []string) error { + return nil + }).Times(1) + ctx.RunnerMock. + EXPECT(). + Exec(gomock.Any(), secondChild, gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil). + Times(1) + + results := engine.ResultSummary{Results: []engine.Result{{}, {}}} + mockEngine.EXPECT().Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ stdCtx.Context, execs []engine.Exec, _ ...engine.OptionFunc) engine.ResultSummary { + for _, ex := range execs { + Expect(ex.Function()).To(Succeed()) + } + return results + }).Times(1) + + Expect(serialRnr.Exec(ctx.Ctx, parentExec, mockEngine, make(map[string]string), nil)).To(Succeed()) + }) }) })