Skip to content

Commit a2a4181

Browse files
Vishal Tak and Axel von Bertoldi committed
Merge branch 'avonbertoldi/pulp-push-retry' into 'main'
Retry pulp content push commands on specific errors.
See merge request https://gitlab.com/gitlab-org/gitlab-runner/-/merge_requests/6197
Merged-by: Vishal Tak <vtak@gitlab.com>
Approved-by: Balasankar 'Balu' C <balasankar@gitlab.com>
Approved-by: Vishal Tak <vtak@gitlab.com>
Reviewed-by: GitLab Duo <gitlab-duo@gitlab.com>
Co-authored-by: Axel von Bertoldi <avonbertoldi@gitlab.com>
2 parents a564a25 + bdb5ec7 commit a2a4181

2 files changed

Lines changed: 233 additions & 5 deletions

File tree

magefiles/pulp/push.go

Lines changed: 75 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,9 @@ import (
1212
"path/filepath"
1313
"regexp"
1414
"strings"
15+
"time"
1516

17+
"github.com/jpillora/backoff"
1618
"github.com/magefile/mage/sh"
1719
"github.com/samber/lo"
1820
"github.com/sourcegraph/conc/pool"
@@ -105,15 +107,28 @@ type (
105107
)
106108

107109
func (p *basePusher) runPulpCmd(args ...string) error {
108-
fmt.Println("executing", "pulp", strings.Join(args, " "))
110+
slog.Info("executing", "cmd", "pulp", "args", args)
109111
if p.dryrun {
110112
return nil
111113
}
112114
return p.run("pulp", args...)
113115
}
114116

117+
// pulpRetryErrors lists the stderr patterns that make a failed pulp invocation
// eligible for another attempt. It is handed to newRetryCommand by
// retryPulpCmd and matched against the captured stderr of each failed run.
var pulpRetryErrors = []*regexp.Regexp{regexp.MustCompile(`Artifact with sha256 checksum of '.*' already exists`)}
120+
121+
func (p *basePusher) retryPulpCmd(args []string, out io.Writer) error {
122+
slog.Info("executing", "cmd", "pulp", "args", args)
123+
if p.dryrun {
124+
return nil
125+
}
126+
127+
return newRetryCommand("pulp", args, pulpRetryErrors, out, p.exec).run()
128+
}
129+
115130
func (p *basePusher) execCmd(out io.Writer, cmd string, args ...string) error {
116-
fmt.Println("executing", cmd, strings.Join(args, " "))
131+
slog.Info("executing", "cmd", cmd, "args", args)
117132
if p.dryrun {
118133
return nil
119134
}
@@ -132,8 +147,7 @@ func (p *debPusher) Push(releases, pkgFiles []string) error {
132147
for _, release := range releases {
133148
for _, pkgFile := range pkgFiles {
134149
pool.Go(func() error {
135-
slog.Debug("Pushing", "package", pkgFile, "release", release)
136-
return p.runPulpCmd(p.pushArgs(release, pkgFile)...)
150+
return p.retryPulpCmd(p.pushArgs(release, pkgFile), io.Discard)
137151
})
138152
}
139153
}
@@ -241,7 +255,7 @@ func (p *rpmPusher) doPush(pkgFile, repo string) (string, error) {
241255
args := p.pushArgs(pkgFile, repo)
242256

243257
out := bytes.Buffer{}
244-
if err := p.execCmd(&out, "pulp", args...); err != nil {
258+
if err := p.retryPulpCmd(args, &out); err != nil {
245259
return "", err
246260
}
247261

@@ -326,3 +340,59 @@ func parseRPMInfo(out io.Reader) (rpmInfo, error) {
326340

327341
return info, nil
328342
}
343+
344+
type retryCommand struct {
345+
cmd string
346+
args []string
347+
backoff backoff.Backoff
348+
out io.Writer
349+
retryableErrs []*regexp.Regexp
350+
exec shExec
351+
}
352+
353+
func newRetryCommand(cmd string, args []string, retryableErrs []*regexp.Regexp, out io.Writer, exec shExec) *retryCommand {
354+
return &retryCommand{
355+
cmd: cmd,
356+
args: args,
357+
backoff: backoff.Backoff{
358+
Min: time.Second,
359+
Max: 5 * time.Second,
360+
},
361+
out: out,
362+
retryableErrs: retryableErrs,
363+
exec: exec,
364+
}
365+
}
366+
367+
func (c *retryCommand) run() error {
368+
for i := range 5 {
369+
slog.Info("attempting to run command", "attempt", i+1, "command", c.cmd, "args", c.args)
370+
371+
outBuf, errBuf := bytes.Buffer{}, bytes.Buffer{}
372+
stdout := io.MultiWriter(&outBuf, os.Stdout)
373+
stderr := io.MultiWriter(&errBuf, os.Stderr)
374+
375+
_, err := c.exec(nil, stdout, stderr, c.cmd, c.args...)
376+
377+
if err == nil {
378+
_, _ = io.Copy(c.out, &outBuf)
379+
return nil
380+
}
381+
if c.isRetryable(errBuf.String()) {
382+
time.Sleep(c.backoff.Duration())
383+
continue
384+
}
385+
return fmt.Errorf("execution of command (%s %s) failed: %s", c.cmd, c.args, errBuf.String())
386+
}
387+
388+
return fmt.Errorf("execution of command (%s %s) failed after 5 retries ", c.cmd, c.args)
389+
}
390+
391+
func (c *retryCommand) isRetryable(stderr string) bool {
392+
for _, re := range c.retryableErrs {
393+
if re.MatchString(stderr) {
394+
return true
395+
}
396+
}
397+
return false
398+
}

magefiles/pulp/push_test.go

Lines changed: 158 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,12 @@ package pulp
55
import (
66
"fmt"
77
"io"
8+
"regexp"
89
"strings"
910
"testing"
11+
"time"
1012

13+
"github.com/jpillora/backoff"
1114
"github.com/stretchr/testify/require"
1215
)
1316

@@ -193,6 +196,161 @@ Description : GitLab Runner
193196
}
194197
}
195198

199+
func TestRetryCommandRun(t *testing.T) {
200+
tests := map[string]struct {
201+
execBehavior func(attempt int) (bool, string, error) // returns (success, stderr, error )
202+
retryableErrs []*regexp.Regexp
203+
expectedError bool
204+
errorContains string
205+
expectedAttempt int
206+
}{
207+
"successful on first attempt": {
208+
execBehavior: func(attempt int) (bool, string, error) {
209+
return true, "", nil
210+
},
211+
retryableErrs: []*regexp.Regexp{},
212+
expectedError: false,
213+
expectedAttempt: 1,
214+
},
215+
"successful on second attempt with retryable error": {
216+
execBehavior: func(attempt int) (bool, string, error) {
217+
if attempt == 1 {
218+
return false, "Artifact with checksum of 'abc123' already exists.", fmt.Errorf("artifact error")
219+
}
220+
return true, "", nil
221+
},
222+
retryableErrs: []*regexp.Regexp{
223+
regexp.MustCompile(`Artifact with checksum of '.*' already exists\.`),
224+
},
225+
expectedError: false,
226+
expectedAttempt: 2,
227+
},
228+
"successful on third attempt with retryable error": {
229+
execBehavior: func(attempt int) (bool, string, error) {
230+
if attempt <= 2 {
231+
return false, "Artifact with checksum of 'xyz789' already exists.", fmt.Errorf("artifact error")
232+
}
233+
return true, "", nil
234+
},
235+
retryableErrs: []*regexp.Regexp{
236+
regexp.MustCompile(`Artifact with checksum of '.*' already exists\.`),
237+
},
238+
expectedError: false,
239+
expectedAttempt: 3,
240+
},
241+
"fails with non-retryable error on first attempt": {
242+
execBehavior: func(attempt int) (bool, string, error) {
243+
return false, "Permission denied: cannot access repository", fmt.Errorf("permission denied")
244+
},
245+
retryableErrs: []*regexp.Regexp{
246+
regexp.MustCompile(`Artifact with checksum of '.*' already exists\.`),
247+
},
248+
expectedError: true,
249+
errorContains: "Permission denied",
250+
expectedAttempt: 1,
251+
},
252+
"fails after max retries with retryable error": {
253+
execBehavior: func(attempt int) (bool, string, error) {
254+
return false, "Artifact with checksum of 'def456' already exists.", fmt.Errorf("artifact error")
255+
},
256+
retryableErrs: []*regexp.Regexp{
257+
regexp.MustCompile(`Artifact with checksum of '.*' already exists\.`),
258+
},
259+
expectedError: true,
260+
errorContains: "failed after 5 retries",
261+
expectedAttempt: 5,
262+
},
263+
"multiple retryable error patterns": {
264+
execBehavior: func(attempt int) (bool, string, error) {
265+
if attempt == 1 {
266+
return false, "Connection timeout: server not responding", fmt.Errorf("timeout")
267+
}
268+
if attempt == 2 {
269+
return false, "Artifact with checksum of 'ghi012' already exists.", fmt.Errorf("artifact error")
270+
}
271+
return true, "", nil
272+
},
273+
retryableErrs: []*regexp.Regexp{
274+
regexp.MustCompile(`Artifact with checksum of '.*' already exists\.`),
275+
regexp.MustCompile(`Connection timeout:.*`),
276+
},
277+
expectedError: false,
278+
expectedAttempt: 3,
279+
},
280+
"retryable error on last attempt succeeds": {
281+
execBehavior: func(attempt int) (bool, string, error) {
282+
if attempt < 5 {
283+
return false, "Artifact with checksum of 'jkl345' already exists.", fmt.Errorf("artifact error")
284+
}
285+
return true, "", nil
286+
},
287+
retryableErrs: []*regexp.Regexp{
288+
regexp.MustCompile(`Artifact with checksum of '.*' already exists\.`),
289+
},
290+
expectedError: false,
291+
expectedAttempt: 5,
292+
},
293+
"no retryable errors configured": {
294+
execBehavior: func(attempt int) (bool, string, error) {
295+
return false, "Some error message", fmt.Errorf("some error")
296+
},
297+
retryableErrs: []*regexp.Regexp{},
298+
expectedError: true,
299+
errorContains: "Some error message",
300+
expectedAttempt: 1,
301+
},
302+
"empty stderr with error": {
303+
execBehavior: func(attempt int) (bool, string, error) {
304+
return false, "", fmt.Errorf("command failed")
305+
},
306+
retryableErrs: []*regexp.Regexp{
307+
regexp.MustCompile(`Artifact with checksum of '.*' already exists\.`),
308+
},
309+
expectedError: true,
310+
errorContains: "execution of command",
311+
expectedAttempt: 1,
312+
},
313+
}
314+
315+
for tn, tt := range tests {
316+
t.Run(tn, func(t *testing.T) {
317+
attempt := 0
318+
319+
// Create mock exec function that tracks attempts
320+
execMock := func(env map[string]string, out io.Writer, stderr io.Writer, cmd string, args ...string) (bool, error) {
321+
attempt++
322+
success, stderrMsg, err := tt.execBehavior(attempt)
323+
324+
if stderrMsg != "" {
325+
_, _ = io.WriteString(stderr, stderrMsg)
326+
}
327+
328+
return success, err
329+
}
330+
331+
// Create retryCommand with mocked exec
332+
cmd := newRetryCommand("test-cmd", []string{"arg1", "arg2"}, tt.retryableErrs, io.Discard, execMock)
333+
// make it a bit faster
334+
cmd.backoff = backoff.Backoff{Min: 10 * time.Millisecond, Max: 50 * time.Millisecond}
335+
336+
// Run the command
337+
err := cmd.run()
338+
339+
// Verify results
340+
if tt.expectedError {
341+
require.Error(t, err)
342+
if tt.errorContains != "" {
343+
require.Contains(t, err.Error(), tt.errorContains)
344+
}
345+
} else {
346+
require.NoError(t, err)
347+
}
348+
349+
require.Equal(t, tt.expectedAttempt, attempt, "expected %d attempts, got %d", tt.expectedAttempt, attempt)
350+
})
351+
}
352+
}
353+
196354
func TestRpmPusherPush(t *testing.T) {
197355
tests := map[string]struct {
198356
releases []string

0 commit comments

Comments
 (0)