From 0cabae47e2e6cc5389732e6ecc6602002ca374e2 Mon Sep 17 00:00:00 2001 From: Jahvon Dockery Date: Mon, 4 Aug 2025 15:58:12 -0400 Subject: [PATCH 1/2] fix: child execs should inherit parent argument values --- cmd/internal/exec.go | 13 ++-- internal/context/context.go | 5 +- internal/runner/exec/exec.go | 5 +- internal/runner/launch/launch.go | 7 +- internal/runner/mocks/mock_runner.go | 8 +-- internal/runner/parallel/parallel.go | 78 ++++++++++++++++++--- internal/runner/parallel/parallel_test.go | 58 +++++++++++++++- internal/runner/render/render.go | 7 +- internal/runner/request/request.go | 3 +- internal/runner/request/request_test.go | 8 +-- internal/runner/runner.go | 14 +++- internal/runner/runner_test.go | 19 +++--- internal/runner/serial/serial.go | 83 +++++++++++++++++++---- internal/runner/serial/serial_test.go | 61 ++++++++++++++++- internal/templates/templates.go | 8 +-- internal/utils/env/args.go | 68 ++++++++++++++++--- internal/utils/env/env.go | 41 ++++++++--- internal/utils/env/env_test.go | 47 +++++++++++++ types/executable/arguments.go | 10 +++ 19 files changed, 455 insertions(+), 88 deletions(-) diff --git a/cmd/internal/exec.go b/cmd/internal/exec.go index 40923179..34450e44 100644 --- a/cmd/internal/exec.go +++ b/cmd/internal/exec.go @@ -95,11 +95,6 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar logger.Log().FatalErr(err) } - // populate args for environment handling ignoring arg 0 (the ref) - if len(args) >= 2 { - ctx.Args = args[1:] - } - var ref executable.Ref if len(args) == 0 { ref = context.ExpandRef(ctx, executable.NewRef("", verb)) @@ -166,7 +161,13 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar } startTime := time.Now() eng := engine.NewExecEngine() - if err := runner.Exec(ctx, e, eng, envMap); err != nil { + + var execArgs []string + if len(args) >= 2 { + execArgs = args[1:] + } + + if err := runner.Exec(ctx, e, eng, envMap, execArgs); err != nil { logger.Log().FatalErr(err) } dur := time.Since(startTime) diff --git a/internal/context/context.go b/internal/context/context.go index fe839cf0..37f9d974 100644 --- a/internal/context/context.go +++ b/internal/context/context.go @@ -35,8 +35,9 @@ type Context struct { WorkspacesCache cache.WorkspaceCache ExecutableCache cache.ExecutableCache - // Args includes the command line arguments passed to the exec command. It is only populated when that command is used. - Args []string + // RootExecutable is the executable that is being run in the current context. + // This will be nil if the context is not associated with an executable run. + RootExecutable *executable.Executable // ProcessTmpDir is the temporary directory for the current process. If set, it will be // used to store temporary files all executable runs when the tmpDir value is specified. diff --git a/internal/runner/exec/exec.go b/internal/runner/exec/exec.go index 6821a302..47a38ee1 100644 --- a/internal/runner/exec/exec.go +++ b/internal/runner/exec/exec.go @@ -34,10 +34,11 @@ func (r *execRunner) Exec( e *executable.Executable, _ engine.Engine, inputEnv map[string]string, + inputArgs []string, ) error { execSpec := e.Exec defaultEnv := env.DefaultEnv(ctx, e) - envMap, err := env.BuildEnvMap(ctx.Config.CurrentVaultName(), e.Env(), ctx.Args, inputEnv, defaultEnv) + envMap, err := env.BuildEnvMap(ctx.Config.CurrentVaultName(), e.Env(), inputArgs, inputEnv, defaultEnv) if err != nil { return errors.Wrap(err, "unable to set parameters to env") } @@ -48,7 +49,7 @@ func (r *execRunner) Exec( e.FlowFilePath(), e.WorkspacePath(), e.Env(), - ctx.Args, + inputArgs, envMap, ); err != nil { ctx.AddCallback(cb) diff --git a/internal/runner/launch/launch.go b/internal/runner/launch/launch.go index 282ece65..45e7f992 100644 --- a/internal/runner/launch/launch.go +++ b/internal/runner/launch/launch.go @@ -37,19 +37,20 @@ func (r *launchRunner) Exec( e *executable.Executable, _ engine.Engine, inputEnv map[string]string, + inputArgs []string, ) error { launchSpec := e.Launch envMap, err := env.BuildEnvMap( ctx.Config.CurrentVaultName(), e.Env(), - ctx.Args, + inputArgs, inputEnv, env.DefaultEnv(ctx, e), ) if err != nil { return errors.Wrap(err, "unable to set parameters to env") } - if err := env.SetEnv(ctx.Config.CurrentVaultName(), e.Env(), ctx.Args, envMap); err != nil { + if err := env.SetEnv(ctx.Config.CurrentVaultName(), e.Env(), inputArgs, envMap); err != nil { return errors.Wrap(err, "unable to set parameters to env") } @@ -58,7 +59,7 @@ func (r *launchRunner) Exec( e.FlowFilePath(), e.WorkspacePath(), e.Env(), - ctx.Args, + inputArgs, envMap, ); err != nil { ctx.AddCallback(cb) diff --git a/internal/runner/mocks/mock_runner.go b/internal/runner/mocks/mock_runner.go index dcc9700a..bd949ebf 100644 --- a/internal/runner/mocks/mock_runner.go +++ b/internal/runner/mocks/mock_runner.go @@ -42,17 +42,17 @@ func (m *MockRunner) EXPECT() *MockRunnerMockRecorder { } // Exec mocks base method. -func (m *MockRunner) Exec(arg0 *context.Context, arg1 *executable.Executable, arg2 engine.Engine, arg3 map[string]string) error { +func (m *MockRunner) Exec(arg0 *context.Context, arg1 *executable.Executable, arg2 engine.Engine, arg3 map[string]string, arg4 []string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Exec", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "Exec", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) return ret0 } // Exec indicates an expected call of Exec. -func (mr *MockRunnerMockRecorder) Exec(arg0, arg1, arg2, arg3 any) *gomock.Call { +func (mr *MockRunnerMockRecorder) Exec(arg0, arg1, arg2, arg3, arg4 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exec", reflect.TypeOf((*MockRunner)(nil).Exec), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exec", reflect.TypeOf((*MockRunner)(nil).Exec), arg0, arg1, arg2, arg3, arg4) } // IsCompatible mocks base method. diff --git a/internal/runner/parallel/parallel.go b/internal/runner/parallel/parallel.go index 5ab27489..2edd58b8 100644 --- a/internal/runner/parallel/parallel.go +++ b/internal/runner/parallel/parallel.go @@ -4,6 +4,9 @@ import ( stdCtx "context" "fmt" "maps" + "os" + "path/filepath" + "strings" "github.com/pkg/errors" "golang.org/x/sync/errgroup" @@ -41,9 +44,10 @@ func (r *parallelRunner) Exec( e *executable.Executable, eng engine.Engine, inputEnv map[string]string, + inputArgs []string, ) error { parallelSpec := e.Parallel - if err := envUtils.SetEnv(ctx.Config.CurrentVaultName(), e.Env(), ctx.Args, inputEnv); err != nil { + if err := envUtils.SetEnv(ctx.Config.CurrentVaultName(), e.Env(), inputArgs, inputEnv); err != nil { return errors.Wrap(err, "unable to set parameters to env") } @@ -52,7 +56,7 @@ func (r *parallelRunner) Exec( e.FlowFilePath(), e.WorkspacePath(), e.Env(), - ctx.Args, + inputArgs, inputEnv, ); err != nil { ctx.AddCallback(cb) @@ -88,7 +92,7 @@ func handleExec( ctx *context.Context, parent *executable.Executable, eng engine.Engine, parallelSpec *executable.ParallelExecutableType, - promptedEnv map[string]string, + inputEnv map[string]string, cacheData map[string]string, ) error { groupCtx, cancel := stdCtx.WithCancel(ctx.Ctx) @@ -100,18 +104,44 @@ func handleExec( } group.SetLimit(limit) - dataMap := expr.ExpressionEnv(ctx, parent, cacheData, promptedEnv) + // Expand the directory of the parallel execution. The root / parent's directory is used if one is not specified. + var root *executable.Executable + if ctx.RootExecutable != nil { + root = ctx.RootExecutable + } else { + root = parent + } + if parallelSpec.Dir == "" { + parallelSpec.Dir = executable.Directory(filepath.Dir(root.FlowFilePath())) + } + targetDir, isTmp, err := parallelSpec.Dir.ExpandDirectory( + root.WorkspacePath(), + root.FlowFilePath(), + ctx.ProcessTmpDir, + inputEnv, + ) + if err != nil { + return errors.Wrap(err, "unable to expand directory") + } else if isTmp && ctx.ProcessTmpDir == "" { + ctx.ProcessTmpDir = targetDir + } + // Build the list of steps to execute var execs []engine.Exec + conditionalData := expr.ExpressionEnv(ctx, parent, cacheData, inputEnv) + for i, refConfig := range parallelSpec.Execs { + // Skip over steps that do not match the condition if refConfig.If != "" { - if truthy, err := expr.IsTruthy(refConfig.If, &dataMap); err != nil { + if truthy, err := expr.IsTruthy(refConfig.If, &conditionalData); err != nil { return err } else if !truthy { logger.Log().Debugf("skipping execution %d/%d", i+1, len(parallelSpec.Execs)) continue } } + + // Get the executable for the step var exec *executable.Executable switch { case len(refConfig.Ref) > 0: @@ -126,8 +156,10 @@ func handleExec( return errors.New("parallel executable must have a ref or cmd") } - execPromptedEnv := make(map[string]string) - maps.Copy(promptedEnv, execPromptedEnv) + // Prepare the environment and arguments for the child executable + childEnv := make(map[string]string) + childArgs := make([]string, 0) + maps.Copy(childEnv, inputEnv) if len(refConfig.Args) > 0 { execEnv := exec.Env() if execEnv == nil || execEnv.Args == nil { @@ -136,14 +168,31 @@ func handleExec( exec.Ref().String(), ) } else { - a, err := envUtils.BuildArgsEnvMap(execEnv.Args, refConfig.Args, execPromptedEnv) + for _, arg := range os.Environ() { + kv := strings.SplitN(arg, "=", 2) + if len(kv) == 2 { + childEnv[kv[0]] = kv[1] + } + } + + if parallelSpec.Args == nil { + childArgs = refConfig.Args + } else { + childArgs = envUtils.BuildArgsFromEnv(execEnv.Args, childEnv) + if len(childArgs) == 0 { + childArgs = refConfig.Args // If no resolved args, fallback to original args + } + } + + a, err := envUtils.BuildArgsEnvMap(execEnv.Args, childArgs, childEnv) if err != nil { logger.Log().Error(err, "unable to process arguments") } - maps.Copy(execPromptedEnv, a) + maps.Copy(childEnv, a) } } + // Set log fields and directory for the executable switch { case exec.Exec != nil: fields := map[string]interface{}{"step": exec.Ref().String()} @@ -159,10 +208,18 @@ func handleExec( if parallelSpec.Dir != "" && exec.Serial.Dir == "" { exec.Serial.Dir = parallelSpec.Dir } + case exec.Request != nil: + if exec.Request.ResponseFile != nil && parallelSpec.Dir != "" && exec.Request.ResponseFile.Dir == "" { + exec.Request.ResponseFile.Dir = parallelSpec.Dir + } + case exec.Render != nil: + if parallelSpec.Dir != "" && exec.Render.Dir == "" { + exec.Render.Dir = parallelSpec.Dir + } } runExec := func() error { - err := runner.Exec(ctx, exec, eng, execPromptedEnv) + err := runner.Exec(ctx, exec, eng, childEnv, childArgs) if err != nil { return err } @@ -171,6 +228,7 @@ func handleExec( execs = append(execs, engine.Exec{ID: exec.Ref().String(), Function: runExec, MaxRetries: refConfig.Retries}) } + results := eng.Execute( ctx.Ctx, execs, engine.WithMode(engine.Parallel), diff --git a/internal/runner/parallel/parallel_test.go b/internal/runner/parallel/parallel_test.go index c10c7fa4..8b238cab 100644 --- a/internal/runner/parallel/parallel_test.go +++ b/internal/runner/parallel/parallel_test.go @@ -9,6 +9,7 @@ import ( . "github.com/onsi/gomega" "go.uber.org/mock/gomock" + "github.com/flowexec/flow/internal/context" "github.com/flowexec/flow/internal/runner" "github.com/flowexec/flow/internal/runner/engine" "github.com/flowexec/flow/internal/runner/engine/mocks" @@ -108,7 +109,7 @@ var _ = Describe("ParallelRunner", func() { mockEngine.EXPECT(). Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(results).Times(1) - Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, promptedEnv)).To(Succeed()) + Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, promptedEnv, nil)).To(Succeed()) }) It("fail when there is an engine error", func() { @@ -128,7 +129,7 @@ var _ = Describe("ParallelRunner", func() { mockEngine.EXPECT(). Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(results).Times(1) - Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, promptedEnv)).ToNot(Succeed()) + Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, promptedEnv, nil)).ToNot(Succeed()) }) It("should skip execution when condition is false", func() { @@ -145,7 +146,58 @@ var _ = Describe("ParallelRunner", func() { mockEngine.EXPECT(). Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(results).Times(1) - Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string))).To(Succeed()) + Expect(parallelRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string), nil)).To(Succeed()) + }) + + It("should pass environment args from parent to child executables", func() { + pos1 := 1 + parentExec := &executable.Executable{ + Parallel: &executable.ParallelExecutableType{ + Args: executable.ArgumentList{{EnvKey: "TEST_VAR", Pos: &pos1}}, + Execs: []executable.ParallelRefConfig{{Ref: "test:child", Args: []string{"var=$TEST_VAR"}}}, + }, + } + parentExec.SetContext("test", "/test", "test", "/test/parent.flow") + + childExec := &executable.Executable{ + Exec: &executable.ExecExecutableType{ + Cmd: "echo $TEST_VAR", + Args: executable.ArgumentList{{EnvKey: "TEST_VAR", Flag: "var"}}, + }, + } + childExec.SetContext("test", "/test", "test", "/test/child.flow") + + mockCache := ctx.ExecutableCache + mockCache.EXPECT().GetExecutableByRef(gomock.Any()).Return(childExec, nil).Times(1) + + ctx.RunnerMock.EXPECT().IsCompatible(gomock.Any()).Return(true).Times(1) + ctx.RunnerMock.EXPECT(). + Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), []string{"var=test_value"}). + DoAndReturn(func( + _ *context.Context, + exec *executable.Executable, + _ engine.Engine, + inputEnv map[string]string, + inputArgs []string, + ) error { + Expect(inputEnv).To(HaveKeyWithValue("TEST_VAR", "test_value")) + Expect(inputArgs).To(ContainElement("var=test_value")) + return nil + }).Times(1) + + results := engine.ResultSummary{Results: []engine.Result{{}}} + mockEngine.EXPECT(). + Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + _ stdCtx.Context, execs []engine.Exec, _ ...engine.OptionFunc) engine.ResultSummary { + for _, exec := range execs { + Expect(exec.Function()).To(Succeed()) + } + return results + }) + + Expect(parallelRnr.Exec(ctx.Ctx, parentExec, mockEngine, make(map[string]string), []string{"test_value"})). + To(Succeed()) }) }) }) diff --git a/internal/runner/render/render.go b/internal/runner/render/render.go index 2d355a2e..687b60f5 100644 --- a/internal/runner/render/render.go +++ b/internal/runner/render/render.go @@ -44,13 +44,14 @@ func (r *renderRunner) Exec( e *executable.Executable, _ engine.Engine, inputEnv map[string]string, + inputArgs []string, ) error { if !ctx.Config.ShowTUI() { return fmt.Errorf("unable to render when interactive mode is disabled") } renderSpec := e.Render - if err := env.SetEnv(ctx.Config.CurrentVaultName(), e.Env(), ctx.Args, inputEnv); err != nil { + if err := env.SetEnv(ctx.Config.CurrentVaultName(), e.Env(), inputArgs, inputEnv); err != nil { return errors.Wrap(err, "unable to set parameters to env") } @@ -59,7 +60,7 @@ func (r *renderRunner) Exec( e.FlowFilePath(), e.WorkspacePath(), e.Env(), - ctx.Args, + inputArgs, inputEnv, ); err != nil { ctx.AddCallback(cb) @@ -69,7 +70,7 @@ func (r *renderRunner) Exec( } envMap, err := env.BuildEnvMap( - ctx.Config.CurrentVaultName(), e.Env(), ctx.Args, inputEnv, env.DefaultEnv(ctx, e), + ctx.Config.CurrentVaultName(), e.Env(), inputArgs, inputEnv, env.DefaultEnv(ctx, e), ) if err != nil { return errors.Wrap(err, "unable to set parameters to env") diff --git a/internal/runner/request/request.go b/internal/runner/request/request.go index 7e3be756..7ee69051 100644 --- a/internal/runner/request/request.go +++ b/internal/runner/request/request.go @@ -41,10 +41,11 @@ func (r *requestRunner) Exec( e *executable.Executable, _ engine.Engine, inputEnv map[string]string, + inputArgs []string, ) error { requestSpec := e.Request envMap, err := env.BuildEnvMap( - ctx.Config.CurrentVaultName(), e.Env(), ctx.Args, inputEnv, env.DefaultEnv(ctx, e), + ctx.Config.CurrentVaultName(), e.Env(), inputArgs, inputEnv, env.DefaultEnv(ctx, e), ) if err != nil { return errors.Wrap(err, "unable to set parameters to env") diff --git a/internal/runner/request/request_test.go b/internal/runner/request/request_test.go index 4a77e998..f501e4d5 100644 --- a/internal/runner/request/request_test.go +++ b/internal/runner/request/request_test.go @@ -71,7 +71,7 @@ var _ = Describe("Request Runner", func() { } ctx.Logger.EXPECT().Infox(gomock.Any(), gomock.Any(), gomock.Any()).Times(1) - err := requestRnr.Exec(ctx.Ctx, exec, mockEngine, make(map[string]string)) + err := requestRnr.Exec(ctx.Ctx, exec, mockEngine, make(map[string]string), nil) Expect(err).NotTo(HaveOccurred()) }) @@ -86,7 +86,7 @@ var _ = Describe("Request Runner", func() { } ctx.Logger.EXPECT().Infox(gomock.Any(), gomock.Any(), gomock.Regex("value")).Times(1) - err := requestRnr.Exec(ctx.Ctx, exec, mockEngine, make(map[string]string)) + err := requestRnr.Exec(ctx.Ctx, exec, mockEngine, make(map[string]string), nil) Expect(err).NotTo(HaveOccurred()) }) @@ -105,7 +105,7 @@ var _ = Describe("Request Runner", func() { exec.SetContext(ctx.Ctx.CurrentWorkspace.AssignedName(), ctx.Ctx.CurrentWorkspace.Location(), "", "") ctx.Logger.EXPECT().Infof(gomock.Any(), gomock.Any()).Times(2) - err := requestRnr.Exec(ctx.Ctx, exec, mockEngine, make(map[string]string)) + err := requestRnr.Exec(ctx.Ctx, exec, mockEngine, make(map[string]string), nil) Expect(err).NotTo(HaveOccurred()) _, err = os.Stat(filepath.Clean(filepath.Join(ctx.Ctx.CurrentWorkspace.Location(), "response.json"))) @@ -123,7 +123,7 @@ var _ = Describe("Request Runner", func() { } ctx.Logger.EXPECT().Infox(gomock.Any(), gomock.Any(), gomock.Regex("HTTPS://HTTPBIN.ORG")).Times(1) - err := requestRnr.Exec(ctx.Ctx, exec, mockEngine, make(map[string]string)) + err := requestRnr.Exec(ctx.Ctx, exec, mockEngine, make(map[string]string), nil) Expect(err).NotTo(HaveOccurred()) }) }) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 3d033ecb..53ddc3f3 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -12,7 +12,13 @@ import ( //go:generate mockgen -destination=mocks/mock_runner.go -package=mocks github.com/flowexec/flow/internal/runner Runner type Runner interface { Name() string - Exec(ctx *context.Context, e *executable.Executable, eng engine.Engine, inputEnv map[string]string) error + Exec( + ctx *context.Context, + e *executable.Executable, + eng engine.Engine, + inputEnv map[string]string, + inputArgs []string, + ) error IsCompatible(executable *executable.Executable) bool } @@ -31,6 +37,7 @@ func Exec( executable *executable.Executable, eng engine.Engine, inputEnv map[string]string, + inputArgs []string, ) error { var assignedRunner Runner for _, runner := range registeredRunners { @@ -42,14 +49,15 @@ func Exec( if assignedRunner == nil { return fmt.Errorf("compatible runner not found for executable %s", executable.ID()) } + ctx.RootExecutable = executable if executable.Timeout == nil { - return assignedRunner.Exec(ctx, executable, eng, inputEnv) + return assignedRunner.Exec(ctx, executable, eng, inputEnv, inputArgs) } done := make(chan error, 1) go func() { - done <- assignedRunner.Exec(ctx, executable, eng, inputEnv) + done <- assignedRunner.Exec(ctx, executable, eng, inputEnv, inputArgs) }() select { diff --git a/internal/runner/runner_test.go b/internal/runner/runner_test.go index 931a77eb..3cf8fd86 100644 --- a/internal/runner/runner_test.go +++ b/internal/runner/runner_test.go @@ -48,10 +48,11 @@ var _ = Describe("Runner", func() { Name: "test-executable", } inputEnv := make(map[string]string) + inputArgs := make([]string, 0) mockRunner.EXPECT().IsCompatible(executable).Return(true) - mockRunner.EXPECT().Exec(ctx, executable, mockEngine, inputEnv).Return(nil) - Expect(runner.Exec(ctx, executable, mockEngine, inputEnv)).To(Succeed()) + mockRunner.EXPECT().Exec(ctx, executable, mockEngine, inputEnv, inputArgs).Return(nil) + Expect(runner.Exec(ctx, executable, mockEngine, inputEnv, inputArgs)).To(Succeed()) }) It("should return error when no compatible runner is found", func() { @@ -59,11 +60,12 @@ var _ = Describe("Runner", func() { exec := &executable.Executable{ Name: "test-exec", } - promptedEnv := make(map[string]string) + inputEnv := make(map[string]string) + inputArgs := make([]string, 0) mockRunner.EXPECT().IsCompatible(exec).Return(false) - err := runner.Exec(ctx, exec, mockEngine, promptedEnv) + err := runner.Exec(ctx, exec, mockEngine, inputEnv, inputArgs) Expect(err.Error()).To(ContainSubstring("compatible runner not found")) }) @@ -74,18 +76,19 @@ var _ = Describe("Runner", func() { Name: "test-exec", Timeout: &timeout, } - promptedEnv := make(map[string]string) + inputEnv := make(map[string]string) + inputArgs := make([]string, 0) mockRunner.EXPECT().IsCompatible(exec).Return(true) - mockRunner.EXPECT().Exec(ctx, exec, mockEngine, promptedEnv).DoAndReturn( + mockRunner.EXPECT().Exec(ctx, exec, mockEngine, inputEnv, inputArgs).DoAndReturn( func( - _ *context.Context, _ *executable.Executable, _ engine.Engine, _ map[string]string, + _ *context.Context, _ *executable.Executable, _ engine.Engine, _ map[string]string, _ []string, ) error { time.Sleep(2 * time.Second) return nil }) - err := runner.Exec(ctx, exec, mockEngine, promptedEnv) + err := runner.Exec(ctx, exec, mockEngine, inputEnv, inputArgs) Expect(err.Error()).To(ContainSubstring("timeout")) }) }) diff --git a/internal/runner/serial/serial.go b/internal/runner/serial/serial.go index 1f8fea01..4f781509 100644 --- a/internal/runner/serial/serial.go +++ b/internal/runner/serial/serial.go @@ -4,6 +4,7 @@ import ( "fmt" "maps" "os" + "path/filepath" "strings" "github.com/pkg/errors" @@ -41,9 +42,10 @@ func (r *serialRunner) Exec( e *executable.Executable, eng engine.Engine, inputEnv map[string]string, + inputArgs []string, ) error { serialSpec := e.Serial - if err := envUtils.SetEnv(ctx.Config.CurrentVaultName(), e.Env(), ctx.Args, inputEnv); err != nil { + if err := envUtils.SetEnv(ctx.Config.CurrentVaultName(), e.Env(), inputArgs, inputEnv); err != nil { return errors.Wrap(err, "unable to set parameters to env") } @@ -52,7 +54,7 @@ func (r *serialRunner) Exec( e.FlowFilePath(), e.WorkspacePath(), e.Env(), - ctx.Args, + inputArgs, inputEnv, ); err != nil { ctx.AddCallback(cb) @@ -82,21 +84,45 @@ func (r *serialRunner) Exec( return fmt.Errorf("no serial executables to run") } -//nolint:gocognit +//nolint:funlen,gocognit func handleExec( ctx *context.Context, parent *executable.Executable, eng engine.Engine, serialSpec *executable.SerialExecutableType, - promptedEnv map[string]string, + inputEnv map[string]string, cacheData map[string]string, ) error { - dataMap := expr.ExpressionEnv(ctx, parent, cacheData, promptedEnv) + // Expand the directory of the serial execution. The root / parent's directory is used if one is not specified. + var root *executable.Executable + if ctx.RootExecutable != nil { + root = ctx.RootExecutable + } else { + root = parent + } + if serialSpec.Dir == "" { + serialSpec.Dir = executable.Directory(filepath.Dir(root.FlowFilePath())) + } + targetDir, isTmp, err := serialSpec.Dir.ExpandDirectory( + root.WorkspacePath(), + root.FlowFilePath(), + ctx.ProcessTmpDir, + inputEnv, + ) + if err != nil { + return errors.Wrap(err, "unable to expand directory") + } else if isTmp && ctx.ProcessTmpDir == "" { + ctx.ProcessTmpDir = targetDir + } + // Build the list of steps to execute var execs []engine.Exec + conditionalData := expr.ExpressionEnv(ctx, parent, cacheData, inputEnv) + for i, refConfig := range serialSpec.Execs { + // Skip over steps that do not match the condition if refConfig.If != "" { - truthy, err := expr.IsTruthy(refConfig.If, &dataMap) + truthy, err := expr.IsTruthy(refConfig.If, &conditionalData) if err != nil { return err } @@ -106,6 +132,8 @@ func handleExec( } logger.Log().Debugf("condition %s is true", refConfig.If) } + + // Get the executable for the step var exec *executable.Executable switch { case refConfig.Ref != "": @@ -121,8 +149,10 @@ func handleExec( } logger.Log().Debugf("executing %s (%d/%d)", exec.Ref(), i+1, len(serialSpec.Execs)) - execPromptedEnv := make(map[string]string) - maps.Copy(promptedEnv, execPromptedEnv) + // Prepare the environment and arguments for the child executable + childEnv := make(map[string]string) + childArgs := make([]string, 0) + maps.Copy(childEnv, inputEnv) if len(refConfig.Args) > 0 { execEnv := exec.Env() if execEnv == nil || execEnv.Args == nil { @@ -131,14 +161,31 @@ func handleExec( exec.Ref().String(), ) } else { - a, err := envUtils.BuildArgsEnvMap(execEnv.Args, refConfig.Args, execPromptedEnv) + for _, arg := range os.Environ() { + kv := strings.SplitN(arg, "=", 2) + if len(kv) == 2 { + childEnv[kv[0]] = kv[1] + } + } + + if serialSpec.Args == nil { + childArgs = refConfig.Args + } else { + childArgs = envUtils.BuildArgsFromEnv(execEnv.Args, childEnv) + if len(childArgs) == 0 { + childArgs = refConfig.Args // If no resolved args, fallback to original args + } + } + + a, err := envUtils.BuildArgsEnvMap(execEnv.Args, childArgs, childEnv) if err != nil { logger.Log().Error(err, "unable to process arguments") } - maps.Copy(execPromptedEnv, a) + maps.Copy(childEnv, a) } } + // Set log fields and directory for the executable switch { case exec.Exec != nil: fields := map[string]interface{}{"step": exec.Ref().String()} @@ -154,14 +201,23 @@ func handleExec( if serialSpec.Dir != "" && exec.Serial.Dir == "" { exec.Serial.Dir = serialSpec.Dir } + case exec.Request != nil: + if exec.Request.ResponseFile != nil && serialSpec.Dir != "" && exec.Request.ResponseFile.Dir == "" { + exec.Request.ResponseFile.Dir = serialSpec.Dir + } + case exec.Render != nil: + if serialSpec.Dir != "" && exec.Render.Dir == "" { + exec.Render.Dir = serialSpec.Dir + } } runExec := func() error { - return runSerialExecFunc(ctx, i, refConfig, exec, eng, execPromptedEnv, serialSpec) + return runSerialExecFunc(ctx, i, refConfig, exec, eng, childEnv, childArgs, serialSpec) } execs = append(execs, engine.Exec{ID: exec.Ref().String(), Function: runExec, MaxRetries: refConfig.Retries}) } + results := eng.Execute(ctx.Ctx, execs, engine.WithMode(engine.Serial), engine.WithFailFast(parent.Serial.FailFast)) if results.HasErrors() { return errors.New(results.String()) @@ -175,10 +231,11 @@ func runSerialExecFunc( refConfig executable.SerialRefConfig, exec *executable.Executable, eng engine.Engine, - execPromptedEnv map[string]string, + childEnv map[string]string, + childArgs []string, serialSpec *executable.SerialExecutableType, ) error { - err := runner.Exec(ctx, exec, eng, execPromptedEnv) + err := runner.Exec(ctx, exec, eng, childEnv, childArgs) if err != nil { return err } diff --git a/internal/runner/serial/serial_test.go b/internal/runner/serial/serial_test.go index f1f5f8b4..4eb58fdd 100644 --- a/internal/runner/serial/serial_test.go +++ b/internal/runner/serial/serial_test.go @@ -9,6 +9,7 @@ import ( . "github.com/onsi/gomega" "go.uber.org/mock/gomock" + "github.com/flowexec/flow/internal/context" "github.com/flowexec/flow/internal/runner" "github.com/flowexec/flow/internal/runner/engine" "github.com/flowexec/flow/internal/runner/engine/mocks" @@ -105,7 +106,7 @@ var _ = Describe("SerialRunner", func() { } results := engine.ResultSummary{Results: []engine.Result{{}}} mockEngine.EXPECT().Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(results).Times(1) - Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, promptedEnv)).To(Succeed()) + Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, promptedEnv, nil)).To(Succeed()) }) It("should fail when there is an engine failure", func() { @@ -120,7 +121,7 @@ var _ = Describe("SerialRunner", func() { } results := engine.ResultSummary{Results: []engine.Result{{Error: errors.New("error")}}} mockEngine.EXPECT().Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(results).Times(1) - Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string))).ToNot(Succeed()) + Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string), nil)).ToNot(Succeed()) }) It("should skip execution when condition is false", func() { @@ -136,7 +137,61 @@ var _ = Describe("SerialRunner", func() { results := engine.ResultSummary{Results: []engine.Result{{}}} mockEngine.EXPECT().Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(results).Times(1) - Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string))).To(Succeed()) + Expect(serialRnr.Exec(ctx.Ctx, rootExec, mockEngine, make(map[string]string), nil)).To(Succeed()) + }) + + It("should pass environment args from parent to child executables", func() { + pos1 := 1 + parentExec := &executable.Executable{ + Serial: &executable.SerialExecutableType{ + Args: executable.ArgumentList{{EnvKey: "TEST_VAR", Pos: &pos1}}, + Execs: []executable.SerialRefConfig{{ + Ref: "test:child", + Args: []string{"var=$TEST_VAR"}, + }, + }, + }, + } + parentExec.SetContext("test", "/test", "test", "/test/parent.flow") + + childExec := &executable.Executable{ + Exec: &executable.ExecExecutableType{ + Cmd: "echo $TEST_VAR", + Args: executable.ArgumentList{{EnvKey: "TEST_VAR", Flag: "var"}}, + }, + } + childExec.SetContext("test", "/test", "test", "/test/child.flow") + mockCache := ctx.ExecutableCache + mockCache.EXPECT().GetExecutableByRef(gomock.Any()).Return(childExec, nil).Times(1) + + ctx.RunnerMock.EXPECT().IsCompatible(gomock.Any()).Return(true).Times(1) + ctx.RunnerMock.EXPECT(). + Exec(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), []string{"var=test_value"}). + DoAndReturn(func( + _ *context.Context, + exec *executable.Executable, + _ engine.Engine, + inputEnv map[string]string, + inputArgs []string, + ) error { + Expect(inputEnv).To(HaveKeyWithValue("TEST_VAR", "test_value")) + Expect(inputArgs).To(ContainElement("var=test_value")) + return nil + }).Times(1) + + results := engine.ResultSummary{Results: []engine.Result{{}}} + mockEngine.EXPECT(). + Execute(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + _ stdCtx.Context, execs []engine.Exec, _ ...engine.OptionFunc) engine.ResultSummary { + for _, exec := range execs { + Expect(exec.Function()).To(Succeed()) + } + return results + }) + + Expect(serialRnr.Exec(ctx.Ctx, parentExec, mockEngine, make(map[string]string), []string{"test_value"})). + To(Succeed()) }) }) }) diff --git a/internal/templates/templates.go b/internal/templates/templates.go index 0eaf5ecf..a191da84 100644 --- a/internal/templates/templates.go +++ b/internal/templates/templates.go @@ -145,17 +145,17 @@ func runExecutables( return errors.New("post-run executable must have a ref or cmd") } inputEnv := make(map[string]string) + inputArgs := make([]string, 0) ee := expressionEnv(templateData) maps.Copy(inputEnv, ee) //nolint:nestif if len(e.Args) > 0 { - args := make([]string, 0) for _, arg := range e.Args { a, err := processAsGoTemplate(flowfileDir, arg, templateData) if err != nil { return errors.Wrap(err, fmt.Sprintf("unable to process %s executable %d", stage, i)) } - args = append(args, a.String()) + inputArgs = append(inputArgs, a.String()) } execEnv := exec.Env() if execEnv == nil || execEnv.Args == nil { @@ -164,7 +164,7 @@ func runExecutables( exec.Ref().String(), ) } else { - a, err := argUtils.BuildArgsEnvMap(execEnv.Args, args, ee) + a, err := argUtils.BuildArgsEnvMap(execEnv.Args, inputArgs, ee) if err != nil { logger.Log().Error(err, "unable to process arguments") } @@ -177,7 +177,7 @@ func runExecutables( "step": i + 1, }) } - if err := runner.Exec(ctx, exec, engine.NewExecEngine(), inputEnv); err != nil { + if err := runner.Exec(ctx, exec, engine.NewExecEngine(), inputEnv, inputArgs); err != nil { return errors.Wrap(err, fmt.Sprintf("unable to execute %s executable %d", stage, i)) } } diff --git a/internal/utils/env/args.go b/internal/utils/env/args.go index 418217a5..2fc595b2 100644 --- a/internal/utils/env/args.go +++ b/internal/utils/env/args.go @@ -2,6 +2,8 @@ package env import ( "os" + "slices" + "sort" "strings" "github.com/flowexec/flow/types/executable" @@ -19,16 +21,16 @@ func BuildArgsEnvMap( return argsToEnvMap(al), nil } -func parseArgs(args []string) (flagArgs map[string]string, posArgs []string) { +func parseArgs(args executable.ArgumentList, execArgs []string) (flagArgs map[string]string, posArgs []string) { flagArgs = make(map[string]string) posArgs = make([]string, 0) - for i := 0; i < len(args); i++ { - split := strings.Split(args[i], "=") - if len(split) >= 2 { - flagArgs[split[0]] = strings.Join(split[1:], "=") + for i := 0; i < len(execArgs); i++ { + split := strings.SplitN(execArgs[i], "=", 2) + if len(split) == 2 && slices.Contains(args.Flags(), split[0]) { + flagArgs[split[0]] = split[1] continue } - posArgs = append(posArgs, args[i]) + posArgs = append(posArgs, execArgs[i]) } return } @@ -41,7 +43,6 @@ func resolveArgValues( if len(args) == 0 { return nil, nil } - if env != nil { // Expand environment variables in arguments for i, a := range execArgs { @@ -50,7 +51,7 @@ func resolveArgValues( }) } } - flagArgs, posArgs := parseArgs(execArgs) + flagArgs, posArgs := parseArgs(args, execArgs) if err := setArgValues(args, flagArgs, posArgs, env); err != nil { return nil, err } @@ -109,3 +110,54 @@ func filterArgsWithOutputFile(args executable.ArgumentList) executable.ArgumentL return outputArgs } + +// BuildArgsFromEnv builds a list of arguments from the provided environment and expected args list. It will +// return the positional arguments in the order they are expected and then append any flag arguments at the end. +// +// TODO: Add support for overriding flag values. +func BuildArgsFromEnv( + argsList executable.ArgumentList, + inputEnv map[string]string, +) []string { + if len(argsList) == 0 { + return nil + } + + type argWithPos struct { + value string + pos int + } + var argsWithPositions []argWithPos + flagArgs := make(map[string]string) + + for _, childArg := range argsList { + if childArg.EnvKey == "" { + continue + } + + if value, found := inputEnv[childArg.EnvKey]; found { + pos := 0 + if childArg.Pos != nil { + pos = *childArg.Pos + argsWithPositions = append(argsWithPositions, argWithPos{value: value, pos: pos}) + } + if childArg.Flag != "" { + flagArgs[childArg.Flag] = value + } + } + } + + sort.Slice(argsWithPositions, func(i, j int) bool { + return argsWithPositions[i].pos < argsWithPositions[j].pos + }) + + result := make([]string, len(argsWithPositions)) + for i, arg := range argsWithPositions { + result[i] = arg.value + } + for flag, value := range flagArgs { + result = append(result, flag+"="+value) + } + + return result +} diff --git a/internal/utils/env/env.go b/internal/utils/env/env.go index 48241c3f..bdc41ac9 100644 --- a/internal/utils/env/env.go +++ b/internal/utils/env/env.go @@ -2,6 +2,7 @@ package env import ( "fmt" + "maps" "os" "path/filepath" "strings" @@ -16,16 +17,20 @@ import ( func SetEnv( currentVault string, exec *executable.ExecutableEnvironment, - args []string, - promptedEnv map[string]string, + inputArgs []string, + inputEnv map[string]string, ) error { var errs []error + + envMap := make(map[string]string) + maps.Copy(envMap, inputEnv) + for _, param := range exec.Params { if param.OutputFile != "" { // CreateTempEnvFiles will handle outputFile parameters continue } - val, err := ResolveParameterValue(currentVault, param, promptedEnv) + val, err := ResolveParameterValue(currentVault, param, envMap) if err != nil { errs = append(errs, err) } @@ -33,13 +38,20 @@ func SetEnv( if err := os.Setenv(param.EnvKey, val); err != nil { errs = append(errs, fmt.Errorf("failed to set env %s: %w", param.EnvKey, err)) } + envMap[param.EnvKey] = val } - argEnvMap, err := BuildArgsEnvMap(exec.Args, args, promptedEnv) + argEnvMap, err := BuildArgsEnvMap(exec.Args, inputArgs, envMap) if err != nil { - errs = append(errs, fmt.Errorf("failed to build args env map: %w", err)) + errs = append(errs, fmt.Errorf("failed to build inputArgs env map: %w", err)) } for key, val := range argEnvMap { + val = os.Expand(val, func(key string) string { + if v, ok := envMap[key]; ok { + return v + } + return "" + }) if err := os.Setenv(key, val); err != nil { errs = append(errs, fmt.Errorf("failed to set env %s: %w", key, err)) } @@ -112,13 +124,15 @@ func CreateTempEnvFiles( func BuildEnvMap( currentVault string, exec *executable.ExecutableEnvironment, - args []string, + inputArgs []string, inputEnv map[string]string, defaultEnv map[string]string, ) (map[string]string, error) { - envMap := make(map[string]string) var errs []error + envMap := make(map[string]string) + maps.Copy(envMap, inputEnv) + for k, v := range defaultEnv { if _, ok := envMap[k]; !ok { envMap[k] = v @@ -130,7 +144,7 @@ func BuildEnvMap( continue } - val, err := ResolveParameterValue(currentVault, param, inputEnv) + val, err := ResolveParameterValue(currentVault, param, envMap) if err != nil { errs = append(errs, err) continue @@ -138,12 +152,17 @@ func BuildEnvMap( envMap[param.EnvKey] = val } - argEnvMap, err := BuildArgsEnvMap(exec.Args, args, envMap) + argEnvMap, err := BuildArgsEnvMap(exec.Args, inputArgs, envMap) if err != nil { - return nil, fmt.Errorf("failed to build args env map: %w", err) + return nil, fmt.Errorf("failed to build inputArgs env map: %w", err) } for key, val := range argEnvMap { - envMap[key] = val + envMap[key] = os.Expand(val, func(key string) string { + if v, ok := envMap[key]; ok { + return v + } + return "" + }) } if len(errs) > 0 { diff --git a/internal/utils/env/env_test.go b/internal/utils/env/env_test.go index eccf6adb..abf3e16b 100644 --- a/internal/utils/env/env_test.go +++ b/internal/utils/env/env_test.go @@ -304,4 +304,51 @@ var _ = Describe("Env", func() { // TODO: Add more assertions for other keys in the environment map }) }) + + Describe("BuildArgsFromEnv", func() { + It("should include all expected args when they're in the environment", func() { + parentEnv := map[string]string{ + "NAMESPACE": "my-namespace", + "REPO_NAME": "bitnami", + "REPO_URL": "https://charts.bitnami.com/bitnami", + } + + pos1 := 1 + pos2 := 2 + childArgs := executable.ArgumentList{ + {EnvKey: "REPO_NAME", Pos: &pos1}, + {EnvKey: "REPO_URL", Pos: &pos2}, + {EnvKey: "NAMESPACE", Flag: "namespace"}, + } + + filteredArgs := env.BuildArgsFromEnv(childArgs, parentEnv) + Expect(filteredArgs). + To(Equal([]string{"bitnami", "https://charts.bitnami.com/bitnami", "namespace=my-namespace"})) + }) + + It("should handle missing parent env values gracefully", func() { + parentEnv := map[string]string{ + "REPO_NAME": "bitnami", + // REPO_URL is missing + } + + pos1 := 1 + pos2 := 2 + childArgs := executable.ArgumentList{ + {EnvKey: "REPO_NAME", Pos: &pos1}, + {EnvKey: "REPO_URL", Pos: &pos2}, // This won't be found + } + + filteredArgs := env.BuildArgsFromEnv(childArgs, parentEnv) + Expect(filteredArgs).To(Equal([]string{"bitnami"})) + }) + + It("should handle empty child args", func() { + parentEnv := map[string]string{"NAMESPACE": "my-namespace"} + + var childArgs executable.ArgumentList + filteredArgs := env.BuildArgsFromEnv(childArgs, parentEnv) + Expect(filteredArgs).To(BeNil()) + }) + }) }) diff --git a/types/executable/arguments.go b/types/executable/arguments.go index 61b6223d..b3038ccd 100644 --- a/types/executable/arguments.go +++ b/types/executable/arguments.go @@ -70,6 +70,16 @@ func validateArgType(t ArgumentType) error { } } +func (al *ArgumentList) Flags() []string { + var flags []string + for _, arg := range *al { + if arg.Flag != "" { + flags = append(flags, arg.Flag) + } + } + return flags +} + func (al *ArgumentList) Validate() error { var errs []error for _, arg := range *al { From 76113462f8cbe60801f0c03583269d2af9e49f20 Mon Sep 17 00:00:00 2001 From: Jahvon Dockery Date: Mon, 4 Aug 2025 16:07:27 -0400 Subject: [PATCH 2/2] linting --- .golangci.yaml | 8 ++++++++ internal/runner/parallel/parallel.go | 1 - internal/runner/serial/serial.go | 1 - internal/utils/env/args.go | 9 +++++---- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/.golangci.yaml b/.golangci.yaml index f9747cdc..b9ac935e 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -95,6 +95,14 @@ linters: text: (SA5011|SA5001) - path: (.+)\.go$ text: declaration of "(err|ctx)" shadows declaration at + # The serial and parallel runners have inherently complex logic. + - path: internal/runner/(parallel|serial) + linters: + - cyclop + - nestif + - funlen + - gocognit + - gocyclo paths: - third_party$ - builtin$ diff --git a/internal/runner/parallel/parallel.go b/internal/runner/parallel/parallel.go index 2edd58b8..cfd2befc 100644 --- a/internal/runner/parallel/parallel.go +++ b/internal/runner/parallel/parallel.go @@ -87,7 +87,6 @@ func (r *parallelRunner) Exec( return fmt.Errorf("no parallel executables to run") } -//nolint:funlen,gocognit func handleExec( ctx *context.Context, parent *executable.Executable, eng engine.Engine, diff --git a/internal/runner/serial/serial.go b/internal/runner/serial/serial.go index 4f781509..670878bf 100644 --- a/internal/runner/serial/serial.go +++ b/internal/runner/serial/serial.go @@ -84,7 +84,6 @@ func (r *serialRunner) Exec( return fmt.Errorf("no serial executables to run") } -//nolint:funlen,gocognit func handleExec( ctx *context.Context, parent *executable.Executable, diff --git a/internal/utils/env/args.go b/internal/utils/env/args.go index 2fc595b2..2dfb8a36 100644 --- a/internal/utils/env/args.go +++ b/internal/utils/env/args.go @@ -136,9 +136,8 @@ func BuildArgsFromEnv( } if value, found := inputEnv[childArg.EnvKey]; found { - pos := 0 if childArg.Pos != nil { - pos = *childArg.Pos + pos := *childArg.Pos argsWithPositions = append(argsWithPositions, argWithPos{value: value, pos: pos}) } if childArg.Flag != "" { @@ -151,12 +150,14 @@ func BuildArgsFromEnv( return argsWithPositions[i].pos < argsWithPositions[j].pos }) - result := make([]string, len(argsWithPositions)) + result := make([]string, len(argsWithPositions)+len(flagArgs)) for i, arg := range argsWithPositions { result[i] = arg.value } + pos := len(argsWithPositions) for flag, value := range flagArgs { - result = append(result, flag+"="+value) + result[pos] = flag + "=" + value + pos++ } return result