Skip to content

Commit e6b5db3

Browse files
authored
fix(client): read whole object when ParallelGet concurrency is 1 (#357)
A single worker gains nothing from chunking, so skip the ranged discovery request entirely and read the object in one revision-consistent request rather than serialising ranged GETs.
1 parent 2802417 commit e6b5db3

3 files changed

Lines changed: 81 additions & 13 deletions

File tree

client/parallel_get.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ type RangeReader interface {
3232
// is likewise reported as an error, so a partially written dst must be discarded
3333
// by the caller on failure. An object with no ETag to pin to (e.g. one stored
3434
// before ETags were recorded) cannot be kept revision-safe across chunks, so it
35-
// falls back to a single full read instead of parallelising.
35+
// falls back to a single full read instead of parallelising. A concurrency of
36+
// 1 likewise reads the whole object in one request, since chunking with a
37+
// single worker would only serialise ranged GETs for no benefit.
3638
//
3739
// dst is written via concurrent WriteAt calls at non-overlapping offsets; the
3840
// caller owns dst's lifecycle (open, close, cleanup) and need not pre-size it,
@@ -43,6 +45,13 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
4345
}
4446
concurrency = max(concurrency, 1)
4547

48+
// A single worker gains nothing from chunking — it would only serialise
49+
// ranged GETs — so skip discovery entirely and read the object in one
50+
// revision-consistent request.
51+
if concurrency == 1 {
52+
return fullRead(ctx, c, key, dst)
53+
}
54+
4655
// Discovery: the first ranged Open delivers chunk zero and reveals the total
4756
// size and ETag used to pin the rest.
4857
rc, headers, err := c.Open(ctx, key, Range(0, chunkSize))
@@ -77,14 +86,7 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
7786
if err := rc.Close(); err != nil {
7887
return errors.Wrap(err, "parallel get: close discovery reader")
7988
}
80-
full, _, err := c.Open(ctx, key)
81-
if err != nil {
82-
return errors.Wrap(err, "parallel get: full read")
83-
}
84-
// The full read is a fresh request whose body may be a different
85-
// revision than discovery, so the discovery `total` cannot validate its
86-
// length; -1 skips the check and relies on transport-level EOF detection.
87-
return errors.Wrap(writeChunkAt(dst, 0, -1, full), "parallel get")
89+
return fullRead(ctx, c, key, dst)
8890
}
8991

9092
// Multiple chunks: copy the already-open first chunk concurrently with the
@@ -106,6 +108,19 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
106108
return errors.Wrap(eg.Wait(), "parallel get")
107109
}
108110

111+
// fullRead downloads the entire object in a single request and writes it at
112+
// offset zero. It is used when chunking would add no value (a single worker) or
113+
// cannot be made revision-safe (no ETag to pin). The body is a single
114+
// consistent revision, but its length is unknown up front, so writeChunkAt's
115+
// length check is skipped (-1).
116+
func fullRead(ctx context.Context, c RangeReader, key Key, dst io.WriterAt) error {
117+
rc, _, err := c.Open(ctx, key)
118+
if err != nil {
119+
return errors.Wrap(err, "parallel get: full read")
120+
}
121+
return errors.Wrap(writeChunkAt(dst, 0, -1, rc), "parallel get")
122+
}
123+
109124
// fetchChunk opens the [start, end) range pinned to etag and writes it at start.
110125
// An ETag change (the object was rewritten mid-download) or a short read is
111126
// reported as an error.

client/parallel_get_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,55 @@ func (c *changingSizeReader) Open(_ context.Context, _ client.Key, opts ...clien
147147
return io.NopCloser(bytes.NewReader(c.discovery[start : start+length])), headers, nil
148148
}
149149

150+
// recordingReader serves byte ranges and records the Range option of every
151+
// Open call ("" for a full, non-ranged read), so tests can assert how the
152+
// object was fetched.
153+
type recordingReader struct {
154+
data []byte
155+
etag string
156+
157+
mu sync.Mutex
158+
opens []string
159+
}
160+
161+
func (r *recordingReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) {
162+
o := client.NewRequestOptions(opts...)
163+
r.mu.Lock()
164+
r.opens = append(r.opens, o.Range)
165+
r.mu.Unlock()
166+
167+
size := int64(len(r.data))
168+
start, length, outcome := o.ResolveRange(size, r.etag)
169+
headers := http.Header{}
170+
if outcome == client.RangeNotSatisfiable {
171+
headers.Set("Content-Range", fmt.Sprintf("bytes */%d", size))
172+
return nil, headers, client.ErrRangeNotSatisfiable
173+
}
174+
if r.etag != "" {
175+
headers.Set(client.ETagKey, r.etag)
176+
}
177+
headers.Set("Content-Length", strconv.FormatInt(length, 10))
178+
if outcome == client.RangePartial {
179+
headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, size))
180+
}
181+
return io.NopCloser(bytes.NewReader(r.data[start : start+length])), headers, nil
182+
}
183+
184+
func TestParallelGetSingleWorkerFullRead(t *testing.T) {
185+
// A concurrency of 1 gains nothing from chunking, so it must issue a single
186+
// non-ranged read rather than discovering and serialising ranged GETs.
187+
data := make([]byte, 1000)
188+
for i := range data {
189+
data[i] = byte(i % 251)
190+
}
191+
c := &recordingReader{data: data, etag: `"v1"`}
192+
var dst bufferAt
193+
err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 1)
194+
assert.NoError(t, err)
195+
assert.Equal(t, data, dst.buf)
196+
assert.Equal(t, []string{""}, c.opens)
197+
}
198+
150199
func TestParallelGetNoETagSizeChangedBetweenRequests(t *testing.T) {
151200
// A no-ETag multi-chunk object falls back to a single full read. If it is
152201
// rewritten to a different size between discovery and that read, the

internal/cache/parallel_get_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func TestParallelGet(t *testing.T) {
5757
{name: "UnevenChunks", chunkSize: 300, concurrency: 3},
5858
{name: "SingleByteChunks", chunkSize: 1, concurrency: 8},
5959
{name: "ChunkLargerThanObject", chunkSize: 5000, concurrency: 4},
60-
{name: "SerialFastPath", chunkSize: 100, concurrency: 1},
60+
{name: "SingleWorkerFullRead", chunkSize: 100, concurrency: 1},
6161
}
6262
for _, tt := range tests {
6363
t.Run(tt.name, func(t *testing.T) {
@@ -80,9 +80,13 @@ func TestParallelGetEmptyObject(t *testing.T) {
8080
assert.NoError(t, err)
8181
assert.NoError(t, w.Close())
8282

83-
var dst bufferAt
84-
assert.NoError(t, cache.ParallelGet(ctx, c, key, &dst, 100, 4))
85-
assert.Equal(t, 0, len(dst.buf))
83+
// concurrency 4 takes the ranged discovery path (ErrRangeNotSatisfiable) and
84+
// concurrency 1 takes the up-front full-read path; both must succeed and leave dst empty.
85+
for _, concurrency := range []int{4, 1} {
86+
var dst bufferAt
87+
assert.NoError(t, cache.ParallelGet(ctx, c, key, &dst, 100, concurrency))
88+
assert.Equal(t, 0, len(dst.buf))
89+
}
8690
}
8791

8892
func TestParallelGetNotFound(t *testing.T) {

0 commit comments

Comments
 (0)