@@ -16,7 +16,6 @@ package main
1616*/
1717
1818import (
19- "bytes"
2019 "context"
2120 "io"
2221 "os"
@@ -96,18 +95,13 @@ func main() {
9695 cmd := createPGBackRestCommand (cfg )
9796 log .Infof ("command to execute is [%s]" , strings .Join (cmd , " " ))
9897
99- var output , stderr string
10098 // now run the proper exec command depending on whether or not the config hashes should first
10199 // be compared prior to executing the PGBackRest command
102100 if ! cfg .compareHash {
103- output , stderr , err = runCommand (ctx , k , cfg , cmd )
101+ err = runCommand (ctx , k , cfg , cmd )
104102 } else {
105- output , stderr , err = compareHashAndRunCommand (ctx , k , cfg , cmd )
103+ err = compareHashAndRunCommand (ctx , k , cfg , cmd )
106104 }
107-
108- // log any output and check for errors
109- log .Info ("output=[" + output + "]" )
110- log .Info ("stderr=[" + stderr + "]" )
111105 if err != nil {
112106 log .Fatal (err )
113107 }
@@ -117,13 +111,11 @@ func main() {
117111
118112// Exec returns the stdout and stderr from running a command inside an existing
119113// container.
120- func (k * KubeAPI ) Exec (ctx context.Context , namespace , pod , container string , stdin io.Reader , command []string ) (string , string , error ) {
121- var stdout , stderr bytes.Buffer
122-
114+ func (k * KubeAPI ) Exec (ctx context.Context , namespace , pod , container string , stdin io.Reader , command []string ) error {
123115 Scheme := runtime .NewScheme ()
124116 if err := corev1 .AddToScheme (Scheme ); err != nil {
125117 log .Error (err )
126- return "" , "" , err
118+ return err
127119 }
128120 ParameterCodec := runtime .NewParameterCodec (Scheme )
129121
@@ -140,15 +132,58 @@ func (k *KubeAPI) Exec(ctx context.Context, namespace, pod, container string, st
140132
141133 exec , err := remotecommand .NewSPDYExecutor (k .Config , "POST" , request .URL ())
142134
143- if err == nil {
144- err = exec .StreamWithContext (ctx , remotecommand.StreamOptions {
145- Stdin : stdin ,
146- Stdout : & stdout ,
147- Stderr : & stderr ,
148- })
149- }
135+ stdoutReader , stdoutWriter := io .Pipe ()
136+ defer func () {
137+ if err := stdoutWriter .Close (); err != nil {
138+ log .Errorf ("error closing stdoutWriter: %v" , err )
139+ }
140+ }()
141+
142+ stderrReader , stderrWriter := io .Pipe ()
143+ defer func () {
144+ if err := stderrWriter .Close (); err != nil {
145+ log .Errorf ("error closing stderrWriter: %v" , err )
146+ }
147+ }()
148+
149+ go streamUsingPrefix ("[pgbackrest:stdout]" , stdoutReader )
150+ go streamUsingPrefix ("[pgbackrest:stderr]" , stderrReader )
151+
152+ err = exec .StreamWithContext (ctx , remotecommand.StreamOptions {
153+ Stdin : stdin ,
154+ Stdout : stdoutWriter ,
155+ Stderr : stderrWriter ,
156+ })
150157
151- return stdout .String (), stderr .String (), err
158+ return err
159+ }
160+
161+ // streamUsingPrefix reads from an io.Reader line by line and logs each line
162+ // prefixing it with a custom label provided as input.
163+ func streamUsingPrefix (prefix string , reader io.Reader ) {
164+ buf := make ([]byte , 4096 )
165+ line := ""
166+ for {
167+ n , err := reader .Read (buf )
168+ if n > 0 {
169+ line += string (buf [:n ])
170+ for strings .Contains (line , "\n " ) {
171+ idx := strings .Index (line , "\n " )
172+ part := line [:idx ]
173+ log .Infof ("%s %s" , prefix , part )
174+ line = line [idx + 1 :]
175+ }
176+ }
177+ if err != nil {
178+ if err != io .EOF {
179+ log .Errorf ("%s error reading: %v" , prefix , err )
180+ }
181+ break
182+ }
183+ }
184+ if line != "" {
185+ log .Infof ("%s %s" , prefix , line )
186+ }
152187}
153188
154189func NewConfig () (* rest.Config , error ) {
@@ -313,7 +348,7 @@ func createPGBackRestCommand(cfg config) []string {
313348// command. Only if the hashes match will the pgBackRest command be run, otherwise and error will
314349// be written and exit code 1 will be returned. This is done to ensure a pgBackRest command is only
315350// run when it can be verified that the exepected configuration is present.
316- func compareHashAndRunCommand (ctx context.Context , kubeapi * KubeAPI , cfg config , cmd []string ) ( string , string , error ) {
351+ func compareHashAndRunCommand (ctx context.Context , kubeapi * KubeAPI , cfg config , cmd []string ) error {
317352 // the base script used in both the local and exec commands created below
318353 baseScript := `
319354shopt -s globstar
352387
353388// runCommand runs the provided pgBackRest command according to the configuration
354389// provided
355- func runCommand (ctx context.Context , kubeapi * KubeAPI , cfg config , cmd []string ) ( string , string , error ) {
390+ func runCommand (ctx context.Context , kubeapi * KubeAPI , cfg config , cmd []string ) error {
356391 bashCmd := []string {"bash" }
357392 reader := strings .NewReader (strings .Join (cmd , " " ))
358393 return kubeapi .Exec (ctx , cfg .namespace , cfg .podName , cfg .container , reader , bashCmd )
0 commit comments