Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions cmd/pgbackrest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package main
*/

import (
"bytes"
"context"
"io"
"os"
Expand Down Expand Up @@ -96,18 +95,13 @@ func main() {
cmd := createPGBackRestCommand(cfg)
log.Infof("command to execute is [%s]", strings.Join(cmd, " "))

var output, stderr string
// now run the proper exec command depending on whether or not the config hashes should first
// be compared prior to executing the PGBackRest command
if !cfg.compareHash {
output, stderr, err = runCommand(ctx, k, cfg, cmd)
err = runCommand(ctx, k, cfg, cmd)
} else {
output, stderr, err = compareHashAndRunCommand(ctx, k, cfg, cmd)
err = compareHashAndRunCommand(ctx, k, cfg, cmd)
}

// log any output and check for errors
log.Info("output=[" + output + "]")
log.Info("stderr=[" + stderr + "]")
if err != nil {
log.Fatal(err)
}
Expand All @@ -117,13 +111,11 @@ func main() {

// Exec returns the stdout and stderr from running a command inside an existing
// container.
func (k *KubeAPI) Exec(ctx context.Context, namespace, pod, container string, stdin io.Reader, command []string) (string, string, error) {
var stdout, stderr bytes.Buffer

func (k *KubeAPI) Exec(ctx context.Context, namespace, pod, container string, stdin io.Reader, command []string) error {
Scheme := runtime.NewScheme()
if err := corev1.AddToScheme(Scheme); err != nil {
log.Error(err)
return "", "", err
return err
}
ParameterCodec := runtime.NewParameterCodec(Scheme)

Expand All @@ -140,15 +132,58 @@ func (k *KubeAPI) Exec(ctx context.Context, namespace, pod, container string, st

exec, err := remotecommand.NewSPDYExecutor(k.Config, "POST", request.URL())

if err == nil {
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: stdin,
Stdout: &stdout,
Stderr: &stderr,
})
}
stdoutReader, stdoutWriter := io.Pipe()
defer func() {
if err := stdoutWriter.Close(); err != nil {
log.Errorf("error closing stdoutWriter: %v", err)
}
}()

stderrReader, stderrWriter := io.Pipe()
defer func() {
if err := stderrWriter.Close(); err != nil {
log.Errorf("error closing stderrWriter: %v", err)
}
}()

go streamUsingPrefix("[pgbackrest:stdout]", stdoutReader)
go streamUsingPrefix("[pgbackrest:stderr]", stderrReader)

err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdoutWriter,
Stderr: stderrWriter,
})

return stdout.String(), stderr.String(), err
return err
}

// streamUsingPrefix reads from an io.Reader line by line and logs each line
// prefixing it with a custom label provided as input.
func streamUsingPrefix(prefix string, reader io.Reader) {
buf := make([]byte, 4096)
line := ""
for {
n, err := reader.Read(buf)
if n > 0 {
line += string(buf[:n])
for strings.Contains(line, "\n") {
idx := strings.Index(line, "\n")
part := line[:idx]
log.Infof("%s %s", prefix, part)
line = line[idx+1:]
}
}
if err != nil {
if err != io.EOF {
log.Errorf("%s error reading: %v", prefix, err)
}
break
}
}
if line != "" {
log.Infof("%s %s", prefix, line)
}
}

func NewConfig() (*rest.Config, error) {
Expand Down Expand Up @@ -313,7 +348,7 @@ func createPGBackRestCommand(cfg config) []string {
// command. Only if the hashes match will the pgBackRest command be run, otherwise and error will
// be written and exit code 1 will be returned. This is done to ensure a pgBackRest command is only
// run when it can be verified that the exepected configuration is present.
func compareHashAndRunCommand(ctx context.Context, kubeapi *KubeAPI, cfg config, cmd []string) (string, string, error) {
func compareHashAndRunCommand(ctx context.Context, kubeapi *KubeAPI, cfg config, cmd []string) error {
// the base script used in both the local and exec commands created below
baseScript := `
shopt -s globstar
Expand Down Expand Up @@ -352,7 +387,7 @@ fi

// runCommand runs the provided pgBackRest command according to the configuration
// provided
func runCommand(ctx context.Context, kubeapi *KubeAPI, cfg config, cmd []string) (string, string, error) {
func runCommand(ctx context.Context, kubeapi *KubeAPI, cfg config, cmd []string) error {
bashCmd := []string{"bash"}
reader := strings.NewReader(strings.Join(cmd, " "))
return kubeapi.Exec(ctx, cfg.namespace, cfg.podName, cfg.container, reader, bashCmd)
Expand Down
Loading