Skip to content

Commit f75e12e

Browse files
worstellampcode-com
andcommitted
feat(git): add per-client clone rate limiting
Add a per-client-IP concurrent clone limiter to prevent a single client from monopolizing all clone capacity. When a client exceeds the configured limit, requests are rejected with 429 Too Many Requests and a Retry-After header. Configurable via max-clones-per-client in the git strategy config block. Defaults to 0 (disabled). The tracker counts in-flight clones per client IP and releases slots when clone jobs complete. New metric cachew.git.clone_rejections_total (by client IP) provides observability into rate-limited requests. Co-authored-by: Amp <amp@ampcode.com> Amp-Thread-ID: https://ampcode.com/threads/T-019d4a57-1477-707c-bb89-5543fddff0e7
1 parent 88105a0 commit f75e12e

4 files changed

Lines changed: 221 additions & 9 deletions

File tree

internal/strategy/git/git.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ type Config struct {
3939
MirrorSnapshotInterval time.Duration `hcl:"mirror-snapshot-interval,optional" help:"How often to generate mirror snapshots for pod bootstrap. 0 uses snapshot-interval. Defaults to 2h." default:"2h"`
4040
RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"`
4141
// ServerURL is embedded as remote.origin.url in snapshots so git pull goes through cachew.
42-
ServerURL string `hcl:"server-url,optional" help:"Base URL of this cachew instance, embedded in snapshot remote URLs." default:"${CACHEW_URL}"`
43-
ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression (0 = all CPU cores)." default:"0"`
42+
ServerURL string `hcl:"server-url,optional" help:"Base URL of this cachew instance, embedded in snapshot remote URLs." default:"${CACHEW_URL}"`
43+
ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression (0 = all CPU cores)." default:"0"`
44+
MaxClonesPerClient int `hcl:"max-clones-per-client,optional" help:"Max concurrent clone triggers per client IP. 0 disables per-client limiting." default:"0"`
4445
}
4546

4647
type Strategy struct {
@@ -59,6 +60,7 @@ type Strategy struct {
5960
coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry
6061
deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo
6162
metrics *gitMetrics
63+
cloneTracker *ClientCloneTracker // nil when per-client limiting is disabled
6264
}
6365

6466
func New(
@@ -125,6 +127,9 @@ func New(
125127
tokenManager: tokenManager,
126128
metrics: m,
127129
}
130+
if config.MaxClonesPerClient > 0 {
131+
s.cloneTracker = NewClientCloneTracker(config.MaxClonesPerClient)
132+
}
128133
s.config.ServerURL = strings.TrimRight(config.ServerURL, "/")
129134

130135
if err := s.warmExistingRepos(ctx); err != nil {
@@ -274,10 +279,9 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
274279

275280
case gitclone.StateCloning, gitclone.StateEmpty:
276281
if state == gitclone.StateEmpty {
277-
logger.DebugContext(ctx, "Starting background clone, forwarding to upstream")
278-
s.scheduler.Submit(repo.UpstreamURL(), "clone", func(ctx context.Context) error {
279-
return s.startClone(ctx, repo)
280-
})
282+
if rejected := s.submitClone(w, r, repo); rejected {
283+
return
284+
}
281285
}
282286
if err := s.serveWithSpool(w, r, host, pathValue, upstreamURL); err != nil {
283287
logger.WarnContext(ctx, "Spool failed, forwarding to upstream", "error", err)
@@ -286,6 +290,36 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
286290
}
287291
}
288292

293+
// submitClone submits a background clone job for the given repo, applying
294+
// per-client rate limiting when configured. Returns true if the request was
295+
// rejected (429 already written to w), false if the clone was submitted.
296+
func (s *Strategy) submitClone(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository) bool {
297+
ctx := r.Context()
298+
logger := logging.FromContext(ctx)
299+
300+
if s.cloneTracker != nil {
301+
ip := ClientIP(r)
302+
release, ok := s.cloneTracker.TryAcquire(ip)
303+
if !ok {
304+
logger.WarnContext(ctx, "Per-client clone limit reached", "client", ip, "upstream", repo.UpstreamURL())
305+
s.metrics.recordCloneRejection(ctx, ip)
306+
w.Header().Set("Retry-After", "30")
307+
http.Error(w, "Too many concurrent clone requests", http.StatusTooManyRequests)
308+
return true
309+
}
310+
s.scheduler.Submit(repo.UpstreamURL(), "clone", func(ctx context.Context) error {
311+
defer release()
312+
return s.startClone(ctx, repo)
313+
})
314+
} else {
315+
s.scheduler.Submit(repo.UpstreamURL(), "clone", func(ctx context.Context) error {
316+
return s.startClone(ctx, repo)
317+
})
318+
}
319+
logger.DebugContext(ctx, "Starting background clone, forwarding to upstream")
320+
return false
321+
}
322+
289323
func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, host, pathValue string, isInfoRefs bool) error {
290324
ctx := r.Context()
291325

internal/strategy/git/metrics.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ import (
1111
)
1212

1313
type gitMetrics struct {
14-
operationDuration metric.Float64Histogram
15-
operationTotal metric.Int64Counter
16-
requestTotal metric.Int64Counter
14+
operationDuration metric.Float64Histogram
15+
operationTotal metric.Int64Counter
16+
requestTotal metric.Int64Counter
17+
cloneRejectionsTotal metric.Int64Counter
1718
}
1819

1920
func newGitMetrics() (*gitMetrics, error) {
@@ -39,6 +40,12 @@ func newGitMetrics() (*gitMetrics, error) {
3940
return nil, errors.Wrap(err, "create requests_total counter")
4041
}
4142

43+
if m.cloneRejectionsTotal, err = meter.Int64Counter("cachew.git.clone_rejections_total",
44+
metric.WithDescription("Clone triggers rejected by per-client rate limiting"),
45+
metric.WithUnit("{rejections}")); err != nil {
46+
return nil, errors.Wrap(err, "create clone_rejections_total counter")
47+
}
48+
4249
return m, nil
4350
}
4451

@@ -55,3 +62,7 @@ func (m *gitMetrics) recordOperation(ctx context.Context, operation, status stri
5562
func (m *gitMetrics) recordRequest(ctx context.Context, requestType string) {
5663
m.requestTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("type", requestType)))
5764
}
65+
66+
func (m *gitMetrics) recordCloneRejection(ctx context.Context, clientIP string) {
67+
m.cloneRejectionsTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("client", clientIP)))
68+
}

internal/strategy/git/ratelimit.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package git
2+
3+
import (
4+
"net"
5+
"net/http"
6+
"sync"
7+
"sync/atomic"
8+
)
9+
10+
// ClientCloneTracker limits how many concurrent clone operations a single
11+
// client IP can trigger. When a client exceeds the limit, the request is
12+
// rejected with 429 Too Many Requests.
13+
type ClientCloneTracker struct {
14+
mu sync.Mutex
15+
inflight map[string]*atomic.Int32
16+
limit int
17+
}
18+
19+
// NewClientCloneTracker creates a tracker that allows at most limit concurrent
20+
// clone operations per client IP.
21+
func NewClientCloneTracker(limit int) *ClientCloneTracker {
22+
return &ClientCloneTracker{
23+
inflight: make(map[string]*atomic.Int32),
24+
limit: limit,
25+
}
26+
}
27+
28+
// TryAcquire attempts to reserve a clone slot for the given client IP.
29+
// Returns true and a release function if the client is under the limit.
30+
// Returns false if the client has reached the limit.
31+
func (t *ClientCloneTracker) TryAcquire(clientIP string) (release func(), ok bool) {
32+
t.mu.Lock()
33+
counter, exists := t.inflight[clientIP]
34+
if !exists {
35+
counter = &atomic.Int32{}
36+
t.inflight[clientIP] = counter
37+
}
38+
t.mu.Unlock()
39+
40+
if int(counter.Load()) >= t.limit {
41+
return nil, false
42+
}
43+
counter.Add(1)
44+
45+
var once sync.Once
46+
return func() {
47+
once.Do(func() {
48+
if counter.Add(-1) <= 0 {
49+
t.mu.Lock()
50+
// Re-check under lock to avoid racing with another acquire.
51+
if counter.Load() <= 0 {
52+
delete(t.inflight, clientIP)
53+
}
54+
t.mu.Unlock()
55+
}
56+
})
57+
}, true
58+
}
59+
60+
// ClientIP extracts the IP address from an HTTP request, stripping the port.
61+
func ClientIP(r *http.Request) string {
62+
host, _, err := net.SplitHostPort(r.RemoteAddr)
63+
if err != nil {
64+
return r.RemoteAddr
65+
}
66+
return host
67+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package git_test
2+
3+
import (
4+
"net/http"
5+
"sync"
6+
"sync/atomic"
7+
"testing"
8+
9+
"github.com/alecthomas/assert/v2"
10+
11+
"github.com/block/cachew/internal/strategy/git"
12+
)
13+
14+
func TestClientCloneTrackerBasic(t *testing.T) {
15+
tracker := git.NewClientCloneTracker(2)
16+
17+
r1, ok := tracker.TryAcquire("10.0.0.1")
18+
assert.True(t, ok)
19+
20+
r2, ok := tracker.TryAcquire("10.0.0.1")
21+
assert.True(t, ok)
22+
23+
// Third should be rejected.
24+
_, ok = tracker.TryAcquire("10.0.0.1")
25+
assert.False(t, ok)
26+
27+
// Different client should be fine.
28+
r3, ok := tracker.TryAcquire("10.0.0.2")
29+
assert.True(t, ok)
30+
31+
// Release one slot for 10.0.0.1.
32+
r1()
33+
34+
r4, ok := tracker.TryAcquire("10.0.0.1")
35+
assert.True(t, ok)
36+
37+
r2()
38+
r3()
39+
r4()
40+
}
41+
42+
func TestClientCloneTrackerReleaseIdempotent(t *testing.T) {
43+
tracker := git.NewClientCloneTracker(1)
44+
45+
release, ok := tracker.TryAcquire("10.0.0.1")
46+
assert.True(t, ok)
47+
48+
// Double-release should not corrupt the counter.
49+
release()
50+
release()
51+
52+
r2, ok := tracker.TryAcquire("10.0.0.1")
53+
assert.True(t, ok)
54+
r2()
55+
}
56+
57+
func TestClientCloneTrackerConcurrent(t *testing.T) {
58+
tracker := git.NewClientCloneTracker(3)
59+
60+
// Hold 3 slots so all subsequent acquires must be rejected.
61+
releases := make([]func(), 3)
62+
for i := range 3 {
63+
r, ok := tracker.TryAcquire("10.0.0.1")
64+
assert.True(t, ok)
65+
releases[i] = r
66+
}
67+
68+
var rejected atomic.Int32
69+
var wg sync.WaitGroup
70+
for range 20 {
71+
wg.Go(func() {
72+
_, ok := tracker.TryAcquire("10.0.0.1")
73+
if !ok {
74+
rejected.Add(1)
75+
}
76+
})
77+
}
78+
79+
wg.Wait()
80+
assert.Equal(t, int32(20), rejected.Load(), "all attempts should be rejected while slots are held")
81+
82+
for _, r := range releases {
83+
r()
84+
}
85+
}
86+
87+
func TestClientIP(t *testing.T) {
88+
tests := []struct {
89+
remoteAddr string
90+
want string
91+
}{
92+
{"192.168.1.1:12345", "192.168.1.1"},
93+
{"[::1]:8080", "::1"},
94+
{"192.168.1.1", "192.168.1.1"},
95+
}
96+
for _, tt := range tests {
97+
r := &http.Request{RemoteAddr: tt.remoteAddr}
98+
assert.Equal(t, tt.want, git.ClientIP(r), "ClientIP(%q)", tt.remoteAddr)
99+
}
100+
}

0 commit comments

Comments
 (0)