Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions client/parallel_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type RangeReader interface {
// is likewise reported as an error, so a partially written dst must be discarded
// by the caller on failure. An object with no ETag to pin to (e.g. one stored
// before ETags were recorded) cannot be kept revision-safe across chunks, so it
// falls back to a single full read instead of parallelising.
// falls back to a single full read instead of parallelising. A concurrency of
// 1 likewise reads the whole object in one request, since chunking a single
// worker would only serialise ranged GETs for no benefit.
//
// dst is written via concurrent WriteAt calls at non-overlapping offsets; the
// caller owns dst's lifecycle (open, close, cleanup) and need not pre-size it,
Expand All @@ -43,6 +45,13 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
}
concurrency = max(concurrency, 1)

// A single worker gains nothing from chunking — it would only serialise
// ranged GETs — so skip discovery entirely and read the object in one
// revision-consistent request.
if concurrency == 1 {
return fullRead(ctx, c, key, dst)
Comment on lines +51 to +52

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor single-worker downloads for S3 caches

When cache.ParallelGet is called with an *S3 cache and concurrency == 1, this new fast path switches from serial ranged Open calls to an unrestricted full Open. For large non-ranged S3 reads, S3.Open calls parallelGetReader (internal/cache/s3.go:246), which then uses s.config.DownloadConcurrency workers (internal/cache/s3_parallel_get.go:78, default 8 in internal/cache/s3.go:47-49), so callers that set ParallelGet(..., concurrency: 1) to throttle S3 traffic can still launch multiple concurrent S3 range GETs. Keep the serial ranged path for backends whose full Open is internally parallel, or provide an explicit single-stream/full-read path that preserves the requested concurrency limit.

Useful? React with 👍 / 👎.

}

// Discovery: the first ranged Open delivers chunk zero and reveals the total
// size and ETag used to pin the rest.
rc, headers, err := c.Open(ctx, key, Range(0, chunkSize))
Expand Down Expand Up @@ -77,14 +86,7 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
if err := rc.Close(); err != nil {
return errors.Wrap(err, "parallel get: close discovery reader")
}
full, _, err := c.Open(ctx, key)
if err != nil {
return errors.Wrap(err, "parallel get: full read")
}
// The full read is a fresh request whose body may be a different
// revision than discovery, so the discovery `total` cannot validate its
// length; -1 skips the check and relies on transport-level EOF detection.
return errors.Wrap(writeChunkAt(dst, 0, -1, full), "parallel get")
return fullRead(ctx, c, key, dst)
}

// Multiple chunks: copy the already-open first chunk concurrently with the
Expand All @@ -106,6 +108,19 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
return errors.Wrap(eg.Wait(), "parallel get")
}

// fullRead fetches the object with one non-ranged Open and writes the body
// into dst starting at offset zero. Callers use it when splitting the object
// into chunks is pointless (only one worker) or unsafe (no ETag to pin the
// chunks to a single revision). Because the request is a single one, the body
// is one consistent revision; its size is not known beforehand, so -1 is passed
// to writeChunkAt to skip the length check.
func fullRead(ctx context.Context, c RangeReader, key Key, dst io.WriterAt) error {
	body, _, openErr := c.Open(ctx, key)
	if openErr != nil {
		return errors.Wrap(openErr, "parallel get: full read")
	}
	writeErr := writeChunkAt(dst, 0, -1, body)
	return errors.Wrap(writeErr, "parallel get")
}

// fetchChunk opens the [start, end) range pinned to etag and writes it at start.
// An ETag change (the object was rewritten mid-download) or a short read is
// reported as an error.
Expand Down
49 changes: 49 additions & 0 deletions client/parallel_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,55 @@ func (c *changingSizeReader) Open(_ context.Context, _ client.Key, opts ...clien
return io.NopCloser(bytes.NewReader(c.discovery[start : start+length])), headers, nil
}

// recordingReader serves byte ranges and records the Range option of every
// Open call ("" for a full, non-ranged read), so tests can assert how the
// object was fetched.
type recordingReader struct {
// data is the full object content; Open returns slices of it.
data []byte
// etag is passed to range resolution and, when non-empty, returned in the
// response headers.
etag string

// mu guards opens, which Open appends to on every call.
mu sync.Mutex
// opens holds the Range option of each Open call, in call order.
opens []string
}

// Open records the Range option of the request, then serves the matching slice
// of r.data. An unsatisfiable range yields ErrRangeNotSatisfiable together with
// a "bytes */<size>" Content-Range header; a partial range carries a
// Content-Range header describing the served window. Successful responses
// always carry Content-Length and, when the reader has one, the ETag.
func (r *recordingReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) {
	reqOpts := client.NewRequestOptions(opts...)

	// Log the request before resolving it so that failed opens are recorded too.
	func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.opens = append(r.opens, reqOpts.Range)
	}()

	total := int64(len(r.data))
	offset, n, result := reqOpts.ResolveRange(total, r.etag)
	hdr := http.Header{}

	switch result {
	case client.RangeNotSatisfiable:
		hdr.Set("Content-Range", fmt.Sprintf("bytes */%d", total))
		return nil, hdr, client.ErrRangeNotSatisfiable
	case client.RangePartial:
		hdr.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+n-1, total))
	}

	if r.etag != "" {
		hdr.Set(client.ETagKey, r.etag)
	}
	hdr.Set("Content-Length", strconv.FormatInt(n, 10))

	window := r.data[offset : offset+n]
	return io.NopCloser(bytes.NewReader(window)), hdr, nil
}

// TestParallelGetSingleWorkerFullRead checks that ParallelGet with a
// concurrency of 1 fetches the object with exactly one non-ranged Open, even
// though the chunk size is smaller than the object.
func TestParallelGetSingleWorkerFullRead(t *testing.T) {
	// 1000 bytes against a chunk size of 100: a chunked download would need
	// several ranged reads, so a single "" entry proves the full-read path.
	payload := make([]byte, 1000)
	for i := 0; i < len(payload); i++ {
		payload[i] = byte(i % 251)
	}
	reader := &recordingReader{data: payload, etag: `"v1"`}

	var out bufferAt
	err := client.ParallelGet(context.Background(), reader, client.NewKey("k"), &out, 100, 1)

	assert.NoError(t, err)
	assert.Equal(t, payload, out.buf)
	// One recorded open with an empty Range means one full, non-ranged read.
	assert.Equal(t, []string{""}, reader.opens)
}

func TestParallelGetNoETagSizeChangedBetweenRequests(t *testing.T) {
// A no-ETag multi-chunk object falls back to a single full read. If it is
// rewritten to a different size between discovery and that read, the
Expand Down
12 changes: 8 additions & 4 deletions internal/cache/parallel_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestParallelGet(t *testing.T) {
{name: "UnevenChunks", chunkSize: 300, concurrency: 3},
{name: "SingleByteChunks", chunkSize: 1, concurrency: 8},
{name: "ChunkLargerThanObject", chunkSize: 5000, concurrency: 4},
{name: "SerialFastPath", chunkSize: 100, concurrency: 1},
{name: "SingleWorkerFullRead", chunkSize: 100, concurrency: 1},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -80,9 +80,13 @@ func TestParallelGetEmptyObject(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, w.Close())

var dst bufferAt
assert.NoError(t, cache.ParallelGet(ctx, c, key, &dst, 100, 4))
assert.Equal(t, 0, len(dst.buf))
// concurrency 4 takes the ranged discovery path (ErrRangeNotSatisfiable),
// concurrency 1 takes the up-front full-read path; both must yield nothing.
for _, concurrency := range []int{4, 1} {
var dst bufferAt
assert.NoError(t, cache.ParallelGet(ctx, c, key, &dst, 100, concurrency))
assert.Equal(t, 0, len(dst.buf))
}
}

func TestParallelGetNotFound(t *testing.T) {
Expand Down
Loading