Skip to content

Commit 2c377e2

Browse files
committed
improve deploymentrecord client to support rate limiting backoffs provided in headers
Signed-off-by: Eric Pickard <piceri@github.com>
1 parent 6dc89ca commit 2c377e2

File tree

2 files changed

+226
-21
lines changed

2 files changed

+226
-21
lines changed

pkg/deploymentrecord/client.go

Lines changed: 109 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"regexp"
1515
"strconv"
1616
"strings"
17+
"sync"
1718
"time"
1819

1920
"github.com/bradleyfalzon/ghinstallation/v2"
@@ -37,6 +38,10 @@ type Client struct {
3738
apiToken string
3839
transport *ghinstallation.Transport
3940
rateLimiter *rate.Limiter
41+
42+
// rateLimitDelay is shared across workers
43+
rateLimitDelayMu sync.Mutex
44+
rateLimitDelay time.Time
4045
}
4146

4247
// NewClient creates a new API client with the given base URL and
@@ -197,23 +202,12 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
197202
var lastErr error
198203
// The first attempt is not a retry!
199204
for attempt := range c.retries + 1 {
200-
if attempt > 0 {
201-
backoff := time.Duration(math.Pow(2,
202-
float64(attempt))) * 100 * time.Millisecond
203-
//nolint:gosec
204-
jitter := time.Duration(rand.Int64N(50)) * time.Millisecond
205-
delay := backoff + jitter
206-
207-
if delay > 5*time.Second {
208-
delay = 5 * time.Second
209-
}
205+
if err = waitForBackoff(ctx, attempt); err != nil {
206+
return err
207+
}
210208

211-
// Wait with context cancellation support
212-
select {
213-
case <-time.After(delay):
214-
case <-ctx.Done():
215-
return fmt.Errorf("context cancelled during retry backoff: %w", ctx.Err())
216-
}
209+
if err = c.waitForSecondaryRateLimit(ctx); err != nil {
210+
return err
217211
}
218212

219213
// Reset reader position for retries
@@ -268,7 +262,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
268262

269263
switch {
270264
case resp.StatusCode == 404:
271-
// No artifact found
265+
// No artifact found - do not retry
272266
dtmetrics.PostDeploymentRecordNoAttestation.Inc()
273267
slog.Debug("no artifact attestation found, no record created",
274268
"attempt", attempt,
@@ -279,14 +273,15 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
279273
)
280274
return &NoArtifactError{err: fmt.Errorf("no attestation found for %s", record.Digest)}
281275
case resp.StatusCode >= 400 && resp.StatusCode < 500:
282-
if resp.Header.Get("retry-after") != "" || resp.Header.Get("x-ratelimit-remaining") == "0" {
283-
// Rate limited — retry with backoff
284-
// Could be 403 or 429
276+
// Check headers that indicate rate limiting
277+
if resp.Header.Get("Retry-After") != "" || resp.Header.Get("X-Ratelimit-Remaining") == "0" {
278+
retryDelay := parseRateLimitDelay(resp)
279+
c.setRetryAfter(retryDelay)
285280
dtmetrics.PostDeploymentRecordRateLimited.Inc()
286281
slog.Warn("rate limited, retrying",
287282
"attempt", attempt,
288283
"status_code", resp.StatusCode,
289-
"retry_after", resp.Header.Get("Retry-After"),
284+
"retry_delay", retryDelay.Seconds(),
290285
"container_name", record.Name,
291286
"resp_msg", string(respBody),
292287
)
@@ -323,3 +318,96 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
323318
)
324319
return fmt.Errorf("all retries exhausted: %w", lastErr)
325320
}
321+
322+
// waitForSecondaryRateLimit blocks until the global secondary rate limit backoff has elapsed.
323+
// All workers sharing this client observe the same deadline.
324+
func (c *Client) waitForSecondaryRateLimit(ctx context.Context) error {
325+
c.rateLimitDelayMu.Lock()
326+
waitUntil := c.rateLimitDelay
327+
c.rateLimitDelayMu.Unlock()
328+
329+
delay := time.Until(waitUntil)
330+
if delay <= 0 {
331+
return nil
332+
}
333+
334+
slog.Info("waiting for secondary rate limit backoff",
335+
"delay", delay.Round(time.Millisecond),
336+
)
337+
338+
select {
339+
case <-time.After(delay):
340+
return nil
341+
case <-ctx.Done():
342+
return fmt.Errorf("context cancelled during secondary rate limit wait: %w", ctx.Err())
343+
}
344+
}
345+
346+
// setRetryAfter records a global backoff deadline.
347+
// Ensures deadline can only be extended, not shortened.
348+
func (c *Client) setRetryAfter(d time.Duration) {
349+
until := time.Now().Add(d)
350+
c.rateLimitDelayMu.Lock()
351+
defer c.rateLimitDelayMu.Unlock()
352+
if until.After(c.rateLimitDelay) {
353+
c.rateLimitDelay = until
354+
}
355+
}
356+
357+
// parseRateLimitDelay extracts the backoff duration from a rate-limit response:
358+
// Return largest delay from header options.
359+
// If no headers are set, default to 1 minute.
360+
func parseRateLimitDelay(resp *http.Response) time.Duration {
361+
// GitHub docs show Retry-After header will always be an int
362+
var replyAfterDelay *time.Duration
363+
if ra := resp.Header.Get("Retry-After"); ra != "" {
364+
if seconds, err := strconv.Atoi(ra); err == nil {
365+
rad := time.Duration(seconds) * time.Second
366+
replyAfterDelay = &rad
367+
}
368+
}
369+
370+
var rateLimitResetDelay *time.Duration
371+
if resp.Header.Get("X-Ratelimit-Remaining") == "0" {
372+
if resetStr := resp.Header.Get("X-Ratelimit-Reset"); resetStr != "" {
373+
if epoch, err := strconv.ParseInt(resetStr, 10, 64); err == nil {
374+
if d := time.Until(time.Unix(epoch, 0)); d > 0 {
375+
rateLimitResetDelay = &d
376+
}
377+
}
378+
}
379+
}
380+
381+
switch {
382+
case replyAfterDelay != nil && rateLimitResetDelay != nil:
383+
return max(*replyAfterDelay, *rateLimitResetDelay)
384+
case replyAfterDelay != nil:
385+
return *replyAfterDelay
386+
case rateLimitResetDelay != nil:
387+
return *rateLimitResetDelay
388+
default:
389+
return time.Minute
390+
}
391+
}
392+
393+
func waitForBackoff(ctx context.Context, attempt int) error {
394+
if attempt > 0 {
395+
backoff := time.Duration(math.Pow(2,
396+
float64(attempt))) * 100 * time.Millisecond
397+
//nolint:gosec
398+
jitter := time.Duration(rand.Int64N(50)) * time.Millisecond
399+
delay := backoff + jitter
400+
401+
if delay > 5*time.Second {
402+
delay = 5 * time.Second
403+
}
404+
405+
// Wait with context cancellation support
406+
select {
407+
case <-time.After(delay):
408+
case <-ctx.Done():
409+
return fmt.Errorf("context cancelled during retry backoff: %w", ctx.Err())
410+
}
411+
}
412+
return nil
413+
}

pkg/deploymentrecord/client_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"errors"
66
"net/http"
77
"net/http/httptest"
8+
"strconv"
89
"strings"
10+
"sync"
911
"sync/atomic"
1012
"testing"
1113
"time"
@@ -421,6 +423,7 @@ func TestPostOne(t *testing.T) {
421423
retries: 1,
422424
handler: func(w http.ResponseWriter, _ *http.Request) {
423425
w.Header().Set("X-Ratelimit-Remaining", "0")
426+
w.Header().Set("X-Ratelimit-Reset", strconv.FormatInt(time.Now().Add(1*time.Second).Unix(), 10))
424427
w.WriteHeader(http.StatusForbidden)
425428
},
426429
wantErr: true,
@@ -604,3 +607,117 @@ func TestPostOneSendsCorrectRequest(t *testing.T) {
604607
t.Fatalf("unexpected error: %v", err)
605608
}
606609
}
610+
611+
func TestParseRateLimitDelay(t *testing.T) {
612+
tests := []struct {
613+
name string
614+
headers http.Header
615+
wantMin time.Duration
616+
wantMax time.Duration
617+
}{
618+
{
619+
name: "Retry-After in seconds",
620+
headers: http.Header{"Retry-After": []string{"5"}},
621+
wantMin: 5 * time.Second,
622+
wantMax: 5 * time.Second,
623+
},
624+
{
625+
name: "Retry-After zero seconds",
626+
headers: http.Header{"Retry-After": []string{"0"}},
627+
wantMin: 0,
628+
wantMax: 0,
629+
},
630+
{
631+
name: "X-Ratelimit-Remaining 0 with reset",
632+
headers: http.Header{
633+
"X-Ratelimit-Remaining": []string{"0"},
634+
"X-Ratelimit-Reset": []string{strconv.FormatInt(time.Now().Add(10*time.Second).Unix(), 10)},
635+
},
636+
wantMin: 9 * time.Second,
637+
wantMax: 11 * time.Second,
638+
},
639+
{
640+
name: "no relevant headers defaults to 1 minute",
641+
headers: http.Header{},
642+
wantMin: time.Minute,
643+
wantMax: time.Minute,
644+
},
645+
{
646+
name: "Largest delay takes precedence",
647+
headers: http.Header{
648+
"Retry-After": []string{"3"},
649+
"X-Ratelimit-Remaining": []string{"0"},
650+
"X-Ratelimit-Reset": []string{strconv.FormatInt(time.Now().Add(60*time.Second).Unix(), 10)},
651+
},
652+
wantMin: 59 * time.Second,
653+
wantMax: 61 * time.Second,
654+
},
655+
}
656+
657+
for _, tt := range tests {
658+
t.Run(tt.name, func(t *testing.T) {
659+
resp := &http.Response{Header: tt.headers}
660+
result := parseRateLimitDelay(resp)
661+
if result < tt.wantMin || result > tt.wantMax {
662+
t.Errorf("parseRateLimitDelay() = %v, want between %v and %v", result, tt.wantMin, tt.wantMax)
663+
}
664+
})
665+
}
666+
}
667+
668+
func TestPostOneRespectsRetryAfterAcrossGoroutines(t *testing.T) {
669+
var reqCount atomic.Int32
670+
firstReqDone := make(chan struct{})
671+
672+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
673+
count := reqCount.Add(1)
674+
if count == 1 {
675+
w.Header().Set("Retry-After", "2")
676+
w.WriteHeader(http.StatusTooManyRequests)
677+
close(firstReqDone)
678+
return
679+
}
680+
w.WriteHeader(http.StatusOK)
681+
}))
682+
t.Cleanup(srv.Close)
683+
684+
client, err := NewClient(srv.URL, "test-org", WithRetries(2))
685+
if err != nil {
686+
t.Fatalf("failed to create client: %v", err)
687+
}
688+
689+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
690+
t.Cleanup(cancel)
691+
692+
var wg sync.WaitGroup
693+
694+
// Goroutine 1: triggers the rate limit
695+
wg.Add(1)
696+
go func() {
697+
defer wg.Done()
698+
if err := client.PostOne(ctx, testRecord()); err != nil {
699+
t.Errorf("goroutine 1 error: %v", err)
700+
}
701+
}()
702+
703+
// Wait for the rate limit to be received and backoff set
704+
<-firstReqDone
705+
time.Sleep(50 * time.Millisecond)
706+
707+
// Goroutine 2: must observe the shared backoff
708+
start := time.Now()
709+
wg.Add(1)
710+
go func() {
711+
defer wg.Done()
712+
if err := client.PostOne(ctx, testRecord()); err != nil {
713+
t.Errorf("goroutine 2 error: %v", err)
714+
}
715+
}()
716+
717+
wg.Wait()
718+
719+
elapsed := time.Since(start)
720+
if elapsed < 1800*time.Millisecond {
721+
t.Errorf("goroutine 2 should have waited for retry-after, but only waited %v", elapsed)
722+
}
723+
}

0 commit comments

Comments
 (0)