Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions client/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"os"
"strings"
"sync"

"github.com/alecthomas/errors"
)
Expand Down Expand Up @@ -183,6 +184,94 @@ func (c *Client) openGitArtifact(ctx context.Context, repoURL, suffix string) (*
}, nil
}

// GitSnapshotMetadata carries the freshen metadata returned alongside a
// parallel snapshot download. Commit is the mirror's HEAD SHA at snapshot time
// (empty for cold serves); BundleURL, when non-empty, points at a delta bundle
// that brings the snapshot up to the mirror's current HEAD.
type GitSnapshotMetadata struct {
Commit string
BundleURL string
}

// DownloadGitSnapshot fetches the working-tree snapshot for repoURL into dst,
// using up to concurrency concurrent range requests of chunkSize bytes each.
// When concurrency is 1, or the server does not support ranges, it transparently
// falls back to a single full download. dst is written at non-overlapping
// offsets via WriteAt (e.g. an *os.File) and the caller owns its lifecycle. It
// returns the snapshot's freshen metadata, read from the discovery response.
// Returns os.ErrNotExist when the server has no snapshot available.
func (c *Client) DownloadGitSnapshot(ctx context.Context, repoURL string, dst io.WriterAt, chunkSize int64, concurrency int) (GitSnapshotMetadata, error) {
endpoint, err := gitEndpointURL(c.baseURL, repoURL, "snapshot.tar.zst")
if err != nil {
return GitSnapshotMetadata{}, err
}
reader := &gitArtifactRangeReader{client: c, endpoint: endpoint}
if err := ParallelGet(ctx, reader, NewKey(repoURL), dst, chunkSize, concurrency); err != nil {
return GitSnapshotMetadata{}, errors.Wrap(err, "download snapshot")
}
return reader.metadata(), nil
}

// gitArtifactRangeReader adapts a git artifact endpoint to the RangeReader
// interface so ParallelGet can fetch it with concurrent range requests. The
// object's identity is the endpoint URL, so the Key argument is ignored. It
// records the first response's headers, which carry the snapshot's freshen
// metadata (delivered on the discovery chunk) that ParallelGet does not surface.
type gitArtifactRangeReader struct {
client *Client
endpoint string

mu sync.Mutex
discovery http.Header
}

func (g *gitArtifactRangeReader) Open(ctx context.Context, _ Key, opts ...RequestOption) (io.ReadCloser, http.Header, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, g.endpoint, nil)
if err != nil {
return nil, nil, errors.Wrap(err, "create request")
}
NewRequestOptions(opts...).applyToRequest(req)

resp, err := g.client.http.Do(req)
if err != nil {
return nil, nil, errors.Wrap(err, "execute request")
}
switch resp.StatusCode {
case http.StatusOK, http.StatusPartialContent:
g.recordDiscovery(resp.Header)
return resp.Body, resp.Header, nil
case http.StatusNotFound:
_, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec
return nil, nil, errors.Join(os.ErrNotExist, resp.Body.Close())
case http.StatusRequestedRangeNotSatisfiable:
_, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec
g.recordDiscovery(resp.Header)
return nil, resp.Header, errors.Join(ErrRangeNotSatisfiable, resp.Body.Close())
default:
_, _ = io.Copy(io.Discard, resp.Body) //nolint:errcheck,gosec
return nil, nil, errors.Join(errors.WithStack(&HTTPStatusError{StatusCode: resp.StatusCode}), resp.Body.Close())
}
}

// recordDiscovery stores the first response's headers so the freshen metadata
// they carry survives after the bodies are consumed.
func (g *gitArtifactRangeReader) recordDiscovery(h http.Header) {
g.mu.Lock()
defer g.mu.Unlock()
if g.discovery == nil {
g.discovery = h.Clone()
}
}

func (g *gitArtifactRangeReader) metadata() GitSnapshotMetadata {
g.mu.Lock()
defer g.mu.Unlock()
return GitSnapshotMetadata{
Commit: g.discovery.Get(SnapshotCommitHeader),
BundleURL: g.discovery.Get(BundleURLHeader),
}
}

// gitEndpointURL builds a /git/{host}/{repoPath}/{suffix} URL from a cachew
// base URL and an upstream repository URL (e.g. https://github.com/org/repo).
func gitEndpointURL(baseURL, repoURL, suffix string) (string, error) {
Expand Down
67 changes: 67 additions & 0 deletions client/git_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package client_test

import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"os"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/errors"
Expand Down Expand Up @@ -171,3 +174,67 @@ func TestOpenGitBundleNotFound(t *testing.T) {
_, err := api.OpenGitBundle(context.Background(), "/git/x/y/snapshot.bundle")
assert.True(t, errors.Is(err, os.ErrNotExist))
}

func TestDownloadGitSnapshotParallel(t *testing.T) {
body := make([]byte, 1000)
for i := range body {
body[i] = byte(i % 251)
}
const etag = `"snap-v1"`

var requests atomic.Int64
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/git/github.com/org/repo/snapshot.tar.zst", r.URL.Path)
requests.Add(1)
w.Header().Set("Content-Type", "application/zstd")
w.Header().Set("ETag", etag)
w.Header().Set(client.SnapshotCommitHeader, "deadbeef")
w.Header().Set(client.BundleURLHeader, "/git/github.com/org/repo/snapshot.bundle?base=deadbeef")
// ServeContent honours Range/If-Range against the ETag set above, so it
// returns 206 + Content-Range for the chunked requests ParallelGet makes.
http.ServeContent(w, r, "snapshot.tar.zst", time.Time{}, bytes.NewReader(body))
}))
defer srv.Close()

api := client.NewWithHTTPClient(srv.URL, srv.Client())
var dst bufferAt
// A 128-byte chunk over a 1000-byte body forces multiple chunks, exercising
// concurrent range reassembly.
meta, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 128, 4)
assert.NoError(t, err)
assert.Equal(t, body, dst.buf)
assert.Equal(t, "deadbeef", meta.Commit)
assert.Equal(t, "/git/github.com/org/repo/snapshot.bundle?base=deadbeef", meta.BundleURL)
assert.True(t, requests.Load() > 1, "expected multiple range requests, got %d", requests.Load())
}

func TestDownloadGitSnapshotFallsBackWithoutRange(t *testing.T) {
body := []byte("full body, server ignores ranges")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
// No ETag and no range handling: always answer the full object with 200,
// mimicking an older server. ParallelGet must fall back to a single read.
w.Header().Set("Content-Type", "application/zstd")
w.Header().Set(client.SnapshotCommitHeader, "cafe")
_, _ = w.Write(body) //nolint:errcheck
}))
defer srv.Close()

api := client.NewWithHTTPClient(srv.URL, srv.Client())
var dst bufferAt
meta, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 8, 4)
assert.NoError(t, err)
assert.Equal(t, body, dst.buf)
assert.Equal(t, "cafe", meta.Commit)
}

func TestDownloadGitSnapshotNotFound(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
}))
defer srv.Close()

api := client.NewWithHTTPClient(srv.URL, srv.Client())
var dst bufferAt
_, err := api.DownloadGitSnapshot(context.Background(), "https://github.com/org/repo", &dst, 8, 4)
assert.True(t, errors.Is(err, os.ErrNotExist))
}
157 changes: 117 additions & 40 deletions cmd/cachew/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ type GitRestoreCmd struct {
Commit []string `help:"Required commit SHAs that must exist on the server, regardless of which ref points at them. May be repeated."`
NoBundle bool `help:"Skip applying delta bundle."`
ZstdThreads int `help:"Threads for zstd decompression (0 = all CPU cores)." default:"0"`
// DownloadConcurrency > 1 fetches the snapshot with that many concurrent
// range requests (requires server range support; falls back to a single
// request otherwise). 1 keeps the streaming single-request download.
DownloadConcurrency int `help:"Concurrent range requests for the snapshot download (1 = single streaming request)." default:"1"`
DownloadChunkSizeMB int `help:"Chunk size in MiB for parallel snapshot downloads." default:"8"`
}

func (c *GitRestoreCmd) Run(ctx context.Context, api *client.Client) error {
Expand All @@ -72,56 +77,22 @@ func (c *GitRestoreCmd) Run(ctx context.Context, api *client.Client) error {

fmt.Fprintf(os.Stderr, "Fetching snapshot for %s\n", c.RepoURL) //nolint:forbidigo

var snap *client.GitSnapshot
if err := inSpan(ctx, "cachew.download_snapshot",
[]attribute.KeyValue{attribute.String("cachew.repo_url", c.RepoURL)},
func(ctx context.Context) error {
downloadStart := time.Now()
s, err := api.OpenGitSnapshot(ctx, c.RepoURL)
if err != nil {
return err //nolint:wrapcheck // wrapped by caller
}
snap = s
trace.SpanFromContext(ctx).SetAttributes(
attribute.String("cachew.snapshot_commit", s.Commit),
attribute.String("cachew.bundle_url", s.BundleURL),
attribute.Float64("cachew.elapsed_seconds", time.Since(downloadStart).Seconds()),
)
return nil
}); err != nil {
commit, bundleURL, err := c.fetchAndExtractSnapshot(ctx, api)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return errors.Errorf("no snapshot available for %s", c.RepoURL)
}
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return errors.Wrap(err, "fetch snapshot")
}
defer snap.Close()
span.SetAttributes(attribute.String("cachew.snapshot_commit", snap.Commit))

fmt.Fprintf(os.Stderr, "Extracting to %s...\n", c.Directory) //nolint:forbidigo
if err := inSpan(ctx, "cachew.extract",
[]attribute.KeyValue{attribute.String("cachew.directory", c.Directory)},
func(ctx context.Context) error {
extractStart := time.Now()
if err := snapshot.Extract(ctx, snap.Body, c.Directory, c.ZstdThreads); err != nil {
return err //nolint:wrapcheck // wrapped by caller
}
elapsed := time.Since(extractStart)
trace.SpanFromContext(ctx).SetAttributes(attribute.Float64("cachew.elapsed_seconds", elapsed.Seconds()))
fmt.Fprintf(os.Stderr, "Snapshot extracted in %s\n", elapsed) //nolint:forbidigo
return nil
}); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return errors.Wrap(err, "extract snapshot")
return errors.Wrap(err, "restore snapshot")
}
span.SetAttributes(attribute.String("cachew.snapshot_commit", commit))
fmt.Fprintf(os.Stderr, "Snapshot restored to %s\n", c.Directory) //nolint:forbidigo

if snap.BundleURL != "" && !c.NoBundle {
if bundleURL != "" && !c.NoBundle {
fmt.Fprintf(os.Stderr, "Applying delta bundle...\n") //nolint:forbidigo
bundleStart := time.Now()
if err := applyBundle(ctx, api, snap.BundleURL, c.Directory); err != nil {
if err := applyBundle(ctx, api, bundleURL, c.Directory); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to apply delta bundle: %v\n", err) //nolint:forbidigo
span.RecordError(err)
} else {
Expand All @@ -144,6 +115,112 @@ func (c *GitRestoreCmd) Run(ctx context.Context, api *client.Client) error {
return nil
}

// fetchAndExtractSnapshot downloads the snapshot and extracts it into the target
// directory, returning its freshen metadata (commit and bundle URL). With a
// download concurrency above 1 it downloads in parallel into a temp file, since
// ParallelGet needs a WriterAt; otherwise it streams the single response
// directly into extraction.
func (c *GitRestoreCmd) fetchAndExtractSnapshot(ctx context.Context, api *client.Client) (commit, bundleURL string, err error) {
if c.DownloadConcurrency > 1 {
return c.parallelFetchAndExtract(ctx, api)
}
return c.streamFetchAndExtract(ctx, api)
}

// streamFetchAndExtract downloads the snapshot in a single request and pipes the
// response body straight into extraction, overlapping download and extraction.
func (c *GitRestoreCmd) streamFetchAndExtract(ctx context.Context, api *client.Client) (string, string, error) {
var snap *client.GitSnapshot
if err := inSpan(ctx, "cachew.download_snapshot",
[]attribute.KeyValue{attribute.String("cachew.repo_url", c.RepoURL)},
func(ctx context.Context) error {
downloadStart := time.Now()
s, err := api.OpenGitSnapshot(ctx, c.RepoURL)
if err != nil {
return err //nolint:wrapcheck // wrapped by caller
}
snap = s
trace.SpanFromContext(ctx).SetAttributes(
attribute.String("cachew.snapshot_commit", s.Commit),
attribute.String("cachew.bundle_url", s.BundleURL),
attribute.Float64("cachew.elapsed_seconds", time.Since(downloadStart).Seconds()),
)
return nil
}); err != nil {
return "", "", err
}
defer snap.Close()

if err := c.extract(ctx, snap.Body); err != nil {
return "", "", err
}
return snap.Commit, snap.BundleURL, nil
}

// parallelFetchAndExtract downloads the snapshot into a temp file using bounded
// concurrent range requests, then extracts from the file. ParallelGet writes via
// WriteAt so it cannot stream into extraction; the temp file is removed on
// return.
func (c *GitRestoreCmd) parallelFetchAndExtract(ctx context.Context, api *client.Client) (string, string, error) {
tmp, err := os.CreateTemp("", "cachew-snapshot-*.tar.zst")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Store parallel snapshots on the target filesystem

When users enable --download-concurrency for a multi-GB snapshot on hosts where /tmp is a small tmpfs or separate quota, this writes the entire compressed snapshot to the default temp directory before extraction. That can fail with ENOSPC even when c.Directory has enough space for the restore; creating the temp file under the target directory's filesystem, or making the temp location configurable, avoids making parallel restore unusable in those environments.

Useful? React with 👍 / 👎.

if err != nil {
return "", "", errors.Wrap(err, "create snapshot temp file")
}
defer func() {
_ = tmp.Close()
_ = os.Remove(tmp.Name()) //nolint:gosec // name is from os.CreateTemp, not external input
}()

var meta client.GitSnapshotMetadata
if err := inSpan(ctx, "cachew.download_snapshot",
[]attribute.KeyValue{
attribute.String("cachew.repo_url", c.RepoURL),
attribute.Int("cachew.download_concurrency", c.DownloadConcurrency),
attribute.Int("cachew.download_chunk_size_mb", c.DownloadChunkSizeMB),
},
func(ctx context.Context) error {
downloadStart := time.Now()
m, err := api.DownloadGitSnapshot(ctx, c.RepoURL, tmp, int64(c.DownloadChunkSizeMB)<<20, c.DownloadConcurrency)
if err != nil {
return err //nolint:wrapcheck // wrapped by caller
}
meta = m
trace.SpanFromContext(ctx).SetAttributes(
attribute.String("cachew.snapshot_commit", m.Commit),
attribute.String("cachew.bundle_url", m.BundleURL),
attribute.Float64("cachew.elapsed_seconds", time.Since(downloadStart).Seconds()),
)
return nil
}); err != nil {
return "", "", err
}

if _, err := tmp.Seek(0, io.SeekStart); err != nil {
return "", "", errors.Wrap(err, "rewind snapshot temp file")
}
if err := c.extract(ctx, tmp); err != nil {
return "", "", err
}
return meta.Commit, meta.BundleURL, nil
}

// extract decompresses and unpacks the snapshot body into the target directory.
func (c *GitRestoreCmd) extract(ctx context.Context, body io.Reader) error {
fmt.Fprintf(os.Stderr, "Extracting to %s...\n", c.Directory) //nolint:forbidigo,gosec // c.Directory is an operator-supplied CLI path
return inSpan(ctx, "cachew.extract",
[]attribute.KeyValue{attribute.String("cachew.directory", c.Directory)},
func(ctx context.Context) error {
extractStart := time.Now()
if err := snapshot.Extract(ctx, body, c.Directory, c.ZstdThreads); err != nil {
return err //nolint:wrapcheck // wrapped by caller
}
elapsed := time.Since(extractStart)
trace.SpanFromContext(ctx).SetAttributes(attribute.Float64("cachew.elapsed_seconds", elapsed.Seconds()))
fmt.Fprintf(os.Stderr, "Snapshot extracted in %s\n", elapsed) //nolint:forbidigo
return nil
})
}

// satisfyRefs ensures the working tree contains every requested ref and
// commit. It short-circuits whenever the local clone already has what was
// asked for, avoiding both /ensure-refs and git pull when the snapshot+bundle
Expand Down
Loading