diff --git a/cmd/pgbackrest/main.go b/cmd/pgbackrest/main.go index 6d12229e0e..0a0c6c71ef 100644 --- a/cmd/pgbackrest/main.go +++ b/cmd/pgbackrest/main.go @@ -16,7 +16,6 @@ package main */ import ( - "bytes" "context" "io" "os" @@ -96,18 +95,13 @@ func main() { cmd := createPGBackRestCommand(cfg) log.Infof("command to execute is [%s]", strings.Join(cmd, " ")) - var output, stderr string // now run the proper exec command depending on whether or not the config hashes should first // be compared prior to executing the PGBackRest command if !cfg.compareHash { - output, stderr, err = runCommand(ctx, k, cfg, cmd) + err = runCommand(ctx, k, cfg, cmd) } else { - output, stderr, err = compareHashAndRunCommand(ctx, k, cfg, cmd) + err = compareHashAndRunCommand(ctx, k, cfg, cmd) } - - // log any output and check for errors - log.Info("output=[" + output + "]") - log.Info("stderr=[" + stderr + "]") if err != nil { log.Fatal(err) } @@ -117,13 +111,11 @@ func main() { // Exec returns the stdout and stderr from running a command inside an existing // container. -func (k *KubeAPI) Exec(ctx context.Context, namespace, pod, container string, stdin io.Reader, command []string) (string, string, error) { - var stdout, stderr bytes.Buffer - +func (k *KubeAPI) Exec(ctx context.Context, namespace, pod, container string, stdin io.Reader, command []string) error { Scheme := runtime.NewScheme() if err := corev1.AddToScheme(Scheme); err != nil { log.Error(err) - return "", "", err + return err } ParameterCodec := runtime.NewParameterCodec(Scheme) @@ -140,15 +132,58 @@ func (k *KubeAPI) Exec(ctx context.Context, namespace, pod, container string, st exec, err := remotecommand.NewSPDYExecutor(k.Config, "POST", request.URL()) - if err == nil { - err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ - Stdin: stdin, - Stdout: &stdout, - Stderr: &stderr, - }) - } + stdoutReader, stdoutWriter := io.Pipe() + defer func() { + if err := stdoutWriter.Close(); err != nil { + log.Errorf("error closing stdoutWriter: %v", err) + } + }() + + stderrReader, stderrWriter := io.Pipe() + defer func() { + if err := stderrWriter.Close(); err != nil { + log.Errorf("error closing stderrWriter: %v", err) + } + }() + + go streamUsingPrefix("[pgbackrest:stdout]", stdoutReader) + go streamUsingPrefix("[pgbackrest:stderr]", stderrReader) + + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: stdin, + Stdout: stdoutWriter, + Stderr: stderrWriter, + }) - return stdout.String(), stderr.String(), err + return err +} + +// streamUsingPrefix reads from an io.Reader line by line and logs each line +// prefixing it with a custom label provided as input. +func streamUsingPrefix(prefix string, reader io.Reader) { + buf := make([]byte, 4096) + line := "" + for { + n, err := reader.Read(buf) + if n > 0 { + line += string(buf[:n]) + for strings.Contains(line, "\n") { + idx := strings.Index(line, "\n") + part := line[:idx] + log.Infof("%s %s", prefix, part) + line = line[idx+1:] + } + } + if err != nil { + if err != io.EOF { + log.Errorf("%s error reading: %v", prefix, err) + } + break + } + } + if line != "" { + log.Infof("%s %s", prefix, line) + } } func NewConfig() (*rest.Config, error) { @@ -313,7 +348,7 @@ func createPGBackRestCommand(cfg config) []string { // command. Only if the hashes match will the pgBackRest command be run, otherwise and error will // be written and exit code 1 will be returned. This is done to ensure a pgBackRest command is only // run when it can be verified that the exepected configuration is present. -func compareHashAndRunCommand(ctx context.Context, kubeapi *KubeAPI, cfg config, cmd []string) (string, string, error) { +func compareHashAndRunCommand(ctx context.Context, kubeapi *KubeAPI, cfg config, cmd []string) error { // the base script used in both the local and exec commands created below baseScript := ` shopt -s globstar @@ -352,7 +387,7 @@ fi // runCommand runs the provided pgBackRest command according to the configuration // provided -func runCommand(ctx context.Context, kubeapi *KubeAPI, cfg config, cmd []string) (string, string, error) { +func runCommand(ctx context.Context, kubeapi *KubeAPI, cfg config, cmd []string) error { bashCmd := []string{"bash"} reader := strings.NewReader(strings.Join(cmd, " ")) return kubeapi.Exec(ctx, cfg.namespace, cfg.podName, cfg.container, reader, bashCmd)