Skip to content

Commit a04571e

Browse files
committed
feat: working fwatcher integration in Runfile
1 parent 58e2f5d commit a04571e

7 files changed

Lines changed: 183 additions & 70 deletions

File tree

cmd/run/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ import (
1414

1515
"github.com/muesli/termenv"
1616
"github.com/nxtcoder17/fastlog"
17-
term "golang.org/x/term"
1817
"github.com/nxtcoder17/go.errors"
1918
"github.com/nxtcoder17/runfile/pkg/runfile"
2019
"github.com/urfave/cli/v3"
20+
term "golang.org/x/term"
2121
)
2222

2323
var Version string

examples/Runfile.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ tasks:
6868
k4:
6969
sh: echo "1234"
7070
cmd:
71-
# - run: cook
71+
- run: cook
7272
- console.log(process.env.k4)
7373
- console.log("hello from laundry")
7474

@@ -129,3 +129,11 @@ tasks:
129129
cmd:
130130
- node
131131

132+
test-watch:
133+
watch:
134+
enabled: true
135+
dirs:
136+
- .
137+
cmd:
138+
- echo "STARTED"
139+

nixy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ packages:
66
- pre-commit
77
- gotestfmt
88

9+
910
onShellEnter: |+
1011
export PATH="/workspace/bin:$PATH"
1112
source $HOME/.profile

pkg/executor/shell-command.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"os/exec"
88
"os/signal"
99
"syscall"
10-
"time"
1110

1211
"github.com/creack/pty"
1312
"golang.org/x/term"
@@ -49,9 +48,23 @@ func NewInteractiveShellCommand(handler func(context.Context) *exec.Cmd) *comman
4948

5049
// Copy I/O
5150
go io.Copy(ptmx, os.Stdin)
52-
io.Copy(os.Stdout, ptmx)
51+
go io.Copy(os.Stdout, ptmx)
5352

54-
return cmd.Wait()
53+
done := make(chan error, 1)
54+
go func() {
55+
done <- cmd.Wait()
56+
}()
57+
58+
select {
59+
case err := <-done:
60+
return err
61+
case <-ctx.Done():
62+
if cmd.Process != nil {
63+
syscall.Kill(cmd.Process.Pid, syscall.SIGKILL)
64+
}
65+
<-done
66+
return ctx.Err()
67+
}
5568
})
5669
}
5770

@@ -76,14 +89,8 @@ func NewShellCommand(handler func(context.Context) *exec.Cmd) *command {
7689
case err := <-done:
7790
return err
7891
case <-ctx.Done():
79-
syscall.Kill(-pid, syscall.SIGTERM)
80-
81-
select {
82-
case <-done:
83-
case <-time.After(2 * time.Second):
84-
syscall.Kill(-pid, syscall.SIGKILL)
85-
<-done
86-
}
92+
syscall.Kill(-pid, syscall.SIGKILL)
93+
<-done
8794
return ctx.Err()
8895
}
8996
})

pkg/runfile/resolver/task.go

Lines changed: 89 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os/exec"
1212
"strings"
1313
"sync"
14+
"time"
1415

1516
"github.com/alecthomas/chroma/v2/quick"
1617
"github.com/charmbracelet/lipgloss"
@@ -104,15 +105,14 @@ func (r *Resolver) RunTask(ctx context.Context, name string) error {
104105
return err
105106
}
106107

107-
pipeline := executor.NewPipeline(slog.Default(), steps)
108+
currentPipeline := executor.NewPipeline(slog.Default(), steps)
108109

109110
notWatching := rt.Watch == nil || rt.Watch.Enabled == false
110111

111112
if notWatching {
112-
return pipeline.Start(ctx)
113+
return currentPipeline.Start(ctx)
113114
}
114115

115-
var wg sync.WaitGroup
116116
watch, err := watcher.NewWatcher(ctx, watcher.WatcherArgs{
117117
WatchDirs: rt.Watch.Dirs,
118118
IgnoreDirs: rt.Watch.IgnoreDirs,
@@ -126,57 +126,63 @@ func (r *Resolver) RunTask(ctx context.Context, name string) error {
126126
return err
127127
}
128128

129-
wg.Add(1)
130-
go func() {
131-
defer wg.Done()
132-
<-ctx.Done()
133-
// ctx.Logger().Info("fwatcher is closing ...")
134-
watch.Close()
135-
}()
136-
137-
// executors := []executor.Executor{pipeline}
138-
//
139-
// if rt.Watch.SSE != nil && rt.Watch.SSE.Addr != "" {
140-
// executors = append(executors, executor.NewSSEExecutor(executor.SSEExecutorArgs{Addr: rt.Watch.SSE.Addr}))
141-
// }
142-
143-
wg.Add(1)
144-
go func() {
145-
defer wg.Done()
146-
if err := pipeline.Start(ctx); err != nil {
147-
slog.Error("starting command", "err", err)
129+
go watch.Watch(ctx)
130+
defer watch.Close()
131+
132+
var pMu sync.Mutex
133+
134+
run := func() {
135+
pMu.Lock()
136+
defer pMu.Unlock()
137+
138+
if currentPipeline != nil {
139+
currentPipeline.Stop()
148140
}
149-
slog.Debug("final executor start finished")
150-
}()
151-
152-
wg.Add(1)
153-
go func() {
154-
defer wg.Done()
155-
<-ctx.Done()
156-
pipeline.Stop()
157-
slog.Debug("2. context cancelled")
158-
}()
159-
160-
wg.Add(1)
161-
go func() {
162-
defer wg.Done()
163-
watch.Watch(ctx)
164-
slog.Debug("3. watcher closed")
165-
}()
141+
142+
currentPipeline = executor.NewPipeline(slog.Default(), steps)
143+
go func() {
144+
if err := currentPipeline.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
145+
// slog.Error("pipeline finished with error", "err", err)
146+
}
147+
}()
148+
}
149+
150+
run()
151+
// // initial run
152+
// go func() {
153+
// if err := currentPipeline.Start(ctx); err != nil && !errors.Is(err, context.Canceled) {
154+
// // slog.Error("pipeline finished with error", "err", err)
155+
// }
156+
// }()
166157

167158
counter := 0
168-
for ev := range watch.GetEvents() {
169-
slog.Debug("received", "event", ev)
170-
counter += 1
171-
slog.Info(fmt.Sprintf("[RELOADING (%d)] due changes in %s", counter, ev.Name))
159+
debounceDuration := 300 * time.Millisecond
160+
timer := time.NewTimer(debounceDuration)
161+
if !timer.Stop() {
162+
<-timer.C
172163
}
173164

174-
// if err := watch.WatchAndExecute(ctx, executors); err != nil {
175-
// return err
176-
// }
177-
//
178-
wg.Wait()
179-
return nil
165+
for {
166+
select {
167+
case <-ctx.Done():
168+
pMu.Lock()
169+
if currentPipeline != nil {
170+
currentPipeline.Stop()
171+
}
172+
pMu.Unlock()
173+
return nil
174+
case ev, ok := <-watch.GetEvents():
175+
if !ok {
176+
return nil
177+
}
178+
slog.Debug("received", "event", ev)
179+
timer.Reset(debounceDuration)
180+
case <-timer.C:
181+
counter += 1
182+
slog.Info(fmt.Sprintf("[RELOADING (%d)]", counter))
183+
run()
184+
}
185+
}
180186
}
181187

182188
func parseCommands(commands []any) ([]*Command, error) {
@@ -311,8 +317,8 @@ func printCommand(w *writer.LogWriter, prefix, lang, cmd string) {
311317
hlCode.WriteString(cmdStr)
312318
}
313319

314-
// INFO: 2 for spaces around prefix
315-
longestLen := longestLineLen(cmd) + len(prefix) + 2
320+
// Use display width so unicode prefixes like "≫" don't skew the box layout.
321+
longestLen := longestLineWidth(cmd) + prefixDisplayWidth(prefix)
316322

317323
if width > 0 && longestLen >= width-2 {
318324
s = s.Width(width - 2)
@@ -323,12 +329,12 @@ func printCommand(w *writer.LogWriter, prefix, lang, cmd string) {
323329
fmt.Fprintf(w, "\r\033[K%s%s\n", padString(s.Render(hlCode.String()), prefix), s.UnsetBorderStyle())
324330
}
325331

326-
func longestLineLen(str string) int {
332+
func longestLineWidth(str string) int {
327333
sp := strings.Split(str, "\n")
328-
l := len(sp[0])
334+
l := lipgloss.Width(sp[0])
329335
for i := 1; i < len(sp); i++ {
330-
if len(sp[i]) > l {
331-
l = len(sp[i])
336+
if lipgloss.Width(sp[i]) > l {
337+
l = lipgloss.Width(sp[i])
332338
}
333339
}
334340

@@ -337,17 +343,26 @@ func longestLineLen(str string) int {
337343

338344
func padString(str string, withPrefix string) string {
339345
sp := strings.Split(str, "\n")
346+
indent := strings.Repeat(" ", prefixDisplayWidth(withPrefix))
340347
for i := range sp {
341348
if i == 0 {
342349
sp[i] = fmt.Sprintf("%s %s", writer.GetStyledPrefix(withPrefix), sp[i])
343350
continue
344351
}
345-
sp[i] = fmt.Sprintf("%s %s", strings.Repeat(" ", len(withPrefix)+2), sp[i])
352+
sp[i] = indent + sp[i]
346353
}
347354

348355
return strings.Join(sp, "\n")
349356
}
350357

358+
func prefixDisplayWidth(prefix string) int {
359+
if prefix == "" {
360+
return 0
361+
}
362+
363+
return lipgloss.Width("[" + prefix + "] ")
364+
}
365+
351366
type createCommandGroupArgs struct {
352367
Stdout *writer.LogWriter
353368
Stderr *writer.LogWriter
@@ -384,6 +399,10 @@ func (r *Resolver) createSteps(task *ResolvedTask, args createCommandGroupArgs)
384399
}
385400

386401
if cmd.IsRunTarget {
402+
if cycle := appendCycle(taskTrail, cmd.Text); cycle != nil {
403+
return nil, errors.New("Circular Task Dependency").KV("cycle", strings.Join(cycle, " -> "))
404+
}
405+
387406
rt, err := r.GetTask(cmd.Text)
388407
if err != nil {
389408
return nil, err
@@ -448,3 +467,17 @@ func (r *Resolver) createSteps(task *ResolvedTask, args createCommandGroupArgs)
448467
slog.Debug("created command groups", "len", len(steps))
449468
return steps, nil
450469
}
470+
471+
func appendCycle(taskTrail []string, next string) []string {
472+
for i := range taskTrail {
473+
if taskTrail[i] != next {
474+
continue
475+
}
476+
477+
cycle := append([]string{}, taskTrail[i:]...)
478+
cycle = append(cycle, next)
479+
return cycle
480+
}
481+
482+
return nil
483+
}

pkg/runfile/resolver/task_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package resolver
22

33
import (
4+
"strings"
45
"testing"
56

67
"github.com/nxtcoder17/runfile/pkg/runfile/spec"
@@ -350,6 +351,61 @@ func TestCreateSteps(t *testing.T) {
350351
subStepIndices: []int{1}, // only step at index 1 should have substeps
351352
wantErr: false,
352353
},
354+
{
355+
name: "when task has circular dependency, it must fail",
356+
resolver: &Resolver{
357+
Env: map[string]string{},
358+
Tasks: map[string]extendedTaskSpec{
359+
"a": {
360+
TaskSpec: spec.TaskSpec{
361+
Commands: []any{map[string]any{"run": "b"}},
362+
},
363+
},
364+
"b": {
365+
TaskSpec: spec.TaskSpec{
366+
Commands: []any{map[string]any{"run": "a"}},
367+
},
368+
},
369+
},
370+
},
371+
task: &ResolvedTask{
372+
Name: "a",
373+
Shell: []string{"bash", "-c"},
374+
Commands: []*Command{
375+
{Text: "b", IsRunTarget: true},
376+
},
377+
},
378+
args: createCommandGroupArgs{
379+
Stdout: &writer.LogWriter{},
380+
Stderr: &writer.LogWriter{},
381+
},
382+
wantErr: true,
383+
},
384+
{
385+
name: "when task runs itself, it must fail",
386+
resolver: &Resolver{
387+
Env: map[string]string{},
388+
Tasks: map[string]extendedTaskSpec{
389+
"a": {
390+
TaskSpec: spec.TaskSpec{
391+
Commands: []any{map[string]any{"run": "a"}},
392+
},
393+
},
394+
},
395+
},
396+
task: &ResolvedTask{
397+
Name: "a",
398+
Shell: []string{"bash", "-c"},
399+
Commands: []*Command{
400+
{Text: "a", IsRunTarget: true},
401+
},
402+
},
403+
args: createCommandGroupArgs{
404+
Stdout: &writer.LogWriter{},
405+
Stderr: &writer.LogWriter{},
406+
},
407+
wantErr: true,
408+
},
353409
}
354410

355411
for _, tt := range tests {
@@ -360,6 +416,14 @@ func TestCreateSteps(t *testing.T) {
360416
if err == nil {
361417
t.Error("expected error, got nil")
362418
}
419+
420+
if strings.Contains(tt.name, "circular dependency") && !strings.Contains(err.Error(), "a -> b -> a") {
421+
t.Fatalf("expected circular dependency error to include cycle path, got %v", err)
422+
}
423+
424+
if strings.Contains(tt.name, "runs itself") && !strings.Contains(err.Error(), "a -> a") {
425+
t.Fatalf("expected self dependency error to include cycle path, got %v", err)
426+
}
363427
return
364428
}
365429

0 commit comments

Comments
 (0)