diff --git a/client/git.go b/client/git.go index 999509d..beb35e6 100644 --- a/client/git.go +++ b/client/git.go @@ -193,14 +193,15 @@ type GitSnapshotMetadata struct { BundleURL string } -// DownloadGitSnapshot fetches the working-tree snapshot for repoURL into dst, -// using up to concurrency concurrent range requests of chunkSize bytes each. -// When concurrency is 1, or the server does not support ranges, it transparently -// falls back to a single full download. dst is written at non-overlapping -// offsets via WriteAt (e.g. an *os.File) and the caller owns its lifecycle. It -// returns the snapshot's freshen metadata, read from the discovery response. -// Returns os.ErrNotExist when the server has no snapshot available. -func (c *Client) DownloadGitSnapshot(ctx context.Context, repoURL string, dst io.WriterAt, chunkSize int64, concurrency int) (GitSnapshotMetadata, error) { +// DownloadGitSnapshot fetches the working-tree snapshot for repoURL and writes +// it, in order, to dst, using up to concurrency concurrent range requests of +// chunkSize bytes each. When concurrency is 1, or the server does not support +// ranges, it transparently falls back to a single full download. Because dst is +// written sequentially, a streaming consumer (e.g. a decompress/extract +// pipeline) can run concurrently with the download. It returns the snapshot's +// freshen metadata, read from the discovery response. Returns os.ErrNotExist +// when the server has no snapshot available. +func (c *Client) DownloadGitSnapshot(ctx context.Context, repoURL string, dst io.Writer, chunkSize int64, concurrency int) (GitSnapshotMetadata, error) { endpoint, err := gitEndpointURL(c.baseURL, repoURL, "snapshot.tar.zst") if err != nil { return GitSnapshotMetadata{}, err diff --git a/client/git_test.go b/client/git_test.go index 0e2ac43..d3f180e 100644 --- a/client/git_test.go +++ b/client/git_test.go @@ -197,12 +197,12 @@ func TestDownloadGitSnapshotParallel(t *testing.T) { defer srv.Close() api := client.NewWithHTTPClient(srv.URL, srv.Client()) - var dst bufferAt + var dst bytes.Buffer // A 128-byte chunk over a 1000-byte body forces multiple chunks, exercising // concurrent range reassembly. meta, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 128, 4) assert.NoError(t, err) - assert.Equal(t, body, dst.buf) + assert.Equal(t, body, dst.Bytes()) assert.Equal(t, "deadbeef", meta.Commit) assert.Equal(t, "/git/github.com/org/repo/snapshot.bundle?base=deadbeef", meta.BundleURL) assert.True(t, requests.Load() > 1, "expected multiple range requests, got %d", requests.Load()) @@ -220,10 +220,10 @@ func TestDownloadGitSnapshotFallsBackWithoutRange(t *testing.T) { defer srv.Close() api := client.NewWithHTTPClient(srv.URL, srv.Client()) - var dst bufferAt + var dst bytes.Buffer meta, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 8, 4) assert.NoError(t, err) - assert.Equal(t, body, dst.buf) + assert.Equal(t, body, dst.Bytes()) assert.Equal(t, "cafe", meta.Commit) } @@ -234,7 +234,7 @@ func TestDownloadGitSnapshotNotFound(t *testing.T) { defer srv.Close() api := client.NewWithHTTPClient(srv.URL, srv.Client()) - var dst bufferAt + var dst bytes.Buffer _, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 8, 4) assert.True(t, errors.Is(err, os.ErrNotExist)) } diff --git a/client/parallel_get.go b/client/parallel_get.go index be7c9aa..d5e75f8 100644 --- a/client/parallel_get.go +++ b/client/parallel_get.go @@ -6,6 +6,7 @@ import ( "net/http" "strconv" "strings" + "sync/atomic" "github.com/alecthomas/errors" "golang.org/x/sync/errgroup" @@ -19,27 +20,36 @@ type RangeReader interface { Open(ctx context.Context, key Key, opts ...RequestOption) (io.ReadCloser, http.Header, error) } -// ParallelGet downloads an object from any Range-capable RangeReader into dst, -// fetching it in chunkSize-byte chunks concurrently (up to concurrency requests -// in flight) and writing each chunk at its offset via dst.WriteAt. Latency-bound -// backends such as a remote cache can saturate bandwidth with overlapping reads. +// ParallelGet downloads an object from any Range-capable RangeReader and writes +// it, in order, to dst. It fetches the object in chunkSize-byte chunks +// concurrently (up to concurrency requests in flight) but emits a single +// sequential byte stream, so a streaming consumer (e.g. a decompress/extract +// pipeline) can run concurrently with the download. Latency-bound backends such +// as a remote cache can saturate bandwidth with overlapping reads. +// +// Chunks complete out of order, so a bounded reorder buffer holds fetched chunks +// until their turn to be written. A window caps the number of +// fetched-but-unwritten chunks (and thus in-flight fetches) at concurrency, so +// peak memory is O(concurrency * chunkSize) regardless of object size or +// consumer speed. // // The first chunk is fetched with a ranged Open, whose response yields both the // total size (from Content-Range) and the object's ETag; every remaining chunk // is then requested with IfRange pinned to that ETag. If the object changes // mid-download, a chunk's ETag will differ and ParallelGet returns an error -// rather than splicing bytes from two revisions. A missing or truncated chunk -// is likewise reported as an error, so a partially written dst must be discarded -// by the caller on failure. An object with no ETag to pin to (e.g. one stored -// before ETags were recorded) cannot be kept revision-safe across chunks, so it -// falls back to a single full read instead of parallelising. A concurrency of -// 1 likewise reads the whole object in one request, since chunking a single -// worker would only serialise ranged GETs for no benefit. +// rather than splicing bytes from two revisions. A chunk whose length differs +// from the requested range (a short or overlong response, e.g. a backend that +// ignored the range) is likewise reported as an error, so a partially written +// dst must be discarded by the caller on failure. An object with no ETag to pin +// to (e.g. one stored before ETags were recorded) cannot be kept revision-safe +// across chunks, so it falls back to a single full read instead of +// parallelising. A concurrency of 1 likewise reads the whole object in one +// request, since chunking a single worker would only serialise ranged GETs for +// no benefit. // -// dst is written via concurrent WriteAt calls at non-overlapping offsets; the -// caller owns dst's lifecycle (open, close, cleanup) and need not pre-size it, -// as WriteAt extends the destination. -func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, chunkSize int64, concurrency int) error { +// dst is written sequentially from a single goroutine, so it need not be safe +// for concurrent writes. +func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.Writer, chunkSize int64, concurrency int) error { if chunkSize <= 0 { return errors.Errorf("parallel get: chunk size must be positive, got %d", chunkSize) } @@ -52,9 +62,16 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c return fullRead(ctx, c, key, dst) } + // Build the group before discovery so chunk zero's request shares egCtx and + // a sibling chunk's failure cancels it too. defer cancel covers the early + // returns below, which exit without calling eg.Wait. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + eg, egCtx := errgroup.WithContext(ctx) + // Discovery: the first ranged Open delivers chunk zero and reveals the total // size and ETag used to pin the rest. - rc, headers, err := c.Open(ctx, key, Range(0, chunkSize)) + rc, headers, err := c.Open(egCtx, key, Range(0, chunkSize)) if errors.Is(err, ErrRangeNotSatisfiable) { return nil // Empty object: nothing to write. } @@ -74,7 +91,7 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c firstLen = -1 } if !hasRange || total <= chunkSize { - return errors.Wrap(writeChunkAt(dst, 0, firstLen, rc), "parallel get") + return errors.Wrap(copyChunk(dst, firstLen, rc), "parallel get") } // Subsequent chunks are pinned to the discovery ETag via IfRange. Without a @@ -86,67 +103,146 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c if err := rc.Close(); err != nil { return errors.Wrap(err, "parallel get: close discovery reader") } - return fullRead(ctx, c, key, dst) + return fullRead(egCtx, c, key, dst) } - // Multiple chunks: copy the already-open first chunk concurrently with the - // rest rather than blocking on it here. The first goroutine is scheduled - // before the limit can be reached, so it never stalls holding an open body. numChunks := int((total + chunkSize - 1) / chunkSize) - eg, egCtx := errgroup.WithContext(ctx) - eg.SetLimit(concurrency) - eg.Go(func() error { return writeChunkAt(dst, 0, firstLen, rc) }) - for seq := 1; seq < numChunks; seq++ { - // Stop scheduling once a chunk has failed and cancelled the group. - if egCtx.Err() != nil { - break - } - start := int64(seq) * chunkSize - end := min(start+chunkSize, total) - eg.Go(func() error { return fetchChunk(egCtx, c, key, dst, start, end, etag) }) + + // slots is a ring of concurrency reorder buffers carrying chunk bytes from a + // fetching worker to the writer. The window bounds fetched-but-unwritten + // chunks to concurrency, so the outstanding sequence numbers always span a + // window of at most concurrency consecutive values and seq%concurrency is + // unique among them — no two outstanding chunks share a slot. Chunk zero is + // streamed directly from the discovery reader and uses no slot. + slots := make([]chan []byte, concurrency) + for i := range slots { + slots[i] = make(chan []byte, 1) } + + // window bounds fetched-but-unwritten chunks (and in-flight fetches) to + // concurrency. A worker takes a token before fetching; the writer returns + // one after writing a chunk, admitting the next fetch. The channel never + // exceeds capacity because a token is always "held" by the chunk in flight + // or being written at the moment it is returned, so sends never block. + window := make(chan struct{}, concurrency) + for range concurrency { + window <- struct{}{} + } + + var nextSeq atomic.Int64 + nextSeq.Store(1) + for range concurrency { + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return egCtx.Err() + case <-window: + } + seq := int(nextSeq.Add(1) - 1) + if seq >= numChunks { + window <- struct{}{} // No work for this token; return it. + return nil + } + start := int64(seq) * chunkSize + end := min(start+chunkSize, total) + data, err := fetchChunk(egCtx, c, key, start, end, etag) + if err != nil { + return err + } + slots[seq%concurrency] <- data // Buffered, single producer: never blocks. + } + }) + } + + // Writer: stream chunk zero from the discovery reader, then emit each + // subsequent chunk in order, returning a window token after each so workers + // can advance. + eg.Go(func() error { + if err := copyChunk(dst, firstLen, rc); err != nil { + return err + } + for seq := 1; seq < numChunks; seq++ { + select { + case <-egCtx.Done(): + return egCtx.Err() + case data := <-slots[seq%concurrency]: + if err := writeAll(dst, data, int64(seq)*chunkSize); err != nil { + return err + } + window <- struct{}{} + } + } + return nil + }) + return errors.Wrap(eg.Wait(), "parallel get") } -// fullRead downloads the entire object in a single request and writes it at -// offset zero. It is used when chunking would add no value (a single worker) or -// cannot be made revision-safe (no ETag to pin). The body is a single -// consistent revision, but its length is unknown up front, so writeChunkAt's -// length check is skipped (-1). -func fullRead(ctx context.Context, c RangeReader, key Key, dst io.WriterAt) error { +// fullRead downloads the entire object in a single request and copies it to dst. +// It is used when chunking would add no value (a single worker) or cannot be +// made revision-safe (no ETag to pin). The body is a single consistent revision, +// but its length is unknown up front, so copyChunk's length check is skipped (-1). +func fullRead(ctx context.Context, c RangeReader, key Key, dst io.Writer) error { rc, _, err := c.Open(ctx, key) if err != nil { return errors.Wrap(err, "parallel get: full read") } - return errors.Wrap(writeChunkAt(dst, 0, -1, rc), "parallel get") + return errors.Wrap(copyChunk(dst, -1, rc), "parallel get") } -// fetchChunk opens the [start, end) range pinned to etag and writes it at start. -// An ETag change (the object was rewritten mid-download) or a short read is -// reported as an error. -func fetchChunk(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, start, end int64, etag string) error { +// copyChunk copies src into dst and closes src. It fails if fewer than want +// bytes arrive; a negative want skips that check (total size unknown). +func copyChunk(dst io.Writer, want int64, src io.ReadCloser) error { + n, copyErr := io.Copy(dst, src) + if err := errors.Join(copyErr, src.Close()); err != nil { + return errors.Errorf("copy chunk: %w", err) + } + if want >= 0 && n != want { + return errors.Errorf("short chunk: copied %d of %d bytes", n, want) + } + return nil +} + +// fetchChunk opens the [start, end) range pinned to etag and returns its bytes. +// An ETag change (the object was rewritten mid-download) or a body whose length +// differs from the requested range (a short read, or an overlong response from a +// backend that ignored the range) is reported as an error. +func fetchChunk(ctx context.Context, c RangeReader, key Key, start, end int64, etag string) ([]byte, error) { rc, headers, err := c.Open(ctx, key, Range(start, end), IfRange(etag)) if err != nil { - return errors.Errorf("open range %d-%d: %w", start, end, err) + return nil, errors.Errorf("open range %d-%d: %w", start, end, err) } if got := headers.Get(ETagKey); got != etag { - return errors.Join( + return nil, errors.Join( errors.Errorf("object changed during read at offset %d: etag %q != %q", start, got, etag), rc.Close(), ) } - return writeChunkAt(dst, start, end-start, rc) + // Read one byte past the expected length so an overlong body (e.g. a backend + // that ignored the range and returned the whole object) is detected rather + // than silently truncated to the wrong bytes. + want := end - start + buf, readErr := io.ReadAll(io.LimitReader(rc, want+1)) + if err := errors.Join(readErr, rc.Close()); err != nil { + return nil, errors.Errorf("read chunk at offset %d: %w", start, err) + } + if int64(len(buf)) != want { + return nil, errors.Errorf("chunk at offset %d: read %d of %d bytes", start, len(buf), want) + } + return buf, nil } -// writeChunkAt streams src into dst at off and closes src. It fails if fewer -// than want bytes arrive; a negative want skips that check (total size unknown). -func writeChunkAt(dst io.WriterAt, off, want int64, src io.ReadCloser) error { - n, copyErr := io.Copy(io.NewOffsetWriter(dst, off), src) - if err := errors.Join(copyErr, src.Close()); err != nil { - return errors.Errorf("write chunk at offset %d: %w", off, err) +// writeAll writes all of data to dst, treating a short write as an error. A +// compliant io.Writer reports short writes via a non-nil error, but the check +// guards against ones that don't. +func writeAll(dst io.Writer, data []byte, offset int64) error { + n, err := dst.Write(data) + if err != nil { + return errors.Errorf("write chunk at offset %d: %w", offset, err) } - if want >= 0 && n != want { - return errors.Errorf("short chunk at offset %d: wrote %d of %d bytes", off, n, want) + if n != len(data) { + return errors.Errorf("write chunk at offset %d: short write %d of %d bytes", offset, n, len(data)) } return nil } diff --git a/client/parallel_get_test.go b/client/parallel_get_test.go index b9d920f..c90408c 100644 --- a/client/parallel_get_test.go +++ b/client/parallel_get_test.go @@ -8,9 +8,12 @@ import ( "net/http" "strconv" "sync" + "sync/atomic" "testing" + "time" "github.com/alecthomas/assert/v2" + "github.com/alecthomas/errors" "github.com/block/cachew/client" ) @@ -19,21 +22,12 @@ import ( // interface ParallelGet drives. var _ client.RangeReader = (*client.Client)(nil) -// bufferAt is an in-memory io.WriterAt that extends like a file, zero-filling -// any gap, so tests can assert reassembly without touching disk. -type bufferAt struct { - mu sync.Mutex - buf []byte -} - -func (b *bufferAt) WriteAt(p []byte, off int64) (int, error) { - b.mu.Lock() - defer b.mu.Unlock() - if end := int(off) + len(p); end > len(b.buf) { - b.buf = append(b.buf, make([]byte, end-len(b.buf))...) +func patternBytes(n int) []byte { + data := make([]byte, n) + for i := range data { + data[i] = byte(i % 251) } - copy(b.buf[off:], p) - return len(p), nil + return data } // rangeFlipReader serves correct byte ranges but reports a different ETag for @@ -67,7 +61,7 @@ func (f *rangeFlipReader) Open(_ context.Context, _ client.Key, opts ...client.R func TestParallelGetETagMismatch(t *testing.T) { c := &rangeFlipReader{data: make([]byte, 1000), firstETag: `"v1"`, restETag: `"v2"`} - var dst bufferAt + var dst bytes.Buffer err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 4) assert.Error(t, err) assert.Contains(t, err.Error(), "object changed during read") @@ -97,15 +91,12 @@ func (n *noETagReader) Open(_ context.Context, _ client.Key, opts ...client.Requ func TestParallelGetNoETagMultiChunk(t *testing.T) { // A multi-chunk object with no ETag can't be pinned, so it falls back to a // single full read (backwards compatible with objects stored before ETags). - data := make([]byte, 1000) - for i := range data { - data[i] = byte(i % 251) - } + data := patternBytes(1000) c := &noETagReader{data: data} - var dst bufferAt + var dst bytes.Buffer err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 4) assert.NoError(t, err) - assert.Equal(t, data, dst.buf) + assert.Equal(t, data, dst.Bytes()) } func TestParallelGetNoETagSingleChunk(t *testing.T) { @@ -113,10 +104,10 @@ func TestParallelGetNoETagSingleChunk(t *testing.T) { // revision, so it succeeds without pinning. data := []byte("0123456789") c := &noETagReader{data: data} - var dst bufferAt + var dst bytes.Buffer err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 4) assert.NoError(t, err) - assert.Equal(t, data, dst.buf) + assert.Equal(t, data, dst.Bytes()) } // changingSizeReader serves a multi-chunk body with no ETag on the ranged @@ -147,6 +138,18 @@ func (c *changingSizeReader) Open(_ context.Context, _ client.Key, opts ...clien return io.NopCloser(bytes.NewReader(c.discovery[start : start+length])), headers, nil } +func TestParallelGetNoETagSizeChangedBetweenRequests(t *testing.T) { + // A no-ETag multi-chunk object falls back to a single full read. If it is + // rewritten to a different size between discovery and that read, the + // discovery total must not be used to validate the full body: the full read + // is itself a consistent revision and should be accepted in its entirety. + c := &changingSizeReader{discovery: make([]byte, 1000), rewritten: []byte("changed")} + var dst bytes.Buffer + err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 4) + assert.NoError(t, err) + assert.Equal(t, c.rewritten, dst.Bytes()) +} + // recordingReader serves byte ranges and records the Range option of every // Open call ("" for a full, non-ranged read), so tests can assert how the // object was fetched. @@ -181,29 +184,170 @@ func (r *recordingReader) Open(_ context.Context, _ client.Key, opts ...client.R return io.NopCloser(bytes.NewReader(r.data[start : start+length])), headers, nil } +func TestParallelGetReassembly(t *testing.T) { + // A multi-chunk object must be emitted to the writer as the original, + // in-order byte stream despite being fetched concurrently. + data := patternBytes(10_000) + c := &recordingReader{data: data, etag: `"v1"`} + var dst bytes.Buffer + err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 1000, 4) + assert.NoError(t, err) + assert.Equal(t, data, dst.Bytes()) +} + func TestParallelGetSingleWorkerFullRead(t *testing.T) { // A concurrency of 1 gains nothing from chunking, so it must issue a single // non-ranged read rather than discovering and serialising ranged GETs. - data := make([]byte, 1000) - for i := range data { - data[i] = byte(i % 251) - } + data := patternBytes(1000) c := &recordingReader{data: data, etag: `"v1"`} - var dst bufferAt + var dst bytes.Buffer err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 1) assert.NoError(t, err) - assert.Equal(t, data, dst.buf) + assert.Equal(t, data, dst.Bytes()) assert.Equal(t, []string{""}, c.opens) } -func TestParallelGetNoETagSizeChangedBetweenRequests(t *testing.T) { - // A no-ETag multi-chunk object falls back to a single full read. If it is - // rewritten to a different size between discovery and that read, the - // discovery total must not be used to validate the full body: the full read - // is itself a consistent revision and should be accepted in its entirety. - c := &changingSizeReader{discovery: make([]byte, 1000), rewritten: []byte("changed")} - var dst bufferAt +func TestParallelGetEmptyObject(t *testing.T) { + c := &recordingReader{data: nil, etag: `"v1"`} + var dst bytes.Buffer err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 4) assert.NoError(t, err) - assert.Equal(t, c.rewritten, dst.buf) + assert.Equal(t, 0, dst.Len()) +} + +func TestParallelGetServerIgnoresRange(t *testing.T) { + // A backend that ignores the range header delivers the whole object on the + // discovery request; it must be streamed in full. + data := patternBytes(1000) + c := &ignoreRangeReader{data: data} + var dst bytes.Buffer + err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 100, 4) + assert.NoError(t, err) + assert.Equal(t, data, dst.Bytes()) +} + +func TestParallelGetOutOfOrderCompletion(t *testing.T) { + // Chunks deliberately complete in reverse order within the in-flight window; + // the writer must still emit a correctly ordered stream. + data := patternBytes(10_000) + c := &reorderReader{data: data, etag: `"v1"`, chunkSize: 1000} + var dst bytes.Buffer + err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 1000, 4) + assert.NoError(t, err) + assert.Equal(t, data, dst.Bytes()) +} + +func TestParallelGetPropagatesOpenError(t *testing.T) { + // An error opening a non-first chunk must surface and cancel the download. + c := &failingChunkReader{data: patternBytes(10_000), etag: `"v1"`, failAtStart: 5000} + var dst bytes.Buffer + err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 1000, 4) + assert.Error(t, err) + assert.Contains(t, err.Error(), "boom") +} + +func TestParallelGetRejectsOverlongChunk(t *testing.T) { + // A backend that honours the discovery range but ignores a later chunk's + // range — returning the whole object with the same ETag — must be detected + // rather than emitting truncated, mis-aligned bytes. + c := &fullBodyOnChunkReader{data: patternBytes(10_000), etag: `"v1"`} + var dst bytes.Buffer + err := client.ParallelGet(context.Background(), c, client.NewKey("k"), &dst, 1000, 4) + assert.Error(t, err) + assert.Contains(t, err.Error(), "of 1000 bytes") +} + +// fullBodyOnChunkReader honours the discovery range (start 0) with a proper 206 +// but ignores the range on any later chunk, returning the entire object with the +// same ETag — modelling a backend that degrades to full responses mid-download. +type fullBodyOnChunkReader struct { + data []byte + etag string +} + +func (r *fullBodyOnChunkReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) { + size := int64(len(r.data)) + start, length, outcome := client.NewRequestOptions(opts...).ResolveRange(size, r.etag) + headers := http.Header{} + headers.Set(client.ETagKey, r.etag) + if outcome == client.RangePartial && start == 0 { + headers.Set("Content-Length", strconv.FormatInt(length, 10)) + headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, size)) + return io.NopCloser(bytes.NewReader(r.data[start : start+length])), headers, nil + } + headers.Set("Content-Length", strconv.FormatInt(size, 10)) + return io.NopCloser(bytes.NewReader(r.data)), headers, nil +} + +// ignoreRangeReader returns the whole object with no Content-Range regardless of +// the requested range, modelling a backend that doesn't honour ranges. +type ignoreRangeReader struct{ data []byte } + +func (r *ignoreRangeReader) Open(_ context.Context, _ client.Key, _ ...client.RequestOption) (io.ReadCloser, http.Header, error) { + headers := http.Header{} + headers.Set("Content-Length", strconv.Itoa(len(r.data))) + return io.NopCloser(bytes.NewReader(r.data)), headers, nil +} + +// reorderReader serves correct byte ranges but delays earlier offsets longer +// than later ones, so within the in-flight window chunks complete out of order +// and the writer must buffer and reorder them. +type reorderReader struct { + data []byte + etag string + chunkSize int64 +} + +func (r *reorderReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) { + size := int64(len(r.data)) + o := client.NewRequestOptions(opts...) + start, length, outcome := o.ResolveRange(size, r.etag) + headers := http.Header{} + if outcome == client.RangeNotSatisfiable { + headers.Set("Content-Range", fmt.Sprintf("bytes */%d", size)) + return nil, headers, client.ErrRangeNotSatisfiable + } + // Earlier chunks within a window sleep longer, so higher offsets finish + // first and the writer is forced to reorder. + if outcome == client.RangePartial { + chunks := (size - start) / r.chunkSize + time.Sleep(time.Duration(chunks) * time.Millisecond) + } + headers.Set(client.ETagKey, r.etag) + headers.Set("Content-Length", strconv.FormatInt(length, 10)) + if outcome == client.RangePartial { + headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, size)) + } + return io.NopCloser(bytes.NewReader(r.data[start : start+length])), headers, nil +} + +// failingChunkReader serves ranges normally but errors when the requested range +// starts at failAtStart, modelling a mid-download fetch failure. +type failingChunkReader struct { + data []byte + etag string + failAtStart int64 + + opens atomic.Int64 +} + +func (r *failingChunkReader) Open(_ context.Context, _ client.Key, opts ...client.RequestOption) (io.ReadCloser, http.Header, error) { + r.opens.Add(1) + size := int64(len(r.data)) + o := client.NewRequestOptions(opts...) + start, length, outcome := o.ResolveRange(size, r.etag) + if outcome == client.RangePartial && start == r.failAtStart { + return nil, nil, errors.New("boom") + } + headers := http.Header{} + if outcome == client.RangeNotSatisfiable { + headers.Set("Content-Range", fmt.Sprintf("bytes */%d", size)) + return nil, headers, client.ErrRangeNotSatisfiable + } + headers.Set(client.ETagKey, r.etag) + headers.Set("Content-Length", strconv.FormatInt(length, 10)) + if outcome == client.RangePartial { + headers.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, size)) + } + return io.NopCloser(bytes.NewReader(r.data[start : start+length])), headers, nil } diff --git a/internal/cache/parallel_get.go b/internal/cache/parallel_get.go index 55458d0..bffbd7c 100644 --- a/internal/cache/parallel_get.go +++ b/internal/cache/parallel_get.go @@ -9,9 +9,10 @@ import ( "github.com/block/cachew/client" ) -// ParallelGet downloads an object from any Range-capable Cache into dst, -// fetching it in chunkSize-byte chunks concurrently. It delegates to -// [client.ParallelGet]; see that function for the full semantics. -func ParallelGet(ctx context.Context, c Cache, key Key, dst io.WriterAt, chunkSize int64, concurrency int) error { +// ParallelGet downloads an object from any Range-capable Cache, fetching it in +// chunkSize-byte chunks concurrently and writing the reassembled, in-order bytes +// to dst. It delegates to [client.ParallelGet]; see that function for the full +// semantics. +func ParallelGet(ctx context.Context, c Cache, key Key, dst io.Writer, chunkSize int64, concurrency int) error { return errors.WithStack(client.ParallelGet(ctx, c, key, dst, chunkSize, concurrency)) } diff --git a/internal/cache/parallel_get_test.go b/internal/cache/parallel_get_test.go index 1baa3bb..c589a94 100644 --- a/internal/cache/parallel_get_test.go +++ b/internal/cache/parallel_get_test.go @@ -1,11 +1,11 @@ package cache_test import ( + "bytes" "context" "io" "log/slog" "os" - "sync" "testing" "time" @@ -15,23 +15,6 @@ import ( "github.com/block/cachew/internal/logging" ) -// bufferAt is an in-memory io.WriterAt that extends like a file, zero-filling -// any gap, so tests can assert reassembly without touching disk. -type bufferAt struct { - mu sync.Mutex - buf []byte -} - -func (b *bufferAt) WriteAt(p []byte, off int64) (int, error) { - b.mu.Lock() - defer b.mu.Unlock() - if end := int(off) + len(p); end > len(b.buf) { - b.buf = append(b.buf, make([]byte, end-len(b.buf))...) - } - copy(b.buf[off:], p) - return len(p), nil -} - func TestParallelGet(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) c, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour}) @@ -61,10 +44,10 @@ func TestParallelGet(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var dst bufferAt + var dst bytes.Buffer err := cache.ParallelGet(ctx, c, key, &dst, tt.chunkSize, tt.concurrency) assert.NoError(t, err) - assert.Equal(t, content, dst.buf) + assert.Equal(t, content, dst.Bytes()) }) } } @@ -83,9 +66,9 @@ func TestParallelGetEmptyObject(t *testing.T) { // concurrency 4 takes the ranged discovery path (ErrRangeNotSatisfiable), // concurrency 1 takes the up-front full-read path; both must yield nothing. for _, concurrency := range []int{4, 1} { - var dst bufferAt + var dst bytes.Buffer assert.NoError(t, cache.ParallelGet(ctx, c, key, &dst, 100, concurrency)) - assert.Equal(t, 0, len(dst.buf)) + assert.Equal(t, 0, dst.Len()) } } @@ -95,7 +78,7 @@ func TestParallelGetNotFound(t *testing.T) { assert.NoError(t, err) defer c.Close() - var dst bufferAt + var dst bytes.Buffer err = cache.ParallelGet(ctx, c, cache.NewKey("missing"), &dst, 100, 4) assert.IsError(t, err, os.ErrNotExist) }