Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions client/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,15 @@ type GitSnapshotMetadata struct {
BundleURL string
}

// DownloadGitSnapshot fetches the working-tree snapshot for repoURL into dst,
// using up to concurrency concurrent range requests of chunkSize bytes each.
// When concurrency is 1, or the server does not support ranges, it transparently
// falls back to a single full download. dst is written at non-overlapping
// offsets via WriteAt (e.g. an *os.File) and the caller owns its lifecycle. It
// returns the snapshot's freshen metadata, read from the discovery response.
// Returns os.ErrNotExist when the server has no snapshot available.
func (c *Client) DownloadGitSnapshot(ctx context.Context, repoURL string, dst io.WriterAt, chunkSize int64, concurrency int) (GitSnapshotMetadata, error) {
// DownloadGitSnapshot fetches the working-tree snapshot for repoURL and writes
// it, in order, to dst, using up to concurrency concurrent range requests of
// chunkSize bytes each. When concurrency is 1, or the server does not support
// ranges, it transparently falls back to a single full download. Because dst is
// written sequentially, a streaming consumer (e.g. a decompress/extract
// pipeline) can run concurrently with the download. It returns the snapshot's
// freshen metadata, read from the discovery response. Returns os.ErrNotExist
// when the server has no snapshot available.
func (c *Client) DownloadGitSnapshot(ctx context.Context, repoURL string, dst io.Writer, chunkSize int64, concurrency int) (GitSnapshotMetadata, error) {
endpoint, err := gitEndpointURL(c.baseURL, repoURL, "snapshot.tar.zst")
if err != nil {
return GitSnapshotMetadata{}, err
Expand Down
10 changes: 5 additions & 5 deletions client/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,12 @@ func TestDownloadGitSnapshotParallel(t *testing.T) {
defer srv.Close()

api := client.NewWithHTTPClient(srv.URL, srv.Client())
var dst bufferAt
var dst bytes.Buffer
// A 128-byte chunk over a 1000-byte body forces multiple chunks, exercising
// concurrent range reassembly.
meta, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 128, 4)
assert.NoError(t, err)
assert.Equal(t, body, dst.buf)
assert.Equal(t, body, dst.Bytes())
assert.Equal(t, "deadbeef", meta.Commit)
assert.Equal(t, "/git/github.com/org/repo/snapshot.bundle?base=deadbeef", meta.BundleURL)
assert.True(t, requests.Load() > 1, "expected multiple range requests, got %d", requests.Load())
Expand All @@ -220,10 +220,10 @@ func TestDownloadGitSnapshotFallsBackWithoutRange(t *testing.T) {
defer srv.Close()

api := client.NewWithHTTPClient(srv.URL, srv.Client())
var dst bufferAt
var dst bytes.Buffer
meta, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 8, 4)
assert.NoError(t, err)
assert.Equal(t, body, dst.buf)
assert.Equal(t, body, dst.Bytes())
assert.Equal(t, "cafe", meta.Commit)
}

Expand All @@ -234,7 +234,7 @@ func TestDownloadGitSnapshotNotFound(t *testing.T) {
defer srv.Close()

api := client.NewWithHTTPClient(srv.URL, srv.Client())
var dst bufferAt
var dst bytes.Buffer
_, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 8, 4)
assert.True(t, errors.Is(err, os.ErrNotExist))
}
204 changes: 150 additions & 54 deletions client/parallel_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"strconv"
"strings"
"sync/atomic"

"github.com/alecthomas/errors"
"golang.org/x/sync/errgroup"
Expand All @@ -19,27 +20,36 @@ type RangeReader interface {
Open(ctx context.Context, key Key, opts ...RequestOption) (io.ReadCloser, http.Header, error)
}

// ParallelGet downloads an object from any Range-capable RangeReader into dst,
// fetching it in chunkSize-byte chunks concurrently (up to concurrency requests
// in flight) and writing each chunk at its offset via dst.WriteAt. Latency-bound
// backends such as a remote cache can saturate bandwidth with overlapping reads.
// ParallelGet downloads an object from any Range-capable RangeReader and writes
// it, in order, to dst. It fetches the object in chunkSize-byte chunks
// concurrently (up to concurrency requests in flight) but emits a single
// sequential byte stream, so a streaming consumer (e.g. a decompress/extract
// pipeline) can run concurrently with the download. Latency-bound backends such
// as a remote cache can saturate bandwidth with overlapping reads.
//
// Chunks complete out of order, so a bounded reorder buffer holds fetched chunks
// until their turn to be written. A window caps the number of
// fetched-but-unwritten chunks (and thus in-flight fetches) at concurrency, so
// peak memory is O(concurrency * chunkSize) regardless of object size or
// consumer speed.
//
// The first chunk is fetched with a ranged Open, whose response yields both the
// total size (from Content-Range) and the object's ETag; every remaining chunk
// is then requested with IfRange pinned to that ETag. If the object changes
// mid-download, a chunk's ETag will differ and ParallelGet returns an error
// rather than splicing bytes from two revisions. A missing or truncated chunk
// is likewise reported as an error, so a partially written dst must be discarded
// by the caller on failure. An object with no ETag to pin to (e.g. one stored
// before ETags were recorded) cannot be kept revision-safe across chunks, so it
// falls back to a single full read instead of parallelising. A concurrency of
// 1 likewise reads the whole object in one request, since chunking a single
// worker would only serialise ranged GETs for no benefit.
// rather than splicing bytes from two revisions. A chunk whose length differs
// from the requested range (a short or overlong response, e.g. a backend that
// ignored the range) is likewise reported as an error, so a partially written
// dst must be discarded by the caller on failure. An object with no ETag to pin
// to (e.g. one stored before ETags were recorded) cannot be kept revision-safe
// across chunks, so it falls back to a single full read instead of
// parallelising. A concurrency of 1 likewise reads the whole object in one
// request, since chunking a single worker would only serialise ranged GETs for
// no benefit.
//
// dst is written via concurrent WriteAt calls at non-overlapping offsets; the
// caller owns dst's lifecycle (open, close, cleanup) and need not pre-size it,
// as WriteAt extends the destination.
func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, chunkSize int64, concurrency int) error {
// dst is written sequentially from a single goroutine, so it need not be safe
// for concurrent writes.
func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.Writer, chunkSize int64, concurrency int) error {
if chunkSize <= 0 {
return errors.Errorf("parallel get: chunk size must be positive, got %d", chunkSize)
}
Expand All @@ -52,9 +62,16 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
return fullRead(ctx, c, key, dst)
}

// Build the group before discovery so chunk zero's request shares egCtx and
// a sibling chunk's failure cancels it too. defer cancel covers the early
// returns below, which exit without calling eg.Wait.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
eg, egCtx := errgroup.WithContext(ctx)

// Discovery: the first ranged Open delivers chunk zero and reveals the total
// size and ETag used to pin the rest.
rc, headers, err := c.Open(ctx, key, Range(0, chunkSize))
rc, headers, err := c.Open(egCtx, key, Range(0, chunkSize))
if errors.Is(err, ErrRangeNotSatisfiable) {
return nil // Empty object: nothing to write.
}
Expand All @@ -74,7 +91,7 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
firstLen = -1
}
if !hasRange || total <= chunkSize {
return errors.Wrap(writeChunkAt(dst, 0, firstLen, rc), "parallel get")
return errors.Wrap(copyChunk(dst, firstLen, rc), "parallel get")
}

// Subsequent chunks are pinned to the discovery ETag via IfRange. Without a
Expand All @@ -86,67 +103,146 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
if err := rc.Close(); err != nil {
return errors.Wrap(err, "parallel get: close discovery reader")
}
return fullRead(ctx, c, key, dst)
return fullRead(egCtx, c, key, dst)
}

// Multiple chunks: copy the already-open first chunk concurrently with the
// rest rather than blocking on it here. The first goroutine is scheduled
// before the limit can be reached, so it never stalls holding an open body.
numChunks := int((total + chunkSize - 1) / chunkSize)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(concurrency)
eg.Go(func() error { return writeChunkAt(dst, 0, firstLen, rc) })
for seq := 1; seq < numChunks; seq++ {
// Stop scheduling once a chunk has failed and cancelled the group.
if egCtx.Err() != nil {
break
}
start := int64(seq) * chunkSize
end := min(start+chunkSize, total)
eg.Go(func() error { return fetchChunk(egCtx, c, key, dst, start, end, etag) })

// slots is a ring of concurrency reorder buffers carrying chunk bytes from a
// fetching worker to the writer. The window bounds fetched-but-unwritten
// chunks to concurrency, so the outstanding sequence numbers always span a
// window of at most concurrency consecutive values and seq%concurrency is
// unique among them — no two outstanding chunks share a slot. Chunk zero is
// streamed directly from the discovery reader and uses no slot.
slots := make([]chan []byte, concurrency)
for i := range slots {
slots[i] = make(chan []byte, 1)
}

// window bounds fetched-but-unwritten chunks (and in-flight fetches) to
// concurrency. A worker takes a token before fetching; the writer returns
// one after writing a chunk, admitting the next fetch. The channel never
// exceeds capacity because a token is always "held" by the chunk in flight
// or being written at the moment it is returned, so sends never block.
window := make(chan struct{}, concurrency)
for range concurrency {
window <- struct{}{}
}

var nextSeq atomic.Int64
nextSeq.Store(1)
for range concurrency {
eg.Go(func() error {
for {
select {
case <-egCtx.Done():
return egCtx.Err()
case <-window:
}
seq := int(nextSeq.Add(1) - 1)
if seq >= numChunks {
window <- struct{}{} // No work for this token; return it.
return nil
}
start := int64(seq) * chunkSize
end := min(start+chunkSize, total)
data, err := fetchChunk(egCtx, c, key, start, end, etag)
if err != nil {
return err
}
slots[seq%concurrency] <- data // Buffered, single producer: never blocks.
}
})
}

// Writer: stream chunk zero from the discovery reader, then emit each
// subsequent chunk in order, returning a window token after each so workers
// can advance.
eg.Go(func() error {
if err := copyChunk(dst, firstLen, rc); err != nil {
return err
}
for seq := 1; seq < numChunks; seq++ {
select {
case <-egCtx.Done():
return egCtx.Err()
case data := <-slots[seq%concurrency]:
if err := writeAll(dst, data, int64(seq)*chunkSize); err != nil {
return err
}
window <- struct{}{}
}
}
return nil
})

return errors.Wrap(eg.Wait(), "parallel get")
}

// fullRead downloads the entire object in a single request and writes it at
// offset zero. It is used when chunking would add no value (a single worker) or
// cannot be made revision-safe (no ETag to pin). The body is a single
// consistent revision, but its length is unknown up front, so writeChunkAt's
// length check is skipped (-1).
func fullRead(ctx context.Context, c RangeReader, key Key, dst io.WriterAt) error {
// fullRead downloads the entire object in a single request and copies it to dst.
// It is used when chunking would add no value (a single worker) or cannot be
// made revision-safe (no ETag to pin). The body is a single consistent revision,
// but its length is unknown up front, so copyChunk's length check is skipped (-1).
func fullRead(ctx context.Context, c RangeReader, key Key, dst io.Writer) error {
rc, _, err := c.Open(ctx, key)
if err != nil {
return errors.Wrap(err, "parallel get: full read")
}
return errors.Wrap(writeChunkAt(dst, 0, -1, rc), "parallel get")
return errors.Wrap(copyChunk(dst, -1, rc), "parallel get")
}

// fetchChunk opens the [start, end) range pinned to etag and writes it at start.
// An ETag change (the object was rewritten mid-download) or a short read is
// reported as an error.
func fetchChunk(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, start, end int64, etag string) error {
// copyChunk streams src into dst and then closes src, whether or not the copy
// succeeded. A non-negative want is the exact number of bytes src must deliver:
// a body that ends early or runs past want is reported as an error. A negative
// want means the length is unknown up front, so the body is copied in full and
// no length check is made.
//
// When want is known the copy is capped at want+1 bytes. The one extra byte is
// enough to detect an overlong body (e.g. a backend that ignored the range and
// returned the whole object) without streaming the excess into dst.
//
// The close error is joined with any copy error so neither is lost. dst may
// already hold some bytes when an error is returned.
func copyChunk(dst io.Writer, want int64, src io.ReadCloser) error {
	var body io.Reader = src
	if want >= 0 {
		body = io.LimitReader(src, want+1)
	}
	n, copyErr := io.Copy(dst, body)
	if err := errors.Join(copyErr, src.Close()); err != nil {
		return errors.Errorf("copy chunk: %w", err)
	}
	switch {
	case want < 0 || n == want:
		return nil
	case n < want:
		return errors.Errorf("short chunk: copied %d of %d bytes", n, want)
	default:
		// n == want+1: the limit was hit, so the body is longer than expected.
		return errors.Errorf("overlong chunk: body exceeds expected %d bytes", want)
	}
}

// fetchChunk opens the [start, end) range pinned to etag and returns its bytes.
// An ETag change (the object was rewritten mid-download) or a body whose length
// differs from the requested range (a short read, or an overlong response from a
// backend that ignored the range) is reported as an error.
func fetchChunk(ctx context.Context, c RangeReader, key Key, start, end int64, etag string) ([]byte, error) {
rc, headers, err := c.Open(ctx, key, Range(start, end), IfRange(etag))
if err != nil {
return errors.Errorf("open range %d-%d: %w", start, end, err)
return nil, errors.Errorf("open range %d-%d: %w", start, end, err)
}
if got := headers.Get(ETagKey); got != etag {
return errors.Join(
return nil, errors.Join(
errors.Errorf("object changed during read at offset %d: etag %q != %q", start, got, etag),
rc.Close(),
)
}
return writeChunkAt(dst, start, end-start, rc)
// Read one byte past the expected length so an overlong body (e.g. a backend
// that ignored the range and returned the whole object) is detected rather
// than silently truncated to the wrong bytes.
want := end - start
buf, readErr := io.ReadAll(io.LimitReader(rc, want+1))
if err := errors.Join(readErr, rc.Close()); err != nil {
return nil, errors.Errorf("read chunk at offset %d: %w", start, err)
}
if int64(len(buf)) != want {
return nil, errors.Errorf("chunk at offset %d: read %d of %d bytes", start, len(buf), want)
}
return buf, nil
}

// writeChunkAt streams src into dst at off and closes src. It fails if fewer
// than want bytes arrive; a negative want skips that check (total size unknown).
func writeChunkAt(dst io.WriterAt, off, want int64, src io.ReadCloser) error {
n, copyErr := io.Copy(io.NewOffsetWriter(dst, off), src)
if err := errors.Join(copyErr, src.Close()); err != nil {
return errors.Errorf("write chunk at offset %d: %w", off, err)
// writeAll writes all of data to dst, treating a short write as an error. A
// compliant io.Writer reports short writes via a non-nil error, but the check
// guards against ones that don't.
func writeAll(dst io.Writer, data []byte, offset int64) error {
n, err := dst.Write(data)
if err != nil {
return errors.Errorf("write chunk at offset %d: %w", offset, err)
}
if want >= 0 && n != want {
return errors.Errorf("short chunk at offset %d: wrote %d of %d bytes", off, n, want)
if n != len(data) {
return errors.Errorf("write chunk at offset %d: short write %d of %d bytes", offset, n, len(data))
}
return nil
}
Expand Down
Loading