diff --git a/.golangci.yaml b/.golangci.yaml index d3c8f29..098d05d 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -15,7 +15,6 @@ linters: - goconst - gocritic - gocyclo - - goprintffuncname - gosec - govet - ineffassign diff --git a/docker/background.go b/docker/background.go new file mode 100644 index 0000000..12d5b65 --- /dev/null +++ b/docker/background.go @@ -0,0 +1,205 @@ +package docker + +import ( + "context" + "crypto/rand" + "fmt" + "os" + "os/exec" + "regexp" + "sync" + "syscall" + "time" + + "github.com/anchore/go-make/file" + . "github.com/anchore/go-make/lang" + "github.com/anchore/go-make/log" + "github.com/anchore/go-make/run" + "github.com/anchore/go-make/shell" + "github.com/anchore/go-make/stream" +) + +type Process struct { + cmd *exec.Cmd + name string + containerID string + started sync.WaitGroup + exited sync.WaitGroup + cmdOut stream.TeeWriter + cmdErr stream.TeeWriter +} + +// Background runs a container in the background, returning a *Process to execute commands and otherwise +// interact with the container +func Background(containerOrDockerfile string, opts ...Option) *Process { + proc := &Process{ + name: randomString(), + } + proc.started.Add(1) + proc.exited.Add(1) + + cfg := makeConfig(containerOrDockerfile, append(opts, func(cfg *commandConfig) error { + waiter := sync.OnceFunc(func() { + // execute postExecs in the command routine, so that we can wait for the container to start + for _, postExec := range cfg.postExec { + err := postExec(proc) + if err != nil { + proc.Kill() + } + } + + // signal we have the last setup completed + proc.started.Done() // TODO is there a better way to determine the process has actually started? + }) + cfg.dockerArgs = append(cfg.dockerArgs, run.Stdout(os.Stderr), func(ctx context.Context, cmd *exec.Cmd) error { + cmd.Args = append(cmd.Args, "--name", proc.name) + + proc.cmd = cmd + + // inject a TeeWriter into stdout and stderr in order to wait for log text + proc.cmdOut = stream.Tee() + if cmd.Stdout == nil { + proc.cmdOut.AddWriter(cmd.Stdout) + } + cmd.Stdout = proc.cmdOut + + proc.cmdErr = stream.Tee() + if cmd.Stderr == nil { + proc.cmdErr.AddWriter(cmd.Stderr) + } + cmd.Stderr = proc.cmdErr + + go waiter() + + return nil + }) + return nil + })...) + + go func() { + err := Catch(func() { + defer proc.exited.Done() + runConfig(cfg) + }) + if err != nil { + log.Error(err) + _ = Catch(func() { // may already be done, don't panic + proc.started.Done() + }) + } + }() + + proc.started.Wait() + + for proc.containerID == "" { + proc.containerID = Return(run.Command("docker", run.Args("ps", "--all", "--quiet", "--filter", "name="+proc.name))) + } + + return proc +} + +func makeConfig(containerOrDockerfile string, opts ...Option) *commandConfig { + cfg := commandConfig{} + for _, opt := range opts { + Throw(opt(&cfg)) + } + if file.IsRegular(containerOrDockerfile) { + h := file.Sha256Hash(containerOrDockerfile) + if cfg.name == "" { + cfg.name = DefaultContainerPrefix + h + } + Build(containerOrDockerfile, cfg.name) + } else { + // otherwise we assume it's a container name + cfg.name = containerOrDockerfile + } + return &cfg +} + +func (r *Process) Kill() { + if r == nil { + log.Info("nil process, not sending signals") + return + } + if r.cmd != nil && r.cmd.Process != nil { + for _, signal := range []os.Signal{os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL} { + // for _, signal := range []os.Signal{syscall.SIGINT, syscall.SIGSTOP, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGABRT} { + if r.cmd != nil && r.cmd.Process != nil { + if signal == syscall.SIGKILL { + log.Error(r.cmd.Process.Kill()) + } else { + log.Error(r.cmd.Process.Signal(signal)) + } + } + time.Sleep(10 * time.Millisecond) + } + log.Info("sent kill signals to container: %s", r.name) + } + r.WaitUntilExit() +} + +func (r *Process) WaitUntilExit() { + r.exited.Wait() +} + +// Exec runs a command in the running container +func (r *Process) Exec(containerCommand string, opts ...run.Option) string { + cmd := shell.Split(containerCommand) + return Return(run.Command("docker", run.Args("exec", r.containerID, cmd[0]), run.Args(cmd[1:]...), run.Options(opts...))) +} + +// WaitLog waits for the provided text to be present in the stdout or stderr of the container, this is guaranteed +// to capture all the text from startup +func WaitLog(text string) Option { + return func(cfg *commandConfig) error { + cfg.postExec = append(cfg.postExec, func(proc *Process) error { + proc.NextLogMatch(regexp.MustCompile(regexp.QuoteMeta(text))) + return nil + }) + return nil + } +} + +// WaitFor waits for the given condition to be true by polling +func WaitFor(condition func() bool) { + for !condition() { + time.Sleep(100 * time.Millisecond) + } +} + +// WaitLogText waits for the given text to appear in the container's stdout or stderr and returns +func (r *Process) WaitLogText(text string) { + WaitFor(func() bool { + return r.NextLogMatch(regexp.MustCompile(regexp.QuoteMeta(text))) != nil + }) +} + +// NextLogMatch waits for the given regexp to appear in the container's stdout or stderr and returns the next match, +// organized with the full match at the "" and named subexpression matches. NOTE: this starts reading from the log +// when the function is called, so any text written to the log before the capture begins will not match, which may be +// problematic if an action is able to be scheduled before log capture begins +func (r *Process) NextLogMatch(re *regexp.Regexp, opts ...stream.Option) map[string]string { + stdout, m1 := stream.NewRegexpScanner(re, opts...) + r.cmdOut.AddWriter(stdout) + defer r.cmdOut.RemoveWriter(stdout) + + stderr, m2 := stream.NewRegexpScanner(re, opts...) + r.cmdErr.AddWriter(stderr) + defer r.cmdErr.RemoveWriter(stderr) + + log.Info("waiting for: %s", re.String()) + + // block until a match is found in either stdout or stderr + var match map[string]string + select { + case match = <-m1: + case match = <-m2: + } + return match +} + +func randomString() string { + b := make([]byte, 32) + _, _ = rand.Read(b) // read is always supposed to succeed + return fmt.Sprintf("%x", b) +} diff --git a/docker/options.go b/docker/options.go new file mode 100644 index 0000000..d39f499 --- /dev/null +++ b/docker/options.go @@ -0,0 +1,98 @@ +package docker + +import ( + "fmt" + "io" + "net" + "path/filepath" + + . "github.com/anchore/go-make/lang" + "github.com/anchore/go-make/log" + "github.com/anchore/go-make/run" +) + +type Option func(*commandConfig) error + +// Flags are passed to the docker command itself, e.g. before the container name in `docker run ` +func Flags(args ...string) Option { + return func(cfg *commandConfig) error { + cfg.dockerArgs = append(cfg.dockerArgs, run.Args(args...)) + return nil + } +} + +// Args args are passed to the command being run, e.g. following the container name in `docker run ` +func Args(args ...string) Option { + return func(cfg *commandConfig) error { + cfg.commandArgs = append(cfg.commandArgs, run.Args(args...)) + return nil + } +} + +func Stdout(writer io.Writer) Option { + return func(cfg *commandConfig) error { + cfg.dockerArgs = append(cfg.dockerArgs, run.Stdout(writer)) + return nil + } +} + +func Entrypoint(command string) Option { + return Flags("--entrypoint", command) +} + +func Envs(env map[string]string) Option { + return func(cfg *commandConfig) error { + for k, v := range env { + err := Env(k, v)(cfg) + if err != nil { + return err + } + } + return nil + } +} + +func Env(key, value string) Option { + return func(cfg *commandConfig) error { + cfg.dockerArgs = append(cfg.dockerArgs, run.Args("--env", fmt.Sprintf("%s=%s", key, value))) + return nil + } +} + +func ExposeRandomPort(randomPort *int, containerPort int) Option { + *randomPort = Return(unusedPort()) + return ExposePort(*randomPort, containerPort) +} + +func ExposePort(localPort, containerPort int) Option { + return Flags("-p", fmt.Sprintf("%d:%d", localPort, containerPort)) +} + +func MountVolume(local, container string) Option { + local = Return(filepath.Abs(local)) + return Flags("-v", fmt.Sprintf("%s:%s", local, container)) +} + +// InDir runs docker in the specific directory, also mounting it to DefaultContainerDir or containerDir, if provided +func InDir(localDir string, containerDir ...string) Option { + return func(cfg *commandConfig) error { + d := DefaultContainerDir + if len(containerDir) > 0 { + d = containerDir[0] + } + cfg.dockerArgs = append(cfg.dockerArgs, run.InDir(localDir), run.Args("--workdir", d)) + return MountVolume(localDir, d)(cfg) + } +} + +func unusedPort() (int, error) { + addr, err := net.Listen("tcp", ":0") //nolint:gosec + if err != nil { + return 0, err + } + defer func() { + log.Error(addr.Close()) + }() + + return addr.Addr().(*net.TCPAddr).Port, nil +} diff --git a/docker/run.go b/docker/run.go new file mode 100644 index 0000000..2f3a9d8 --- /dev/null +++ b/docker/run.go @@ -0,0 +1,53 @@ +package docker + +import ( + "context" + "os" + "os/exec" + "path/filepath" + "syscall" + + . "github.com/anchore/go-make/lang" + "github.com/anchore/go-make/log" + "github.com/anchore/go-make/run" +) + +var DefaultContainerPrefix = "localhost/go-make-auto-build:" +var DefaultContainerDir = "/.data" + +type commandConfig struct { + name string + dockerArgs []run.Option + commandArgs []run.Option + postExec []func(*Process) error +} + +func Run(containerOrDockerfile string, opts ...Option) string { + return runConfig(makeConfig(containerOrDockerfile, opts...)) +} + +func runConfig(cfg *commandConfig) string { + var cmd *exec.Cmd + defer func() { + if cmd != nil && cmd.Process != nil { + log.Error(cmd.Process.Kill()) + log.Error(cmd.Process.Signal(syscall.SIGTERM)) + } + }() + // , "--env", "TERM=xterm-256color" + return Return(run.Command("docker", run.Stderr(os.Stderr), func(_ context.Context, c *exec.Cmd) error { + cmd = c + // cmd.SysProcAttr = &syscall.SysProcAttr{ + // Setpgid: true, + // //Setsid: true, + // //Pdeathsig: syscall.SIGKILL, + //} + return nil + // }, run.Args("run", "--rm", "--init", "--interactive"), run.Options(cfg.dockerArgs...), run.Args(cfg.name), run.Options(cfg.commandArgs...))) + }, run.Args("run", "--rm", "--interactive"), run.Options(cfg.dockerArgs...), run.Args(cfg.name), run.Options(cfg.commandArgs...))) +} + +func Build(dockerfile, tag string) { + f := Return(filepath.Abs(dockerfile)) + Return(run.Command("docker", run.Args("build", "--tag", tag, "--file", filepath.Base(dockerfile), "."), run.Stdout(os.Stderr), run.Stderr(os.Stderr), run.InDir(filepath.Dir(f)))) +} diff --git a/log/log.go b/log/log.go index 3a6cde4..2ae9267 100644 --- a/log/log.go +++ b/log/log.go @@ -48,9 +48,17 @@ func init() { } func debugLog(format string, args ...any) { - _, _ = fmt.Fprintf(os.Stderr, Prefix+color.Grey(template.Render(format))+"\n", args...) + if len(args) == 0 { + _, _ = fmt.Fprint(os.Stderr, Prefix+color.Grey(template.Render(format))+"\n") + } else { + _, _ = fmt.Fprintf(os.Stderr, Prefix+color.Grey(template.Render(format))+"\n", args...) + } } func traceLog(format string, args ...any) { - _, _ = fmt.Fprintf(os.Stderr, Prefix+color.Grey(template.Render(format))+"\n", args...) + if len(args) == 0 { + _, _ = fmt.Fprint(os.Stderr, Prefix+color.Grey(template.Render(format))+"\n") + } else { + _, _ = fmt.Fprintf(os.Stderr, Prefix+color.Grey(template.Render(format))+"\n", args...) + } } diff --git a/stream/passthrough.go b/stream/passthrough.go new file mode 100644 index 0000000..2e4ab32 --- /dev/null +++ b/stream/passthrough.go @@ -0,0 +1,32 @@ +package stream + +import ( + "io" + "strings" + + "github.com/anchore/go-make/log" +) + +type Passthrough struct { + io.Writer +} + +func NewPassthrough(w io.Writer) *Passthrough { + return &Passthrough{w} +} + +func (w *Passthrough) Write(p []byte) (int, error) { + if strings.Contains(string(p), "start") { + log.Info("start") + } + if strings.Contains(string(p), "ready") { + log.Info("ready") + } + if strings.Contains(string(p), "ready for") { + log.Info("ready for") + } + if strings.Contains(string(p), "start up") { + log.Info("start up") + } + return w.Writer.Write(p) +} diff --git a/stream/regex_scanner.go b/stream/regex_scanner.go new file mode 100644 index 0000000..0788709 --- /dev/null +++ b/stream/regex_scanner.go @@ -0,0 +1,110 @@ +package stream + +import ( + "io" + "regexp" + "sync" +) + +type Option func(*regexpScanner) + +// Size is the guaranteed scan size to return results reliably up to this size +func Size(size int) Option { + return func(r *regexpScanner) { + // the buffer size is twice, so we are able to match expressions across the entire stream up to `size` bytes + r.buf = make([]byte, size*2) + } +} + +func NewRegexpScanner(re *regexp.Regexp, opts ...Option) (io.Writer, chan map[string]string) { + r := ®expScanner{ + re: re, + result: make(chan map[string]string), + } + for _, opt := range opts { + opt(r) + } + if r.buf == nil { + r.buf = make([]byte, defaultSize) + } + return r, r.result +} + +const ( + defaultSize = 1024 +) + +type regexpScanner struct { + result chan map[string]string + re *regexp.Regexp + lock sync.Mutex + buf []byte + pos int +} + +func (r *regexpScanner) SetRegex(re *regexp.Regexp) { + r.lock.Lock() + defer r.lock.Unlock() + r.re = re +} + +func (r *regexpScanner) Write(p []byte) (n int, err error) { + r.lock.Lock() + defer r.lock.Unlock() + total := len(p) + for len(p) != 0 { + remain := len(r.buf) - r.pos + if len(p) <= remain { + // if the incoming buffer fits into the remaining space in the ring, just copy it in + copy(r.buf[r.pos:], p) + r.pos += len(p) + p = nil + } else { + start := r.pos - len(r.buf)/2 + if start < 0 { + start = 0 + } + // copy half of buf to the beginning, so the next scan can find matches that span writes + copy(r.buf, r.buf[start:r.pos]) + r.pos -= start + + remain = len(r.buf) - r.pos + if remain > len(p) { + remain = len(p) + } + + // copy as much of p to our ring as we can + copy(r.buf[r.pos:], p[:remain]) + p = p[remain:] + r.pos += remain + } + + // each iteration, test to find matches + s := string(r.buf[:r.pos]) + indexes := r.re.FindAllStringSubmatchIndex(s, -1) + if indexes != nil { + lastMatch := 0 + names := r.re.SubexpNames() + for _, match := range indexes { + matches := map[string]string{} + for i, name := range names { + start := match[2*i] + end := match[2*i+1] + matches[name] = s[start:end] + if end > lastMatch { + lastMatch = end + } + } + r.result <- matches + } + + // don't want to return duplicate matches: remove all contents through the last match so subsequent + // searches won't return it + if lastMatch > 0 { + copy(r.buf, r.buf[lastMatch:r.pos]) + r.pos -= lastMatch + } + } + } + return total, err +} diff --git a/stream/regex_scanner_test.go b/stream/regex_scanner_test.go new file mode 100644 index 0000000..8c300a7 --- /dev/null +++ b/stream/regex_scanner_test.go @@ -0,0 +1,115 @@ +package stream_test + +import ( + "regexp" + "testing" + + "github.com/anchore/go-make/require" + "github.com/anchore/go-make/stream" +) + +func Test_regexScanner(t *testing.T) { + tests := []struct { + name string + input []string + regex *regexp.Regexp + size int + match []map[string]string + }{ + { + name: "input chunks smaller than buf", + input: []string{"a", "b", "c"}, + regex: regexp.MustCompile(regexp.QuoteMeta("ab")), + match: []map[string]string{ + {"": "ab"}, + }, + }, + { + name: "input chunks larger than buf", + size: 4, + input: []string{"abcdefg", "abcdefg"}, + regex: regexp.MustCompile(regexp.QuoteMeta("b")), + match: []map[string]string{ + {"": "b"}, + {"": "b"}, + }, + }, + { + name: "end input chunks larger than buf", + size: 4, + input: []string{"abcdefg", "abcdefg"}, + regex: regexp.MustCompile(regexp.QuoteMeta("fg")), + match: []map[string]string{ + {"": "fg"}, + {"": "fg"}, + }, + }, + { + name: "search longer than inputs", + size: 4, + input: []string{"ab", "cd", "ef", "g", "ab", "cd", "ef", "g"}, + regex: regexp.MustCompile(regexp.QuoteMeta("abc")), + match: []map[string]string{ + {"": "abc"}, + {"": "abc"}, + }, + }, + { + name: "sub expressions", + size: 4, + input: []string{"abcdefg", "abcdefg"}, + regex: regexp.MustCompile("a(?Pb)"), + match: []map[string]string{ + {"": "ab", "thename": "b"}, + {"": "ab", "thename": "b"}, + }, + }, + { + name: "large input", + size: 16, + input: chunk(4, `init process ; init process ; ready for start up.`), + regex: regexp.MustCompile(regexp.QuoteMeta("ready for start")), + match: []map[string]string{ + {"": "ready for start"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var opts []stream.Option + if tt.size > 0 { + opts = append(opts, stream.Size(tt.size)) + } + r, result := stream.NewRegexpScanner(tt.regex, opts...) + f := func() { + for _, s := range tt.input { + _, err := r.Write([]byte(s)) + require.NoError(t, err) + } + } + go func() { + f() + close(result) + }() + + var matches []map[string]string + for m := range result { + matches = append(matches, m) + } + + require.Equal(t, tt.match, matches) + }) + } +} + +func chunk(size int, input string) []string { + var out []string + for i := 0; i < len(input); i += size { + if i+size > len(input) { + size = len(input) - i + } + out = append(out, input[i:i+size]) + } + return out +}