Skip to content

Commit 361af00

Browse files
authored
Handle rate limit headers in client, adjust request throttling (#61)
* improve deploymentrecord client to support rate limiting backoffs provided in headers Signed-off-by: Eric Pickard <piceri@github.com> * fix typo Signed-off-by: Eric Pickard <piceri@github.com> * add user agent Signed-off-by: Eric Pickard <piceri@github.com> * rename existing rateLimiter to requestThrottler and reduce throughput limits to match github secondary rate limits Signed-off-by: Eric Pickard <piceri@github.com> * switch to atomic rate limit deadline Signed-off-by: Eric Pickard <piceri@github.com> * address comments Signed-off-by: Eric Pickard <piceri@github.com> * remove Retry-After max check Signed-off-by: Eric Pickard <piceri@github.com> --------- Signed-off-by: Eric Pickard <piceri@github.com>
1 parent 6dc89ca commit 361af00

File tree

2 files changed

+249
-38
lines changed

2 files changed

+249
-38
lines changed

pkg/deploymentrecord/client.go

Lines changed: 134 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"regexp"
1515
"strconv"
1616
"strings"
17+
"sync/atomic"
1718
"time"
1819

1920
"github.com/bradleyfalzon/ghinstallation/v2"
@@ -30,13 +31,16 @@ var validOrgPattern = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)
3031

3132
// Client is an API client for posting deployment records.
3233
type Client struct {
33-
baseURL string
34-
org string
35-
httpClient *http.Client
36-
retries int
37-
apiToken string
38-
transport *ghinstallation.Transport
39-
rateLimiter *rate.Limiter
34+
baseURL string
35+
org string
36+
httpClient *http.Client
37+
retries int
38+
apiToken string
39+
transport *ghinstallation.Transport
40+
requestThrottler *rate.Limiter
41+
42+
// rateLimitDeadline is a UnixNano timestamp shared across workers.
43+
rateLimitDeadline atomic.Int64
4044
}
4145

4246
// NewClient creates a new API client with the given base URL and
@@ -70,8 +74,8 @@ func NewClient(baseURL, org string, opts ...ClientOption) (*Client, error) {
7074
Timeout: 5 * time.Second,
7175
},
7276
retries: 3,
73-
// 20 req/sec with burst of 50
74-
rateLimiter: rate.NewLimiter(rate.Limit(20), 50),
77+
// 3 req/sec (180 req/min) with burst of 20
78+
requestThrottler: rate.NewLimiter(rate.Limit(3), 20),
7579
}
7680

7781
for _, opt := range opts {
@@ -140,10 +144,10 @@ func WithGHApp(id, installID string, pkBytes []byte, pkPath string) ClientOption
140144
}
141145
}
142146

143-
// WithRateLimiter sets a custom rate limiter for API calls.
144-
func WithRateLimiter(rps float64, burst int) ClientOption {
147+
// WithRequestThrottler sets a custom rate limiter for API calls.
148+
func WithRequestThrottler(rps float64, burst int) ClientOption {
145149
return func(c *Client) {
146-
c.rateLimiter = rate.NewLimiter(rate.Limit(rps), burst)
150+
c.requestThrottler = rate.NewLimiter(rate.Limit(rps), burst)
147151
}
148152
}
149153

@@ -180,11 +184,6 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
180184
return errors.New("record cannot be nil")
181185
}
182186

183-
// Wait for rate limiter
184-
if err := c.rateLimiter.Wait(ctx); err != nil {
185-
return fmt.Errorf("rate limiter wait failed: %w", err)
186-
}
187-
188187
url := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record", c.baseURL, c.org)
189188

190189
body, err := json.Marshal(record)
@@ -197,23 +196,16 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
197196
var lastErr error
198197
// The first attempt is not a retry!
199198
for attempt := range c.retries + 1 {
200-
if attempt > 0 {
201-
backoff := time.Duration(math.Pow(2,
202-
float64(attempt))) * 100 * time.Millisecond
203-
//nolint:gosec
204-
jitter := time.Duration(rand.Int64N(50)) * time.Millisecond
205-
delay := backoff + jitter
206-
207-
if delay > 5*time.Second {
208-
delay = 5 * time.Second
209-
}
199+
if err = waitForBackoff(ctx, attempt); err != nil {
200+
return err
201+
}
210202

211-
// Wait with context cancellation support
212-
select {
213-
case <-time.After(delay):
214-
case <-ctx.Done():
215-
return fmt.Errorf("context cancelled during retry backoff: %w", ctx.Err())
216-
}
203+
if err = c.waitForServerRateLimit(ctx); err != nil {
204+
return err
205+
}
206+
207+
if err = c.requestThrottler.Wait(ctx); err != nil {
208+
return fmt.Errorf("request throttler wait failed: %w", err)
217209
}
218210

219211
// Reset reader position for retries
@@ -236,6 +228,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
236228
} else if c.apiToken != "" {
237229
req.Header.Set("Authorization", "Bearer "+c.apiToken)
238230
}
231+
req.Header.Set("User-Agent", "GitHub-Deployment-Tracker")
239232

240233
start := time.Now()
241234
// nolint: gosec
@@ -268,7 +261,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
268261

269262
switch {
270263
case resp.StatusCode == 404:
271-
// No artifact found
264+
// No artifact found - do not retry
272265
dtmetrics.PostDeploymentRecordNoAttestation.Inc()
273266
slog.Debug("no artifact attestation found, no record created",
274267
"attempt", attempt,
@@ -279,14 +272,17 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
279272
)
280273
return &NoArtifactError{err: fmt.Errorf("no attestation found for %s", record.Digest)}
281274
case resp.StatusCode >= 400 && resp.StatusCode < 500:
282-
if resp.Header.Get("retry-after") != "" || resp.Header.Get("x-ratelimit-remaining") == "0" {
283-
// Rate limited — retry with backoff
284-
// Could be 403 or 429
275+
// Check headers that indicate rate limiting
276+
if resp.Header.Get("Retry-After") != "" || resp.Header.Get("X-Ratelimit-Remaining") == "0" {
277+
retryDelay := parseRateLimitDelay(resp)
278+
c.setRetryAfter(retryDelay)
285279
dtmetrics.PostDeploymentRecordRateLimited.Inc()
286280
slog.Warn("rate limited, retrying",
287281
"attempt", attempt,
288282
"status_code", resp.StatusCode,
289-
"retry_after", resp.Header.Get("Retry-After"),
283+
"retry-after", resp.Header.Get("Retry-After"),
284+
"x-ratelimit-remaining", resp.Header.Get("X-Ratelimit-Remaining"),
285+
"retry_delay", retryDelay.Seconds(),
290286
"container_name", record.Name,
291287
"resp_msg", string(respBody),
292288
)
@@ -323,3 +319,103 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error {
323319
)
324320
return fmt.Errorf("all retries exhausted: %w", lastErr)
325321
}
322+
323+
// waitForServerRateLimit blocks until the global server rate limit backoff has elapsed.
324+
// All workers sharing this client observe the same deadline.
325+
func (c *Client) waitForServerRateLimit(ctx context.Context) error {
326+
deadline := c.rateLimitDeadline.Load()
327+
delay := time.Until(time.Unix(0, deadline))
328+
if delay <= 0 {
329+
return nil
330+
}
331+
332+
slog.Info("waiting for server rate limit backoff",
333+
"delay", delay.Round(time.Millisecond),
334+
)
335+
336+
timer := time.NewTimer(delay)
337+
defer timer.Stop()
338+
339+
select {
340+
case <-timer.C:
341+
return nil
342+
case <-ctx.Done():
343+
return fmt.Errorf("context cancelled during server rate limit wait: %w", ctx.Err())
344+
}
345+
}
346+
347+
// setRetryAfter records a global backoff deadline.
348+
// Ensures deadline can only be extended, not shortened.
349+
func (c *Client) setRetryAfter(d time.Duration) {
350+
newDeadline := time.Now().Add(d).UnixNano()
351+
for {
352+
current := c.rateLimitDeadline.Load()
353+
if newDeadline <= current {
354+
return
355+
}
356+
if c.rateLimitDeadline.CompareAndSwap(current, newDeadline) {
357+
return
358+
}
359+
}
360+
}
361+
362+
// parseRateLimitDelay extracts the backoff duration from a rate-limit response:
363+
// Return largest delay from header options.
364+
// If no headers are set, default to 1 minute.
365+
func parseRateLimitDelay(resp *http.Response) time.Duration {
366+
// GitHub docs show Retry-After header will always be an int
367+
var retryAfterDelay *time.Duration
368+
if ra := resp.Header.Get("Retry-After"); ra != "" {
369+
if seconds, err := strconv.Atoi(ra); err == nil {
370+
rad := time.Duration(seconds) * time.Second
371+
retryAfterDelay = &rad
372+
}
373+
}
374+
375+
var rateLimitResetDelay *time.Duration
376+
if resp.Header.Get("X-Ratelimit-Remaining") == "0" {
377+
if resetStr := resp.Header.Get("X-Ratelimit-Reset"); resetStr != "" {
378+
if epoch, err := strconv.ParseInt(resetStr, 10, 64); err == nil {
379+
if d := time.Until(time.Unix(epoch, 0)); d > 0 {
380+
rateLimitResetDelay = &d
381+
}
382+
}
383+
}
384+
}
385+
386+
switch {
387+
case retryAfterDelay != nil && rateLimitResetDelay != nil:
388+
return max(*retryAfterDelay, *rateLimitResetDelay)
389+
case retryAfterDelay != nil:
390+
return *retryAfterDelay
391+
case rateLimitResetDelay != nil:
392+
return *rateLimitResetDelay
393+
default:
394+
return time.Minute
395+
}
396+
}
397+
398+
func waitForBackoff(ctx context.Context, attempt int) error {
399+
if attempt > 0 {
400+
backoff := time.Duration(math.Pow(2,
401+
float64(attempt))) * 100 * time.Millisecond
402+
//nolint:gosec
403+
jitter := time.Duration(rand.Int64N(50)) * time.Millisecond
404+
delay := backoff + jitter
405+
406+
if delay > 5*time.Second {
407+
delay = 5 * time.Second
408+
}
409+
410+
// Wait with context cancellation support
411+
timer := time.NewTimer(delay)
412+
defer timer.Stop()
413+
414+
select {
415+
case <-timer.C:
416+
case <-ctx.Done():
417+
return fmt.Errorf("context cancelled during retry backoff: %w", ctx.Err())
418+
}
419+
}
420+
return nil
421+
}

pkg/deploymentrecord/client_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"errors"
66
"net/http"
77
"net/http/httptest"
8+
"strconv"
89
"strings"
10+
"sync"
911
"sync/atomic"
1012
"testing"
1113
"time"
@@ -421,6 +423,7 @@ func TestPostOne(t *testing.T) {
421423
retries: 1,
422424
handler: func(w http.ResponseWriter, _ *http.Request) {
423425
w.Header().Set("X-Ratelimit-Remaining", "0")
426+
w.Header().Set("X-Ratelimit-Reset", strconv.FormatInt(time.Now().Add(1*time.Second).Unix(), 10))
424427
w.WriteHeader(http.StatusForbidden)
425428
},
426429
wantErr: true,
@@ -604,3 +607,115 @@ func TestPostOneSendsCorrectRequest(t *testing.T) {
604607
t.Fatalf("unexpected error: %v", err)
605608
}
606609
}
610+
611+
func TestParseRateLimitDelay(t *testing.T) {
612+
tests := []struct {
613+
name string
614+
headers http.Header
615+
wantMin time.Duration
616+
wantMax time.Duration
617+
}{
618+
{
619+
name: "Retry-After in seconds",
620+
headers: http.Header{"Retry-After": []string{"5"}},
621+
wantMin: 5 * time.Second,
622+
wantMax: 5 * time.Second,
623+
},
624+
{
625+
name: "Retry-After zero seconds",
626+
headers: http.Header{"Retry-After": []string{"0"}},
627+
wantMin: 0,
628+
wantMax: 0,
629+
},
630+
{
631+
name: "X-Ratelimit-Remaining 0 with reset",
632+
headers: http.Header{
633+
"X-Ratelimit-Remaining": []string{"0"},
634+
"X-Ratelimit-Reset": []string{strconv.FormatInt(time.Now().Add(10*time.Second).Unix(), 10)},
635+
},
636+
wantMin: 9 * time.Second,
637+
wantMax: 11 * time.Second,
638+
},
639+
{
640+
name: "no relevant headers defaults to 1 minute",
641+
headers: http.Header{},
642+
wantMin: time.Minute,
643+
wantMax: time.Minute,
644+
},
645+
{
646+
name: "Largest delay takes precedence",
647+
headers: http.Header{
648+
"Retry-After": []string{"3"},
649+
"X-Ratelimit-Remaining": []string{"0"},
650+
"X-Ratelimit-Reset": []string{strconv.FormatInt(time.Now().Add(60*time.Second).Unix(), 10)},
651+
},
652+
wantMin: 59 * time.Second,
653+
wantMax: 61 * time.Second,
654+
},
655+
}
656+
657+
for _, tt := range tests {
658+
t.Run(tt.name, func(t *testing.T) {
659+
resp := &http.Response{Header: tt.headers}
660+
result := parseRateLimitDelay(resp)
661+
if result < tt.wantMin || result > tt.wantMax {
662+
t.Errorf("parseRateLimitDelay() = %v, want between %v and %v", result, tt.wantMin, tt.wantMax)
663+
}
664+
})
665+
}
666+
}
667+
668+
func TestPostOneRespectsRetryAfterAcrossGoroutines(t *testing.T) {
669+
var reqCount atomic.Int32
670+
firstReqDone := make(chan struct{})
671+
672+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
673+
count := reqCount.Add(1)
674+
if count == 1 {
675+
w.Header().Set("Retry-After", "2")
676+
w.WriteHeader(http.StatusTooManyRequests)
677+
close(firstReqDone)
678+
return
679+
}
680+
w.WriteHeader(http.StatusOK)
681+
}))
682+
t.Cleanup(srv.Close)
683+
684+
client, err := NewClient(srv.URL, "test-org", WithRetries(2))
685+
if err != nil {
686+
t.Fatalf("failed to create client: %v", err)
687+
}
688+
689+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
690+
t.Cleanup(cancel)
691+
692+
var wg sync.WaitGroup
693+
694+
// Goroutine 1: triggers the rate limit
695+
wg.Go(func() {
696+
if err := client.PostOne(ctx, testRecord()); err != nil {
697+
t.Errorf("goroutine 1 error: %v", err)
698+
}
699+
})
700+
701+
// Wait for the rate limit to be received and backoff set
702+
<-firstReqDone
703+
time.Sleep(50 * time.Millisecond)
704+
705+
// Goroutine 2: must observe the shared backoff
706+
secondReqDone := make(chan struct{})
707+
start := time.Now()
708+
wg.Go(func() {
709+
defer close(secondReqDone)
710+
if err := client.PostOne(ctx, testRecord()); err != nil {
711+
t.Errorf("goroutine 2 error: %v", err)
712+
}
713+
})
714+
// Measure only goroutine 2's duration
715+
<-secondReqDone
716+
elapsed := time.Since(start)
717+
if elapsed < 1500*time.Millisecond {
718+
t.Errorf("goroutine 2 should have waited for retry-after, but only waited %v", elapsed)
719+
}
720+
wg.Wait()
721+
}

0 commit comments

Comments
 (0)