Skip to content

Commit 961970c

Browse files
authored
feat: Add execution history tracking with persistent data store (#384)
Introduce a persistent execution history system that records each executable run (ref, timestamp, duration, status) and surfaces it through enhanced `flow logs` commands. This replaces the previous in-memory store with a new pkg/store backed by bbolt, moves cache logic into pkg/cache for broader reuse, and adds a `flow logs clear` subcommand for cleanup. Key changes: - New pkg/store with bbolt-backed data store for execution records - Enhanced log viewer with filterable execution history (by verb, workspace, status, and time range) - Parallel/serial runners now record execution results automatically - Relocated cache package from internal/ to pkg/ for wider access - Added execution history guide and updated CLI docs - Added e2e tests for new log commands
1 parent c1e7d33 commit 961970c

36 files changed

Lines changed: 1927 additions & 1026 deletions

cmd/internal/cache.go

Lines changed: 9 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ import (
99

1010
"github.com/flowexec/flow/cmd/internal/flags"
1111
cacheIO "github.com/flowexec/flow/internal/io/cache"
12-
"github.com/flowexec/flow/internal/services/store"
1312
"github.com/flowexec/flow/pkg/context"
1413
"github.com/flowexec/flow/pkg/logger"
14+
"github.com/flowexec/flow/pkg/store"
1515
)
1616

1717
func RegisterCacheCmd(ctx *context.Context, rootCmd *cobra.Command) {
@@ -76,24 +76,12 @@ func cacheSetFunc(ctx *context.Context, cmd *cobra.Command, args []string) {
7676
value = strings.Join(args[1:], " ")
7777
}
7878

79-
s, err := store.NewStore(store.Path())
80-
if err != nil {
81-
logger.Log().FatalErr(err)
82-
}
8379
bucketName := store.EnvironmentBucket()
8480
global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false)
8581
if global {
8682
bucketName = store.RootBucket
8783
}
88-
if _, err = s.CreateAndSetBucket(bucketName); err != nil {
89-
logger.Log().FatalErr(err)
90-
}
91-
defer func() {
92-
if err = s.Close(); err != nil {
93-
logger.Log().WrapError(err, "cleanup failure")
94-
}
95-
}()
96-
if err = s.Set(key, value); err != nil {
84+
if err := ctx.DataStore.SetProcessVar(bucketName, key, value); err != nil {
9785
logger.Log().FatalErr(err)
9886
}
9987
logger.Log().PlainTextInfo(fmt.Sprintf("Key %q set in the cache", key))
@@ -114,27 +102,15 @@ func registerCacheGetCmd(ctx *context.Context, rootCmd *cobra.Command) {
114102
rootCmd.AddCommand(subCmd)
115103
}
116104

117-
func cacheGetFunc(_ *context.Context, cmd *cobra.Command, args []string) {
105+
func cacheGetFunc(ctx *context.Context, cmd *cobra.Command, args []string) {
118106
key := args[0]
119107

120-
s, err := store.NewStore(store.Path())
121-
if err != nil {
122-
logger.Log().FatalErr(err)
123-
}
124108
bucketName := store.EnvironmentBucket()
125109
global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false)
126110
if global {
127111
bucketName = store.RootBucket
128112
}
129-
if _, err = s.CreateAndSetBucket(bucketName); err != nil {
130-
logger.Log().FatalErr(err)
131-
}
132-
defer func() {
133-
if err := s.Close(); err != nil {
134-
logger.Log().WrapError(err, "cleanup failure")
135-
}
136-
}()
137-
value, err := s.Get(key)
113+
value, err := ctx.DataStore.GetProcessVar(bucketName, key)
138114
if err != nil {
139115
logger.Log().FatalErr(err)
140116
}
@@ -159,19 +135,7 @@ func registerCacheListCmd(ctx *context.Context, rootCmd *cobra.Command) {
159135
}
160136

161137
func cacheListFunc(ctx *context.Context, cmd *cobra.Command, _ []string) {
162-
s, err := store.NewStore(store.Path())
163-
if err != nil {
164-
logger.Log().FatalErr(err)
165-
}
166-
if _, err = s.CreateAndSetBucket(store.EnvironmentBucket()); err != nil {
167-
logger.Log().FatalErr(err)
168-
}
169-
defer func() {
170-
if err := s.Close(); err != nil {
171-
logger.Log().WrapError(err, "cleanup failure")
172-
}
173-
}()
174-
data, err := s.GetAll()
138+
data, err := ctx.DataStore.GetAllProcessVars(store.EnvironmentBucket())
175139
if err != nil {
176140
logger.Log().FatalErr(err)
177141
}
@@ -199,27 +163,15 @@ func registerCacheRemoveCmd(ctx *context.Context, rootCmd *cobra.Command) {
199163
rootCmd.AddCommand(subCmd)
200164
}
201165

202-
func cacheRemoveFunc(_ *context.Context, cmd *cobra.Command, args []string) {
166+
func cacheRemoveFunc(ctx *context.Context, cmd *cobra.Command, args []string) {
203167
key := args[0]
204168

205-
s, err := store.NewStore(store.Path())
206-
if err != nil {
207-
logger.Log().FatalErr(err)
208-
}
209169
bucketName := store.EnvironmentBucket()
210170
global := flags.ValueFor[bool](cmd, *flags.GlobalCacheFlag, false)
211171
if global {
212172
bucketName = store.RootBucket
213173
}
214-
if _, err = s.CreateAndSetBucket(bucketName); err != nil {
215-
logger.Log().FatalErr(err)
216-
}
217-
defer func() {
218-
if err := s.Close(); err != nil {
219-
logger.Log().WrapError(err, "cleanup failure")
220-
}
221-
}()
222-
if err = s.Delete(key); err != nil {
174+
if err := ctx.DataStore.DeleteProcessVar(bucketName, key); err != nil {
223175
logger.Log().FatalErr(err)
224176
}
225177
logger.Log().PlainTextSuccess(fmt.Sprintf("Key %q removed from the cache", key))
@@ -240,7 +192,7 @@ func registerCacheClearCmd(ctx *context.Context, rootCmd *cobra.Command) {
240192
rootCmd.AddCommand(subCmd)
241193
}
242194

243-
func cacheClearFunc(_ *context.Context, cmd *cobra.Command, _ []string) {
195+
func cacheClearFunc(ctx *context.Context, cmd *cobra.Command, _ []string) {
244196
full := flags.ValueFor[bool](cmd, *flags.StoreAllFlag, false)
245197
if full {
246198
if err := store.DestroyStore(); err != nil {
@@ -249,16 +201,7 @@ func cacheClearFunc(_ *context.Context, cmd *cobra.Command, _ []string) {
249201
logger.Log().PlainTextSuccess("Cache cleared")
250202
return
251203
}
252-
s, err := store.NewStore(store.Path())
253-
if err != nil {
254-
logger.Log().FatalErr(err)
255-
}
256-
defer func() {
257-
if err := s.Close(); err != nil {
258-
logger.Log().WrapError(err, "cleanup failure")
259-
}
260-
}()
261-
if err := s.DeleteBucket(store.EnvironmentBucket()); err != nil {
204+
if err := ctx.DataStore.DeleteProcessBucket(store.EnvironmentBucket()); err != nil {
262205
logger.Log().FatalErr(err)
263206
}
264207
logger.Log().PlainTextSuccess("Cache cleared")

cmd/internal/exec.go

Lines changed: 76 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ import (
2323
"github.com/flowexec/flow/internal/runner/render"
2424
"github.com/flowexec/flow/internal/runner/request"
2525
"github.com/flowexec/flow/internal/runner/serial"
26-
"github.com/flowexec/flow/internal/services/store"
2726
"github.com/flowexec/flow/internal/utils/env"
2827
"github.com/flowexec/flow/pkg/context"
2928
flowErrors "github.com/flowexec/flow/pkg/errors"
29+
"github.com/flowexec/flow/pkg/filesystem"
3030
"github.com/flowexec/flow/pkg/logger"
31+
"github.com/flowexec/flow/pkg/store"
3132
"github.com/flowexec/flow/types/executable"
3233
"github.com/flowexec/flow/types/workspace"
3334
)
@@ -87,9 +88,6 @@ func execPreRun(_ *context.Context, _ *cobra.Command, _ []string) {
8788
runner.RegisterRunner(parallel.NewRunner())
8889
}
8990

90-
// TODO: refactor this function to simplify the logic
91-
//
92-
//nolint:funlen,gocognit
9391
func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, args []string) {
9492
logMode := flags.ValueFor[string](cmd, *flags.LogModeFlag, false)
9593
if logMode != "" {
@@ -132,17 +130,37 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
132130
))
133131
}
134132

135-
s, err := store.NewStore(store.Path())
136-
if err != nil {
137-
logger.Log().FatalErr(err)
133+
if ctx.DataStore != nil {
134+
if err := ctx.DataStore.CreateProcessBucket(ref.String()); err != nil {
135+
logger.Log().FatalErr(err)
136+
}
137+
_ = os.Setenv(store.BucketEnv, ref.String())
138138
}
139-
if _, err = s.CreateAndSetBucket(ref.String()); err != nil {
140-
logger.Log().FatalErr(err)
139+
140+
envMap := buildExecEnv(ctx, cmd, e)
141+
142+
var execArgs []string
143+
if len(args) >= 2 {
144+
execArgs = args[1:]
145+
}
146+
147+
startTime := time.Now()
148+
eng := engine.NewExecEngine()
149+
runErr := runner.Exec(ctx, e, eng, envMap, execArgs)
150+
dur := time.Since(startTime)
151+
152+
cleanupProcessStore(ctx)
153+
recordExecution(ctx, ref, startTime, dur, runErr)
154+
155+
if runErr != nil {
156+
logger.Log().FatalErr(runErr)
141157
}
142-
_ = s.Close()
158+
logger.Log().Debug(fmt.Sprintf("%s flow completed", ref), "Elapsed", dur.Round(time.Millisecond))
159+
sendCompletionNotifications(ctx, cmd, dur)
160+
}
143161

162+
func buildExecEnv(ctx *context.Context, cmd *cobra.Command, e *executable.Executable) map[string]string {
144163
envMap := make(map[string]string)
145-
// add workspace env variables to the env map
146164
if wsData, err := ctx.WorkspacesCache.GetWorkspaceConfigList(); err != nil {
147165
logger.Log().Errorf("failed to get workspace cache data, skipping env file resolution: %v", err)
148166
} else {
@@ -153,11 +171,9 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
153171
}
154172
}
155173

156-
// add --param overrides to the env map
157174
paramOverrides := flags.ValueFor[[]string](cmd, *flags.ParameterValueFlag, false)
158175
applyParameterOverrides(paramOverrides, envMap)
159176

160-
// add values from the prompt param type to the env map
161177
textInputs := pendingFormFields(ctx, e, envMap)
162178
if len(textInputs) > 0 {
163179
form, err := views.NewForm(logger.Theme(ctx.Config.Theme.String()), ctx.StdIn(), ctx.StdOut(), textInputs...)
@@ -171,37 +187,62 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
171187
envMap[key] = fmt.Sprintf("%v", val)
172188
}
173189
}
190+
return envMap
191+
}
174192

175-
startTime := time.Now()
176-
eng := engine.NewExecEngine()
193+
func cleanupProcessStore(ctx *context.Context) {
194+
if ctx.DataStore != nil {
195+
if err := ctx.DataStore.DeleteProcessBucket(store.EnvironmentBucket()); err != nil {
196+
logger.Log().Errorf("failed clearing process store\n%v", err)
197+
}
198+
}
199+
}
177200

178-
var execArgs []string
179-
if len(args) >= 2 {
180-
execArgs = args[1:]
201+
func recordExecution(ctx *context.Context, ref executable.Ref, startTime time.Time, dur time.Duration, runErr error) {
202+
record := store.ExecutionRecord{
203+
Ref: ref.String(),
204+
StartedAt: startTime,
205+
Duration: dur,
206+
}
207+
if runErr != nil {
208+
record.ExitCode = 1
209+
record.Error = runErr.Error()
181210
}
211+
record.LogArchiveID = findArchiveByID(ctx.LogArchiveID)
212+
if ctx.DataStore != nil {
213+
if recErr := ctx.DataStore.RecordExecution(record); recErr != nil {
214+
logger.Log().Debug("failed to record execution history", "err", recErr)
215+
}
216+
}
217+
}
182218

183-
if err := runner.Exec(ctx, e, eng, envMap, execArgs); err != nil {
184-
logger.Log().FatalErr(err)
219+
// findArchiveByID searches log archive entries for one matching the given ID.
220+
// Returns the entry's path if found, empty string otherwise.
221+
func findArchiveByID(archiveID string) string {
222+
if archiveID == "" {
223+
return ""
185224
}
186-
dur := time.Since(startTime)
187-
processStore, err := store.NewStore(store.Path())
225+
entries, err := tuikitIO.ListArchiveEntries(filesystem.LogsDir())
188226
if err != nil {
189-
logger.Log().Errorf("failed clearing process store\n%v", err)
227+
return ""
190228
}
191-
if processStore != nil {
192-
if err = processStore.DeleteBucket(store.EnvironmentBucket()); err != nil {
193-
logger.Log().Errorf("failed clearing process store\n%v", err)
229+
for _, e := range entries {
230+
if e.ID == archiveID {
231+
return e.Path
194232
}
195-
_ = processStore.Close()
196233
}
197-
logger.Log().Debug(fmt.Sprintf("%s flow completed", ref), "Elapsed", dur.Round(time.Millisecond))
198-
if TUIEnabled(ctx, cmd) {
199-
if dur > 1*time.Minute && ctx.Config.SendSoundNotification() {
200-
_ = beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration)
201-
}
202-
if dur > 1*time.Minute && ctx.Config.SendTextNotification() {
203-
_ = beeep.Notify("Flow", "Flow completed", "")
204-
}
234+
return ""
235+
}
236+
237+
func sendCompletionNotifications(ctx *context.Context, cmd *cobra.Command, dur time.Duration) {
238+
if !TUIEnabled(ctx, cmd) || dur <= 1*time.Minute {
239+
return
240+
}
241+
if ctx.Config.SendSoundNotification() {
242+
_ = beeep.Beep(beeep.DefaultFreq, beeep.DefaultDuration)
243+
}
244+
if ctx.Config.SendTextNotification() {
245+
_ = beeep.Notify("Flow", "Flow completed", "")
205246
}
206247
}
207248

cmd/internal/flags/types.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,35 @@ var LastLogEntryFlag = &Metadata{
148148
Required: false,
149149
}
150150

151+
var LogFilterWorkspaceFlag = &Metadata{
152+
Name: "workspace",
153+
Shorthand: "w",
154+
Usage: "Filter history by workspace name.",
155+
Default: "",
156+
Required: false,
157+
}
158+
159+
var LogFilterStatusFlag = &Metadata{
160+
Name: "status",
161+
Usage: "Filter history by status (success or failure).",
162+
Default: "",
163+
Required: false,
164+
}
165+
166+
var LogFilterSinceFlag = &Metadata{
167+
Name: "since",
168+
Usage: "Filter history to entries after a duration (e.g. 1h, 30m, 7d).",
169+
Default: "",
170+
Required: false,
171+
}
172+
173+
var LogFilterLimitFlag = &Metadata{
174+
Name: "limit",
175+
Usage: "Maximum number of records to display.",
176+
Default: 0,
177+
Required: false,
178+
}
179+
151180
var TemplateWorkspaceFlag = &Metadata{
152181
Name: "workspace",
153182
Shorthand: "w",

0 commit comments

Comments
 (0)