Skip to content

Commit 4d97fb3

Browse files
worstellampagent
andcommitted
feat(client): add ParallelGetStream for in-order streaming parallel downloads
ParallelGet writes chunks to an io.WriterAt, which requires a seekable destination (e.g. a temp file) and prevents a consumer from overlapping download with processing. ParallelGetStream fetches chunks in parallel but emits in-order bytes to a plain io.Writer via a bounded reorder buffer, so a streaming consumer (e.g. a decompress/extract pipeline) can run concurrently with the download. A concurrency-sized window caps fetched-but-unwritten chunks, bounding peak memory to O(concurrency * chunkSize) regardless of object size or consumer speed. Revision safety, ETag pinning, empty-object handling, range-ignore degrade, and the concurrency==1 shortcut mirror ParallelGet. Amp-Thread-ID: https://ampcode.com/threads/T-019ef6a9-a407-7389-bc43-001405e3ae9e Co-authored-by: Amp <amp@ampcode.com>
1 parent c2bb197 commit 4d97fb3

2 files changed

Lines changed: 396 additions & 0 deletions

File tree

client/parallel_get_stream.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"io"
6+
"sync/atomic"
7+
8+
"github.com/alecthomas/errors"
9+
"golang.org/x/sync/errgroup"
10+
)
11+
12+
// ParallelGetStream downloads an object from any Range-capable RangeReader and
13+
// writes it, in order, to dst. Like ParallelGet it fetches the object in
14+
// chunkSize-byte chunks concurrently (up to concurrency requests in flight), but
15+
// instead of scattering chunks into an io.WriterAt it emits a single sequential
16+
// byte stream. This lets a caller overlap the download with a streaming consumer
17+
// (e.g. a zstd/tar pipeline) rather than staging the whole object first.
18+
//
19+
// Chunks complete out of order, so a bounded reorder buffer holds fetched
20+
// chunks until their turn to be written. A window caps the number of
21+
// fetched-but-unwritten chunks (and thus in-flight fetches) at concurrency, so
22+
// peak buffered memory is O(concurrency * chunkSize) regardless of object size.
23+
//
24+
// Revision safety mirrors ParallelGet: the first ranged Open reveals the total
25+
// size and ETag, every later chunk pins that ETag via IfRange, and an ETag
26+
// change mid-download is reported as an error rather than splicing revisions. An
27+
// object with no ETag, one that fits in the first chunk, or a backend that
28+
// ignored the range falls back to a single full read; a concurrency of 1
29+
// likewise reads the whole object in one request.
30+
//
31+
// dst is written sequentially from a single goroutine, so it need not be safe
32+
// for concurrent writes. On error a partially written dst must be discarded by
33+
// the caller.
34+
func ParallelGetStream(ctx context.Context, c RangeReader, key Key, dst io.Writer, chunkSize int64, concurrency int) error {
35+
if chunkSize <= 0 {
36+
return errors.Errorf("parallel get stream: chunk size must be positive, got %d", chunkSize)
37+
}
38+
concurrency = max(concurrency, 1)
39+
40+
// A single worker gains nothing from chunking, so read the object in one
41+
// revision-consistent request.
42+
if concurrency == 1 {
43+
return fullReadStream(ctx, c, key, dst)
44+
}
45+
46+
// Discovery: the first ranged Open delivers chunk zero and reveals the total
47+
// size and ETag used to pin the rest.
48+
rc, headers, err := c.Open(ctx, key, Range(0, chunkSize))
49+
if errors.Is(err, ErrRangeNotSatisfiable) {
50+
return nil // Empty object: nothing to write.
51+
}
52+
if err != nil {
53+
return errors.Wrap(err, "parallel get stream: open first chunk")
54+
}
55+
56+
etag := headers.Get(ETagKey)
57+
total, hasRange := parseContentRangeTotal(headers.Get("Content-Range"))
58+
59+
// A backend that ignored the range, or an object that fits within the first
60+
// chunk, is delivered entirely by this response: copy it and return. A
61+
// negative want skips the length check when the total size is unknown.
62+
firstLen := min(chunkSize, total)
63+
if !hasRange {
64+
firstLen = -1
65+
}
66+
if !hasRange || total <= chunkSize {
67+
return errors.Wrap(copyChunkStream(dst, firstLen, rc), "parallel get stream")
68+
}
69+
70+
// Without a validator to pin subsequent chunks to, splicing across a rewrite
71+
// can't be detected, so fall back to a single, revision-consistent read.
72+
if etag == "" {
73+
if err := rc.Close(); err != nil {
74+
return errors.Wrap(err, "parallel get stream: close discovery reader")
75+
}
76+
return fullReadStream(ctx, c, key, dst)
77+
}
78+
79+
numChunks := int((total + chunkSize - 1) / chunkSize)
80+
81+
// slots[seq] carries the bytes of chunk seq (1 <= seq < numChunks) from its
82+
// fetching worker to the writer. Chunk zero is streamed directly from the
83+
// discovery reader and has no slot.
84+
slots := make([]chan []byte, numChunks)
85+
for seq := 1; seq < numChunks; seq++ {
86+
slots[seq] = make(chan []byte, 1)
87+
}
88+
89+
// window bounds fetched-but-unwritten chunks (and in-flight fetches) to
90+
// concurrency. A worker takes a token before fetching; the writer returns
91+
// one after writing a chunk, admitting the next fetch. The channel never
92+
// exceeds capacity because a token is always "held" by the chunk in flight
93+
// or being written at the moment it is returned, so sends never block.
94+
window := make(chan struct{}, concurrency)
95+
for range concurrency {
96+
window <- struct{}{}
97+
}
98+
99+
eg, egCtx := errgroup.WithContext(ctx)
100+
101+
var nextSeq atomic.Int64
102+
nextSeq.Store(1)
103+
for range concurrency {
104+
eg.Go(func() error {
105+
for {
106+
select {
107+
case <-egCtx.Done():
108+
return egCtx.Err()
109+
case <-window:
110+
}
111+
seq := int(nextSeq.Add(1) - 1)
112+
if seq >= numChunks {
113+
window <- struct{}{} // No work for this token; return it.
114+
return nil
115+
}
116+
start := int64(seq) * chunkSize
117+
end := min(start+chunkSize, total)
118+
data, err := fetchChunkBytes(egCtx, c, key, start, end, etag)
119+
if err != nil {
120+
return err
121+
}
122+
slots[seq] <- data // Buffered, single producer: never blocks.
123+
}
124+
})
125+
}
126+
127+
// Writer: stream chunk zero from the discovery reader, then emit each
128+
// subsequent chunk in order, returning a window token after each so workers
129+
// can advance.
130+
eg.Go(func() error {
131+
if err := copyChunkStream(dst, firstLen, rc); err != nil {
132+
return err
133+
}
134+
for seq := 1; seq < numChunks; seq++ {
135+
select {
136+
case <-egCtx.Done():
137+
return egCtx.Err()
138+
case data := <-slots[seq]:
139+
if _, err := dst.Write(data); err != nil {
140+
return errors.Errorf("write chunk at offset %d: %w", int64(seq)*chunkSize, err)
141+
}
142+
window <- struct{}{}
143+
}
144+
}
145+
return nil
146+
})
147+
148+
return errors.Wrap(eg.Wait(), "parallel get stream")
149+
}
150+
151+
// fullReadStream downloads the entire object in a single request and copies it
152+
// to dst. Used when chunking adds no value (a single worker) or can't be made
153+
// revision-safe (no ETag to pin).
154+
func fullReadStream(ctx context.Context, c RangeReader, key Key, dst io.Writer) error {
155+
rc, _, err := c.Open(ctx, key)
156+
if err != nil {
157+
return errors.Wrap(err, "parallel get stream: full read")
158+
}
159+
return errors.Wrap(copyChunkStream(dst, -1, rc), "parallel get stream")
160+
}
161+
162+
// copyChunkStream copies src into dst and closes src. It fails if fewer than
163+
// want bytes arrive; a negative want skips that check (total size unknown).
164+
func copyChunkStream(dst io.Writer, want int64, src io.ReadCloser) error {
165+
n, copyErr := io.Copy(dst, src)
166+
if err := errors.Join(copyErr, src.Close()); err != nil {
167+
return errors.Errorf("copy chunk: %w", err)
168+
}
169+
if want >= 0 && n != want {
170+
return errors.Errorf("short chunk: copied %d of %d bytes", n, want)
171+
}
172+
return nil
173+
}
174+
175+
// fetchChunkBytes opens the [start, end) range pinned to etag and returns its
176+
// bytes. An ETag change (the object was rewritten mid-download) or a short read
177+
// is reported as an error.
178+
func fetchChunkBytes(ctx context.Context, c RangeReader, key Key, start, end int64, etag string) ([]byte, error) {
179+
rc, headers, err := c.Open(ctx, key, Range(start, end), IfRange(etag))
180+
if err != nil {
181+
return nil, errors.Errorf("open range %d-%d: %w", start, end, err)
182+
}
183+
if got := headers.Get(ETagKey); got != etag {
184+
return nil, errors.Join(
185+
errors.Errorf("object changed during read at offset %d: etag %q != %q", start, got, etag),
186+
rc.Close(),
187+
)
188+
}
189+
buf := make([]byte, end-start)
190+
_, readErr := io.ReadFull(rc, buf)
191+
if err := errors.Join(readErr, rc.Close()); err != nil {
192+
return nil, errors.Errorf("read chunk at offset %d: %w", start, err)
193+
}
194+
return buf, nil
195+
}

client/parallel_get_stream_test.go

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package client_test
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"strconv"
10+
"sync/atomic"
11+
"testing"
12+
"time"
13+
14+
"github.com/alecthomas/assert/v2"
15+
"github.com/alecthomas/errors"
16+
17+
"github.com/block/cachew/client"
18+
)
19+
20+
func patternBytes(n int) []byte {
21+
data := make([]byte, n)
22+
for i := range data {
23+
data[i] = byte(i % 251)
24+
}
25+
return data
26+
}
27+
28+
func TestParallelGetStreamReassembly(t *testing.T) {
29+
// A multi-chunk object must be emitted to the writer as the original,
30+
// in-order byte stream despite being fetched concurrently.
31+
data := patternBytes(10_000)
32+
c := &recordingReader{data: data, etag: `"v1"`}
33+
var dst bytes.Buffer
34+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 1000, 4)
35+
assert.NoError(t, err)
36+
assert.Equal(t, data, dst.Bytes())
37+
}
38+
39+
func TestParallelGetStreamSingleWorkerFullRead(t *testing.T) {
40+
// A concurrency of 1 must issue a single non-ranged read rather than
41+
// discovering and serialising ranged GETs.
42+
data := patternBytes(1000)
43+
c := &recordingReader{data: data, etag: `"v1"`}
44+
var dst bytes.Buffer
45+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 100, 1)
46+
assert.NoError(t, err)
47+
assert.Equal(t, data, dst.Bytes())
48+
assert.Equal(t, []string{""}, c.opens)
49+
}
50+
51+
func TestParallelGetStreamEmptyObject(t *testing.T) {
52+
c := &recordingReader{data: nil, etag: `"v1"`}
53+
var dst bytes.Buffer
54+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 100, 4)
55+
assert.NoError(t, err)
56+
assert.Equal(t, 0, dst.Len())
57+
}
58+
59+
func TestParallelGetStreamSingleChunk(t *testing.T) {
60+
// An object delivered entirely by the discovery request is streamed directly.
61+
data := []byte("0123456789")
62+
c := &recordingReader{data: data, etag: `"v1"`}
63+
var dst bytes.Buffer
64+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 100, 4)
65+
assert.NoError(t, err)
66+
assert.Equal(t, data, dst.Bytes())
67+
}
68+
69+
func TestParallelGetStreamETagMismatch(t *testing.T) {
70+
// An object rewritten mid-download (different ETag on later chunks) must be
71+
// reported rather than splicing bytes from two revisions.
72+
c := &rangeFlipReader{data: make([]byte, 1000), firstETag: `"v1"`, restETag: `"v2"`}
73+
var dst bytes.Buffer
74+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 100, 4)
75+
assert.Error(t, err)
76+
assert.Contains(t, err.Error(), "object changed during read")
77+
}
78+
79+
func TestParallelGetStreamNoETagMultiChunk(t *testing.T) {
80+
// A multi-chunk object with no ETag can't be pinned, so it falls back to a
81+
// single full read.
82+
data := patternBytes(1000)
83+
c := &noETagReader{data: data}
84+
var dst bytes.Buffer
85+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 100, 4)
86+
assert.NoError(t, err)
87+
assert.Equal(t, data, dst.Bytes())
88+
}
89+
90+
func TestParallelGetStreamNoETagSingleChunk(t *testing.T) {
91+
data := []byte("0123456789")
92+
c := &noETagReader{data: data}
93+
var dst bytes.Buffer
94+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 100, 4)
95+
assert.NoError(t, err)
96+
assert.Equal(t, data, dst.Bytes())
97+
}
98+
99+
func TestParallelGetStreamServerIgnoresRange(t *testing.T) {
100+
// A backend that ignores the range header delivers the whole object on the
101+
// discovery request; it must be streamed in full.
102+
data := patternBytes(1000)
103+
c := &ignoreRangeReader{data: data}
104+
var dst bytes.Buffer
105+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 100, 4)
106+
assert.NoError(t, err)
107+
assert.Equal(t, data, dst.Bytes())
108+
}
109+
110+
func TestParallelGetStreamOutOfOrderCompletion(t *testing.T) {
111+
// Chunks deliberately complete in reverse order; the writer must still emit a
112+
// correctly ordered stream and stay within the bounded window.
113+
data := patternBytes(10_000)
114+
c := &reorderReader{data: data, etag: `"v1"`, chunkSize: 1000}
115+
var dst bytes.Buffer
116+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 1000, 4)
117+
assert.NoError(t, err)
118+
assert.Equal(t, data, dst.Bytes())
119+
}
120+
121+
func TestParallelGetStreamPropagatesOpenError(t *testing.T) {
122+
// An error opening a non-first chunk must surface and cancel the download.
123+
c := &failingChunkReader{data: patternBytes(10_000), etag: `"v1"`, failAtStart: 5000}
124+
var dst bytes.Buffer
125+
err := client.ParallelGetStream(context.Background(), c, client.NewKey("k"), &dst, 1000, 4)
126+
assert.Error(t, err)
127+
assert.Contains(t, err.Error(), "boom")
128+
}
129+
130+
// ignoreRangeReader returns the whole object with no Content-Range regardless of
131+
// the requested range, modelling a backend that doesn't honour ranges.
132+
type ignoreRangeReader struct{ data []byte }
133+
134+
func (r *ignoreRangeReader) Open(_ context.Context, _ client.Key, _ ...client.RequestOption) (io.ReadCloser, http.Header, error) {
135+
headers := http.Header{}
136+
headers.Set("Content-Length", strconv.Itoa(len(r.data)))
137+
return io.NopCloser(bytes.NewReader(r.data)), headers, nil
138+
}
139+
140+
// reorderReader serves correct byte ranges but delays earlier offsets longer
141+
// than later ones, so within the in-flight window chunks complete out of order
142+
// and the writer must buffer and reorder them.
143+
type reorderReader struct {
144+
data []byte
145+
etag string
146+
chunkSize int64
147+
}
148+
149+
func (r *reorderReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) {
150+
size := int64(len(r.data))
151+
o := client.NewRequestOptions(opts...)
152+
start, length, outcome := o.ResolveRange(size, r.etag)
153+
headers := http.Header{}
154+
if outcome == client.RangeNotSatisfiable {
155+
headers.Set("Content-Range", fmt.Sprintf("bytes */%d", size))
156+
return nil, headers, client.ErrRangeNotSatisfiable
157+
}
158+
// Earlier chunks within a window sleep longer, so higher offsets finish
159+
// first and the writer is forced to reorder.
160+
if outcome == client.RangePartial {
161+
chunks := (size - start) / r.chunkSize
162+
time.Sleep(time.Duration(chunks) * time.Millisecond)
163+
}
164+
headers.Set(client.ETagKey, r.etag)
165+
headers.Set("Content-Length", strconv.FormatInt(length, 10))
166+
if outcome == client.RangePartial {
167+
headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, size))
168+
}
169+
return io.NopCloser(bytes.NewReader(r.data[start : start+length])), headers, nil
170+
}
171+
172+
// failingChunkReader serves ranges normally but errors when the requested range
173+
// starts at failAtStart, modelling a mid-download fetch failure.
174+
type failingChunkReader struct {
175+
data []byte
176+
etag string
177+
failAtStart int64
178+
179+
opens atomic.Int64
180+
}
181+
182+
func (r *failingChunkReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) {
183+
r.opens.Add(1)
184+
size := int64(len(r.data))
185+
o := client.NewRequestOptions(opts...)
186+
start, length, outcome := o.ResolveRange(size, r.etag)
187+
if outcome == client.RangePartial && start == r.failAtStart {
188+
return nil, nil, errors.New("boom")
189+
}
190+
headers := http.Header{}
191+
if outcome == client.RangeNotSatisfiable {
192+
headers.Set("Content-Range", fmt.Sprintf("bytes */%d", size))
193+
return nil, headers, client.ErrRangeNotSatisfiable
194+
}
195+
headers.Set(client.ETagKey, r.etag)
196+
headers.Set("Content-Length", strconv.FormatInt(length, 10))
197+
if outcome == client.RangePartial {
198+
headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, size))
199+
}
200+
return io.NopCloser(bytes.NewReader(r.data[start : start+length])), headers, nil
201+
}

0 commit comments

Comments
 (0)