Skip to content

Commit f429e7b

Browse files
worstellampagent
andcommitted
fix(snapshot): bound S3 parallelGet and make snapshot serving range-aware
The S3 full-object download path (parallelGet) let workers read every chunk into memory ahead of a slow consumer, so a single large snapshot served to a slow client could buffer the whole object in the Go heap. Bound it with a window of in-flight chunks (peak now numWorkers x chunkSize). The git snapshot endpoint also ignored client Range headers, so it always served the full object via the unbounded path. Forward Range/If-Range to the cache on the cache-hit path, return 206/Content-Range (416 when not satisfiable), and advertise Accept-Ranges, so clients can fetch with bounded parallel range requests (client.ParallelGet) instead of one full GET. Co-authored-by: Amp <amp@ampcode.com> Amp-Thread-ID: https://ampcode.com/threads/T-019ef6fe-55c6-71e8-96dc-ca3ef4301d36
1 parent 2c806db commit f429e7b

5 files changed

Lines changed: 215 additions & 19 deletions

File tree

internal/cache/s3_parallel_get.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,15 @@ func (c *cancelReadCloser) Close() error {
6969

7070
// parallelGet downloads an S3 object in parallel chunks and writes them in
7171
// order to w. Each worker downloads its chunk into memory so the TCP
72-
// connection stays active at full speed. Peak memory: numWorkers × chunkSize.
73-
// All chunk requests are pinned to the given etag to ensure consistency.
74-
// An errgroup cancels all workers on the first error from any goroutine.
72+
// connection stays active at full speed. All chunk requests are pinned to the
73+
// given etag to ensure consistency. An errgroup cancels all workers on the
74+
// first error from any goroutine.
75+
//
76+
// A window token is acquired before each chunk is fetched and released by the
77+
// writer once that chunk has been consumed, bounding the number of
78+
// downloaded-but-unwritten chunks to numWorkers. Without it, workers race ahead
79+
// of a slow consumer and buffer the whole object in memory. Peak memory is
80+
// therefore numWorkers × chunkSize.
7581
func (s *S3) parallelGet(ctx context.Context, bucket, objectName string, size int64, etag string, w io.Writer) error {
7682
chunkSize := int64(s.config.DownloadPartSizeMB) << 20 // #nosec G115 -- DownloadPartSizeMB is a small operator-supplied tuning value.
7783
numChunks := int((size + chunkSize - 1) / chunkSize)
@@ -83,6 +89,10 @@ func (s *S3) parallelGet(ctx context.Context, bucket, objectName string, size in
8389
results[i] = make(chan []byte, 1)
8490
}
8591

92+
// Bounds downloaded-but-unwritten chunks to numWorkers: a token is held from
93+
// before a chunk is fetched until the writer has consumed it.
94+
window := make(chan struct{}, numWorkers)
95+
8696
// Work queue of chunk indices.
8797
work := make(chan int, numChunks)
8898
for i := range numChunks {
@@ -97,7 +107,11 @@ func (s *S3) parallelGet(ctx context.Context, bucket, objectName string, size in
97107
for range numWorkers {
98108
eg.Go(func() error {
99109
for seq := range work {
100-
if egCtx.Err() != nil {
110+
// Block until the writer has caught up enough to admit another
111+
// in-flight chunk, or the group is cancelled.
112+
select {
113+
case window <- struct{}{}:
114+
case <-egCtx.Done():
101115
return egCtx.Err()
102116
}
103117

@@ -142,6 +156,7 @@ func (s *S3) parallelGet(ctx context.Context, bucket, objectName string, size in
142156
if _, err := w.Write(data); err != nil {
143157
return errors.Wrap(err, "write chunk")
144158
}
159+
<-window // Admit another in-flight chunk now this one is consumed.
145160
case <-egCtx.Done():
146161
return egCtx.Err()
147162
}

internal/cache/s3_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package cache_test
22

33
import (
4+
"bytes"
45
"context"
6+
"io"
57
"log/slog"
68
"os"
79
"testing"
@@ -71,6 +73,50 @@ func TestS3ContextCancellationAbortsUpload(t *testing.T) {
7173
assert.IsError(t, err, os.ErrNotExist)
7274
}
7375

76+
// TestS3ParallelGetMultiChunk exercises the multi-chunk parallel download path
77+
// (small part size + concurrency over a multi-part object) and verifies the
78+
// reassembled bytes are correct and in order. This guards the bounded-window
79+
// reordering in parallelGet against off-by-one or interleaving regressions.
80+
func TestS3ParallelGetMultiChunk(t *testing.T) {
81+
bucket := s3clienttest.Start(t)
82+
s3clienttest.CleanBucket(t, bucket)
83+
_, ctx := logging.Configure(t.Context(), logging.Config{})
84+
85+
clientProvider := s3client.NewClientProvider(ctx, s3client.Config{Endpoint: s3clienttest.Addr, UseSSL: false})
86+
c, err := cache.NewS3(ctx, cache.S3Config{
87+
Bucket: bucket,
88+
MaxTTL: time.Hour,
89+
UploadPartSizeMB: 5,
90+
UploadConcurrency: 1, // single-threaded upload; concurrent upload exercises a minio-go crc64 race unrelated to this test
91+
DownloadPartSizeMB: 5, // 5 MiB chunks
92+
DownloadConcurrency: 3, // fewer workers than chunks, so the window throttles
93+
}, clientProvider)
94+
assert.NoError(t, err)
95+
defer c.Close()
96+
97+
// 17 MiB => 4 chunks at 5 MiB, with a deterministic pseudo-random pattern so
98+
// any mis-ordered or duplicated chunk is detected on read-back.
99+
want := make([]byte, 17<<20)
100+
for i := range want {
101+
want[i] = byte(i*2654435761 + i>>3) //nolint:gosec // arbitrary deterministic fill
102+
}
103+
104+
key := cache.NewKey("parallel-get-multichunk")
105+
w, err := c.Create(ctx, key, nil, time.Hour)
106+
assert.NoError(t, err)
107+
_, err = w.Write(want)
108+
assert.NoError(t, err)
109+
assert.NoError(t, w.Close())
110+
111+
rc, _, err := c.Open(ctx, key)
112+
assert.NoError(t, err)
113+
got, err := io.ReadAll(rc)
114+
assert.NoError(t, err)
115+
assert.NoError(t, rc.Close())
116+
assert.Equal(t, len(want), len(got))
117+
assert.True(t, bytes.Equal(want, got), "reassembled object must match the original bytes")
118+
}
119+
74120
func TestS3CacheSoak(t *testing.T) {
75121
if os.Getenv("SOAK_TEST") == "" {
76122
t.Skip("Skipping soak test; set SOAK_TEST=1 to run")

internal/httputil/conditional.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ func ConditionalOptions(r *http.Request) []client.RequestOption {
3232
return opts
3333
}
3434

35+
// RangeOptions extracts only the Range/If-Range options from r, for callers
36+
// that evaluate If-Match/If-None-Match separately (e.g. via CheckConditionals)
37+
// and so must not double-handle them by forwarding the full set to Open.
38+
// If-Range is only meaningful alongside Range, so it is dropped when Range is
39+
// absent.
40+
func RangeOptions(r *http.Request) []client.RequestOption {
41+
v := r.Header.Get("Range")
42+
if v == "" {
43+
return nil
44+
}
45+
opts := []client.RequestOption{func(o *client.RequestOptions) { o.Range = v }}
46+
if ir := r.Header.Get("If-Range"); ir != "" {
47+
opts = append(opts, client.IfRange(ir))
48+
}
49+
return opts
50+
}
51+
3552
// CheckConditionals evaluates RFC 7232 If-Match and If-None-Match precondition
3653
// headers on r against etag. It returns 0 when all preconditions pass,
3754
// otherwise the HTTP status the caller should send: 412 Precondition Failed for

internal/strategy/git/snapshot.go

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,18 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request,
320320
}
321321
s.maybeBackgroundFetch(repo)
322322

323-
reader, headers, err := s.cache.Open(ctx, cacheKey)
323+
// Forward only Range/If-Range here; If-Match/If-None-Match are evaluated
324+
// against the served body below via CheckConditionals.
325+
reader, headers, err := s.cache.Open(ctx, cacheKey, httputil.RangeOptions(r)...)
326+
if errors.Is(err, cache.ErrRangeNotSatisfiable) {
327+
w.Header().Set("Content-Type", "application/zstd")
328+
w.Header().Set("Accept-Ranges", "bytes")
329+
if cr := headers.Get("Content-Range"); cr != "" {
330+
w.Header().Set("Content-Range", cr)
331+
}
332+
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
333+
return
334+
}
324335
if err != nil && !errors.Is(err, os.ErrNotExist) {
325336
logger.ErrorContext(ctx, "Failed to open snapshot from cache", "upstream", upstreamURL, "error", err)
326337
http.Error(w, "Internal server error", http.StatusInternalServerError)
@@ -517,23 +528,29 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h
517528
}
518529

519530
func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.ReadCloser, headers http.Header, repo *gitclone.Repository, upstreamURL, repoName string, start time.Time) error {
520-
snapshotCommit := headers.Get("X-Cachew-Snapshot-Commit")
521-
mirrorHead := s.getMirrorHead(ctx, repo)
522-
523-
// Forward the snapshot commit to the client so it knows whether the
524-
// snapshot is fresh (no bundle URL = already at HEAD, skip freshen).
525-
if snapshotCommit != "" {
526-
w.Header().Set("X-Cachew-Snapshot-Commit", snapshotCommit)
531+
// A satisfied byte range is served as-is. Bundle negotiation and full-body
532+
// conditionals apply only to whole-snapshot downloads, so a partial read
533+
// (e.g. a client.ParallelGet chunk) skips them and returns 206 directly.
534+
if cr := headers.Get("Content-Range"); cr != "" {
535+
applySnapshotCacheHeaders(w, headers)
536+
w.Header().Set("Accept-Ranges", "bytes")
537+
w.Header().Set("Content-Range", cr)
538+
// Ranged clients (client.ParallelGet) read the freshen metadata from the
539+
// discovery chunk, so it must be present on partial responses too.
540+
s.setSnapshotMetadataHeaders(ctx, w, headers, repo, upstreamURL)
541+
w.WriteHeader(http.StatusPartialContent)
542+
n, err := io.Copy(w, reader)
543+
s.metrics.recordSnapshotServe(ctx, "cache_range", repoName, n, time.Since(start))
544+
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
545+
span.SetAttributes(attribute.String("cachew.source", "cache_range"), attribute.Int64("cachew.bytes", n))
546+
}
547+
return errors.Wrap(err, "stream snapshot range")
527548
}
528549

529-
if snapshotCommit != "" && mirrorHead != "" && snapshotCommit != mirrorHead {
530-
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
531-
if err == nil {
532-
bundleURL := fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit)
533-
w.Header().Set("X-Cachew-Bundle-Url", bundleURL)
534-
}
550+
snapshotCommit, bundleURL := s.setSnapshotMetadataHeaders(ctx, w, headers, repo, upstreamURL)
535551

536-
// Proactively generate and cache the bundle so any pod can serve it.
552+
// Proactively generate and cache the advertised bundle so any pod can serve it.
553+
if bundleURL != "" {
537554
go func() {
538555
bgCtx := context.WithoutCancel(ctx)
539556
logger := logging.FromContext(bgCtx)
@@ -550,6 +567,7 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW
550567
}
551568

552569
applySnapshotCacheHeaders(w, headers)
570+
w.Header().Set("Accept-Ranges", "bytes")
553571

554572
// Honour conditional GETs against the advertised ETag. ServeContent does this
555573
// natively for *os.File readers, but cache backends returning non-file readers
@@ -569,6 +587,33 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW
569587
return errors.Wrap(err, "stream snapshot")
570588
}
571589

590+
// setSnapshotMetadataHeaders advertises the snapshot's commit and, when the
591+
// snapshot trails the mirror's HEAD, the delta-bundle URL clients use to
592+
// fast-forward. Shared by the full and ranged serve paths so ranged clients
593+
// (client.ParallelGet) receive the same freshen metadata on the discovery
594+
// chunk. It returns the snapshot commit and the bundle URL it set (empty when
595+
// the snapshot is already at HEAD), so callers can decide whether to
596+
// pre-generate the bundle.
597+
func (s *Strategy) setSnapshotMetadataHeaders(ctx context.Context, w http.ResponseWriter, headers http.Header, repo *gitclone.Repository, upstreamURL string) (snapshotCommit, bundleURL string) {
598+
snapshotCommit = headers.Get("X-Cachew-Snapshot-Commit")
599+
if snapshotCommit == "" {
600+
return "", ""
601+
}
602+
w.Header().Set("X-Cachew-Snapshot-Commit", snapshotCommit)
603+
604+
mirrorHead := s.getMirrorHead(ctx, repo)
605+
if mirrorHead == "" || snapshotCommit == mirrorHead {
606+
return snapshotCommit, ""
607+
}
608+
repoPath, err := gitclone.RepoPathFromURL(upstreamURL)
609+
if err != nil {
610+
return snapshotCommit, ""
611+
}
612+
bundleURL = fmt.Sprintf("/git/%s/snapshot.bundle?base=%s", repoPath, snapshotCommit)
613+
w.Header().Set("X-Cachew-Bundle-Url", bundleURL)
614+
return snapshotCommit, bundleURL
615+
}
616+
572617
// applySnapshotCacheHeaders forwards the cached snapshot's validators so clients
573618
// can revalidate (ETag) and size the transfer (Content-Length). Content-Type is
574619
// fixed for snapshots regardless of what the cache backend recorded.

internal/strategy/git/snapshot_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"os/exec"
1111
"path/filepath"
12+
"strconv"
1213
"strings"
1314
"testing"
1415
"time"
@@ -930,3 +931,75 @@ func TestSnapshotRemoteURLUsesUpstreamURL(t *testing.T) {
930931
assert.NoError(t, err, string(output))
931932
assert.Equal(t, upstreamURL+"\n", string(output))
932933
}
934+
935+
func TestSnapshotGetHonorsRange(t *testing.T) {
936+
if _, err := exec.LookPath("git"); err != nil {
937+
t.Skip("git not found in PATH")
938+
}
939+
940+
_, ctx := logging.Configure(context.Background(), logging.Config{})
941+
tmpDir := t.TempDir()
942+
mirrorRoot := filepath.Join(tmpDir, "mirrors")
943+
upstreamURL := "https://github.com/org/repo"
944+
mirrorPath := filepath.Join(mirrorRoot, "github.com", "org", "repo")
945+
createTestMirrorRepo(t, mirrorPath)
946+
947+
// The memory cache returns a non-*os.File reader, so range handling must come
948+
// from the strategy forwarding Range to Open rather than http.ServeContent.
949+
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
950+
assert.NoError(t, err)
951+
mux := newTestMux()
952+
953+
cm := gitclone.NewManagerProvider(ctx, gitclone.Config{MirrorRoot: mirrorRoot}, nil)
954+
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
955+
assert.NoError(t, err)
956+
957+
manager, err := cm()
958+
assert.NoError(t, err)
959+
repo, err := manager.GetOrCreate(ctx, upstreamURL)
960+
assert.NoError(t, err)
961+
962+
waitForReady(t, s)
963+
err = s.GenerateAndUploadSnapshot(ctx, repo)
964+
assert.NoError(t, err)
965+
966+
handler := mux.handlers["GET /git/{host}/{path...}"]
967+
assert.NotZero(t, handler)
968+
969+
get := func(rangeHeader string) *httptest.ResponseRecorder {
970+
req := httptest.NewRequest(http.MethodGet, "/git/github.com/org/repo/snapshot.tar.zst", nil)
971+
req = req.WithContext(ctx)
972+
req.SetPathValue("host", "github.com")
973+
req.SetPathValue("path", "org/repo/snapshot.tar.zst")
974+
if rangeHeader != "" {
975+
req.Header.Set("Range", rangeHeader)
976+
}
977+
w := httptest.NewRecorder()
978+
handler.ServeHTTP(w, req)
979+
return w
980+
}
981+
982+
// Full GET advertises range support and yields the whole body.
983+
full := get("")
984+
assert.Equal(t, 200, full.Code)
985+
assert.Equal(t, "bytes", full.Header().Get("Accept-Ranges"))
986+
body := full.Body.Bytes()
987+
assert.True(t, len(body) > 4, "snapshot body should be larger than the test range")
988+
commit := full.Header().Get("X-Cachew-Snapshot-Commit")
989+
assert.NotZero(t, commit, "full GET should advertise the snapshot commit")
990+
991+
// A satisfiable range returns 206 with the matching bytes and Content-Range.
992+
partial := get("bytes=0-3")
993+
assert.Equal(t, http.StatusPartialContent, partial.Code)
994+
assert.Equal(t, "bytes", partial.Header().Get("Accept-Ranges"))
995+
assert.Equal(t, "bytes 0-3/"+strconv.Itoa(len(body)), partial.Header().Get("Content-Range"))
996+
assert.Equal(t, body[:4], partial.Body.Bytes())
997+
// Ranged clients must receive the same freshen metadata as a full GET so
998+
// they can apply a delta bundle after a parallel download.
999+
assert.Equal(t, commit, partial.Header().Get("X-Cachew-Snapshot-Commit"))
1000+
1001+
// A range beyond the object is not satisfiable.
1002+
tooBig := get("bytes=" + strconv.Itoa(len(body)+10) + "-" + strconv.Itoa(len(body)+20))
1003+
assert.Equal(t, http.StatusRequestedRangeNotSatisfiable, tooBig.Code)
1004+
assert.Equal(t, "bytes */"+strconv.Itoa(len(body)), tooBig.Header().Get("Content-Range"))
1005+
}

0 commit comments

Comments
 (0)