Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 9 additions & 66 deletions cmd/internal/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/flowexec/flow/cmd/internal/flags"
cacheIO "github.com/flowexec/flow/internal/io/cache"
"github.com/flowexec/flow/internal/services/store"
"github.com/flowexec/flow/pkg/context"
"github.com/flowexec/flow/pkg/logger"
"github.com/flowexec/flow/pkg/store"
)

func RegisterCacheCmd(ctx *context.Context, rootCmd *cobra.Command) {
Expand Down Expand Up @@ -76,24 +76,12 @@ func cacheSetFunc(ctx *context.Context, cmd *cobra.Command, args []string) {
value = strings.Join(args[1:], " ")
}

s, err := store.NewStore(store.Path())
if err != nil {
logger.Log().FatalErr(err)
}
bucketName := store.EnvironmentBucket()
global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false)
if global {
bucketName = store.RootBucket
}
if _, err = s.CreateAndSetBucket(bucketName); err != nil {
logger.Log().FatalErr(err)
}
defer func() {
if err = s.Close(); err != nil {
logger.Log().WrapError(err, "cleanup failure")
}
}()
if err = s.Set(key, value); err != nil {
if err := ctx.DataStore.SetProcessVar(bucketName, key, value); err != nil {
logger.Log().FatalErr(err)
}
logger.Log().PlainTextInfo(fmt.Sprintf("Key %q set in the cache", key))
Expand All @@ -114,27 +102,15 @@ func registerCacheGetCmd(ctx *context.Context, rootCmd *cobra.Command) {
rootCmd.AddCommand(subCmd)
}

func cacheGetFunc(_ *context.Context, cmd *cobra.Command, args []string) {
func cacheGetFunc(ctx *context.Context, cmd *cobra.Command, args []string) {
key := args[0]

s, err := store.NewStore(store.Path())
if err != nil {
logger.Log().FatalErr(err)
}
bucketName := store.EnvironmentBucket()
global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false)
if global {
bucketName = store.RootBucket
}
if _, err = s.CreateAndSetBucket(bucketName); err != nil {
logger.Log().FatalErr(err)
}
defer func() {
if err := s.Close(); err != nil {
logger.Log().WrapError(err, "cleanup failure")
}
}()
value, err := s.Get(key)
value, err := ctx.DataStore.GetProcessVar(bucketName, key)
if err != nil {
logger.Log().FatalErr(err)
}
Expand All @@ -159,19 +135,7 @@ func registerCacheListCmd(ctx *context.Context, rootCmd *cobra.Command) {
}

func cacheListFunc(ctx *context.Context, cmd *cobra.Command, _ []string) {
s, err := store.NewStore(store.Path())
if err != nil {
logger.Log().FatalErr(err)
}
if _, err = s.CreateAndSetBucket(store.EnvironmentBucket()); err != nil {
logger.Log().FatalErr(err)
}
defer func() {
if err := s.Close(); err != nil {
logger.Log().WrapError(err, "cleanup failure")
}
}()
data, err := s.GetAll()
data, err := ctx.DataStore.GetAllProcessVars(store.EnvironmentBucket())
if err != nil {
logger.Log().FatalErr(err)
}
Expand Down Expand Up @@ -199,27 +163,15 @@ func registerCacheRemoveCmd(ctx *context.Context, rootCmd *cobra.Command) {
rootCmd.AddCommand(subCmd)
}

func cacheRemoveFunc(_ *context.Context, cmd *cobra.Command, args []string) {
func cacheRemoveFunc(ctx *context.Context, cmd *cobra.Command, args []string) {
key := args[0]

s, err := store.NewStore(store.Path())
if err != nil {
logger.Log().FatalErr(err)
}
bucketName := store.EnvironmentBucket()
global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false)
if global {
bucketName = store.RootBucket
}
if _, err = s.CreateAndSetBucket(bucketName); err != nil {
logger.Log().FatalErr(err)
}
defer func() {
if err := s.Close(); err != nil {
logger.Log().WrapError(err, "cleanup failure")
}
}()
if err = s.Delete(key); err != nil {
if err := ctx.DataStore.DeleteProcessVar(bucketName, key); err != nil {
logger.Log().FatalErr(err)
}
logger.Log().PlainTextSuccess(fmt.Sprintf("Key %q removed from the cache", key))
Expand All @@ -240,7 +192,7 @@ func registerCacheClearCmd(ctx *context.Context, rootCmd *cobra.Command) {
rootCmd.AddCommand(subCmd)
}

func cacheClearFunc(_ *context.Context, cmd *cobra.Command, _ []string) {
func cacheClearFunc(ctx *context.Context, cmd *cobra.Command, _ []string) {
full := flags.ValueFor[bool](cmd, *flags.StoreAllFlag, false)
if full {
if err := store.DestroyStore(); err != nil {
Expand All @@ -249,16 +201,7 @@ func cacheClearFunc(_ *context.Context, cmd *cobra.Command, _ []string) {
logger.Log().PlainTextSuccess("Cache cleared")
return
}
s, err := store.NewStore(store.Path())
if err != nil {
logger.Log().FatalErr(err)
}
defer func() {
if err := s.Close(); err != nil {
logger.Log().WrapError(err, "cleanup failure")
}
}()
if err := s.DeleteBucket(store.EnvironmentBucket()); err != nil {
if err := ctx.DataStore.DeleteProcessBucket(store.EnvironmentBucket()); err != nil {
logger.Log().FatalErr(err)
}
logger.Log().PlainTextSuccess("Cache cleared")
Expand Down
111 changes: 76 additions & 35 deletions cmd/internal/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (
"github.com/flowexec/flow/internal/runner/render"
"github.com/flowexec/flow/internal/runner/request"
"github.com/flowexec/flow/internal/runner/serial"
"github.com/flowexec/flow/internal/services/store"
"github.com/flowexec/flow/internal/utils/env"
"github.com/flowexec/flow/pkg/context"
flowErrors "github.com/flowexec/flow/pkg/errors"
"github.com/flowexec/flow/pkg/filesystem"
"github.com/flowexec/flow/pkg/logger"
"github.com/flowexec/flow/pkg/store"
"github.com/flowexec/flow/types/executable"
"github.com/flowexec/flow/types/workspace"
)
Expand Down Expand Up @@ -87,9 +88,6 @@ func execPreRun(_ *context.Context, _ *cobra.Command, _ []string) {
runner.RegisterRunner(parallel.NewRunner())
}

// TODO: refactor this function to simplify the logic
//
//nolint:funlen,gocognit
func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, args []string) {
logMode := flags.ValueFor[string](cmd, *flags.LogModeFlag, false)
if logMode != "" {
Expand Down Expand Up @@ -132,17 +130,37 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
))
}

s, err := store.NewStore(store.Path())
if err != nil {
logger.Log().FatalErr(err)
if ctx.DataStore != nil {
if err := ctx.DataStore.CreateProcessBucket(ref.String()); err != nil {
logger.Log().FatalErr(err)
}
_ = os.Setenv(store.BucketEnv, ref.String())
}
if _, err = s.CreateAndSetBucket(ref.String()); err != nil {
logger.Log().FatalErr(err)

envMap := buildExecEnv(ctx, cmd, e)

var execArgs []string
if len(args) >= 2 {
execArgs = args[1:]
}

startTime := time.Now()
eng := engine.NewExecEngine()
runErr := runner.Exec(ctx, e, eng, envMap, execArgs)
dur := time.Since(startTime)

cleanupProcessStore(ctx)
recordExecution(ctx, ref, startTime, dur, runErr)

if runErr != nil {
logger.Log().FatalErr(runErr)
}
_ = s.Close()
logger.Log().Debug(fmt.Sprintf("%s flow completed", ref), "Elapsed", dur.Round(time.Millisecond))
sendCompletionNotifications(ctx, cmd, dur)
}

func buildExecEnv(ctx *context.Context, cmd *cobra.Command, e *executable.Executable) map[string]string {
envMap := make(map[string]string)
// add workspace env variables to the env map
if wsData, err := ctx.WorkspacesCache.GetWorkspaceConfigList(); err != nil {
logger.Log().Errorf("failed to get workspace cache data, skipping env file resolution: %v", err)
} else {
Expand All @@ -153,11 +171,9 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
}
}

// add --param overrides to the env map
paramOverrides := flags.ValueFor[[]string](cmd, *flags.ParameterValueFlag, false)
applyParameterOverrides(paramOverrides, envMap)

// add values from the prompt param type to the env map
textInputs := pendingFormFields(ctx, e, envMap)
if len(textInputs) > 0 {
form, err := views.NewForm(logger.Theme(ctx.Config.Theme.String()), ctx.StdIn(), ctx.StdOut(), textInputs...)
Expand All @@ -171,37 +187,62 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
envMap[key] = fmt.Sprintf("%v", val)
}
}
return envMap
}

startTime := time.Now()
eng := engine.NewExecEngine()
func cleanupProcessStore(ctx *context.Context) {
if ctx.DataStore != nil {
if err := ctx.DataStore.DeleteProcessBucket(store.EnvironmentBucket()); err != nil {
logger.Log().Errorf("failed clearing process store\n%v", err)
}
}
}

var execArgs []string
if len(args) >= 2 {
execArgs = args[1:]
func recordExecution(ctx *context.Context, ref executable.Ref, startTime time.Time, dur time.Duration, runErr error) {
record := store.ExecutionRecord{
Ref: ref.String(),
StartedAt: startTime,
Duration: dur,
}
if runErr != nil {
record.ExitCode = 1
record.Error = runErr.Error()
}
record.LogArchiveID = findArchiveByID(ctx.LogArchiveID)
if ctx.DataStore != nil {
if recErr := ctx.DataStore.RecordExecution(record); recErr != nil {
logger.Log().Debug("failed to record execution history", "err", recErr)
}
}
}

if err := runner.Exec(ctx, e, eng, envMap, execArgs); err != nil {
logger.Log().FatalErr(err)
// findArchiveByID searches log archive entries for one matching the given ID.
// Returns the entry's path if found, empty string otherwise.
func findArchiveByID(archiveID string) string {
if archiveID == "" {
return ""
}
dur := time.Since(startTime)
processStore, err := store.NewStore(store.Path())
entries, err := tuikitIO.ListArchiveEntries(filesystem.LogsDir())
if err != nil {
logger.Log().Errorf("failed clearing process store\n%v", err)
return ""
}
if processStore != nil {
if err = processStore.DeleteBucket(store.EnvironmentBucket()); err != nil {
logger.Log().Errorf("failed clearing process store\n%v", err)
for _, e := range entries {
if e.ID == archiveID {
return e.Path
}
_ = processStore.Close()
}
logger.Log().Debug(fmt.Sprintf("%s flow completed", ref), "Elapsed", dur.Round(time.Millisecond))
if TUIEnabled(ctx, cmd) {
if dur > 1*time.Minute && ctx.Config.SendSoundNotification() {
_ = beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration)
}
if dur > 1*time.Minute && ctx.Config.SendTextNotification() {
_ = beeep.Notify("Flow", "Flow completed", "")
}
return ""
}

func sendCompletionNotifications(ctx *context.Context, cmd *cobra.Command, dur time.Duration) {
if !TUIEnabled(ctx, cmd) || dur <= 1*time.Minute {
return
}
if ctx.Config.SendSoundNotification() {
_ = beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration)
}
if ctx.Config.SendTextNotification() {
_ = beeep.Notify("Flow", "Flow completed", "")
}
}

Expand Down
29 changes: 29 additions & 0 deletions cmd/internal/flags/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,35 @@ var LastLogEntryFlag = &Metadata{
Required: false,
}

var LogFilterWorkspaceFlag = &Metadata{
Name: "workspace",
Shorthand: "w",
Usage: "Filter history by workspace name.",
Default: "",
Required: false,
}

var LogFilterStatusFlag = &Metadata{
Name: "status",
Usage: "Filter history by status (success or failure).",
Default: "",
Required: false,
}

var LogFilterSinceFlag = &Metadata{
Name: "since",
Usage: "Filter history to entries after a duration (e.g. 1h, 30m, 7d).",
Default: "",
Required: false,
}

var LogFilterLimitFlag = &Metadata{
Name: "limit",
Usage: "Maximum number of records to display.",
Default: 0,
Required: false,
}

var TemplateWorkspaceFlag = &Metadata{
Name: "workspace",
Shorthand: "w",
Expand Down
Loading
Loading