Skip to content

Commit 21e801e

Browse files
committed
Enable pulling large files in parallel
This is an attempt to better meet the expectations of users who pull large files. If implemented, this will allow chunks of a given artifact layer to be pulled concurrently. Signed-off-by: Soule BA <bah.soule@gmail.com>
1 parent 739461c commit 21e801e

6 files changed

Lines changed: 248 additions & 11 deletions

File tree

oci/client/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@ import (
2121

2222
"github.com/google/go-containerregistry/pkg/crane"
2323
"github.com/google/go-containerregistry/pkg/v1/remote"
24+
"github.com/hashicorp/go-retryablehttp"
2425

2526
"github.com/fluxcd/pkg/oci"
2627
)
2728

2829
// Client holds the options for accessing remote OCI registries.
2930
type Client struct {
30-
options []crane.Option
31+
options []crane.Option
32+
httpClient *retryablehttp.Client
3133
}
3234

3335
// NewClient returns an OCI client configured with the given crane options.

oci/client/pull.go

Lines changed: 214 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,28 @@ import (
2222
"context"
2323
"fmt"
2424
"io"
25+
"net/http"
26+
"net/url"
2527
"os"
2628

29+
"github.com/fluxcd/pkg/tar"
30+
"github.com/google/go-containerregistry/pkg/authn"
2731
"github.com/google/go-containerregistry/pkg/crane"
2832
"github.com/google/go-containerregistry/pkg/name"
29-
gcrv1 "github.com/google/go-containerregistry/pkg/v1"
33+
"github.com/hashicorp/go-retryablehttp"
3034

31-
"github.com/fluxcd/pkg/tar"
35+
v1 "github.com/google/go-containerregistry/pkg/v1"
36+
"github.com/google/go-containerregistry/pkg/v1/remote"
37+
"github.com/google/go-containerregistry/pkg/v1/remote/transport"
38+
"golang.org/x/sync/errgroup"
39+
)
40+
41+
const (
42+
// thresholdForConcurrentPull is the maximum size of a layer to be extracted in one go.
43+
// If the layer is larger than this, it will be downloaded in chunks.
44+
thresholdForConcurrentPull = 100 * 1024 * 1024 // 100MB
45+
// maxConcurrentPulls is the maximum number of concurrent downloads.
46+
maxConcurrentPulls = 10
3247
)
3348

3449
var (
@@ -39,8 +54,12 @@ var (
3954

4055
// PullOptions contains options for pulling a layer.
4156
type PullOptions struct {
42-
layerIndex int
43-
layerType LayerType
57+
layerIndex int
58+
layerType LayerType
59+
transport http.RoundTripper
60+
auth authn.Authenticator
61+
keychain authn.Keychain
62+
concurrency int
4463
}
4564

4665
// PullOption is a function for configuring PullOptions.
@@ -60,22 +79,53 @@ func WithPullLayerIndex(i int) PullOption {
6079
}
6180
}
6281

82+
func WithTransport(t http.RoundTripper) PullOption {
83+
return func(o *PullOptions) {
84+
o.transport = t
85+
}
86+
}
87+
88+
func WithConcurrency(c int) PullOption {
89+
return func(o *PullOptions) {
90+
o.concurrency = c
91+
}
92+
}
93+
6394
// Pull downloads an artifact from an OCI repository and extracts the content.
6495
// It untar or copies the content to the given outPath depending on the layerType.
6596
// If no layer type is given, it tries to determine the right type by checking compressed content of the layer.
66-
func (c *Client) Pull(ctx context.Context, url, outPath string, opts ...PullOption) (*Metadata, error) {
97+
func (c *Client) Pull(ctx context.Context, urlString, outPath string, opts ...PullOption) (*Metadata, error) {
6798
o := &PullOptions{
6899
layerIndex: 0,
69100
}
101+
o.keychain = authn.DefaultKeychain
70102
for _, opt := range opts {
71103
opt(o)
72104
}
73-
ref, err := name.ParseReference(url)
105+
106+
if o.concurrency == 0 || o.concurrency > maxConcurrentPulls {
107+
o.concurrency = maxConcurrentPulls
108+
}
109+
110+
if o.transport == nil {
111+
transport := remote.DefaultTransport.(*http.Transport).Clone()
112+
o.transport = transport
113+
}
114+
115+
ref, err := name.ParseReference(urlString)
74116
if err != nil {
75117
return nil, fmt.Errorf("invalid URL: %w", err)
76118
}
77119

78-
img, err := crane.Pull(url, c.optionsWithContext(ctx)...)
120+
if c.httpClient == nil {
121+
h, err := makeHttpClient(ctx, ref.Context(), *o)
122+
if err != nil {
123+
return nil, err
124+
}
125+
c.httpClient = h
126+
}
127+
128+
img, err := crane.Pull(urlString, c.optionsWithContext(ctx)...)
79129
if err != nil {
80130
return nil, err
81131
}
@@ -91,7 +141,7 @@ func (c *Client) Pull(ctx context.Context, url, outPath string, opts ...PullOpti
91141
}
92142

93143
meta := MetadataFromAnnotations(manifest.Annotations)
94-
meta.URL = url
144+
meta.URL = urlString
95145
meta.Digest = ref.Context().Digest(digest.String()).String()
96146

97147
layers, err := img.Layers()
@@ -107,15 +157,133 @@ func (c *Client) Pull(ctx context.Context, url, outPath string, opts ...PullOpti
107157
return nil, fmt.Errorf("index '%d' out of bound for '%d' layers in artifact", o.layerIndex, len(layers))
108158
}
109159

160+
size, err := layers[o.layerIndex].Size()
161+
if err != nil {
162+
return nil, fmt.Errorf("failed to get layer size: %w", err)
163+
}
164+
165+
if size > thresholdForConcurrentPull {
166+
digest, err := layers[o.layerIndex].Digest()
167+
if err != nil {
168+
return nil, fmt.Errorf("parsing digest failed: %w", err)
169+
}
170+
u := url.URL{
171+
Scheme: ref.Context().Scheme(),
172+
Host: ref.Context().RegistryStr(),
173+
Path: fmt.Sprintf("/v2/%s/blobs/%s", ref.Context().RepositoryStr(), digest.String()),
174+
}
175+
ok, err := c.IsRangeRequestEnabled(ctx, u)
176+
if err != nil {
177+
return nil, fmt.Errorf("failed to check range request support: %w", err)
178+
}
179+
if ok {
180+
err = c.concurrentExtractLayer(ctx, u, layers[o.layerIndex], outPath, digest, size, o.concurrency)
181+
if err != nil {
182+
return nil, err
183+
}
184+
return meta, nil
185+
}
186+
}
187+
110188
err = extractLayer(layers[o.layerIndex], outPath, o.layerType)
111189
if err != nil {
112190
return nil, err
113191
}
114192
return meta, nil
115193
}
116194

195+
// TO DO: handle authentication handle using keychain for authentication
196+
func (c *Client) IsRangeRequestEnabled(ctx context.Context, u url.URL) (bool, error) {
197+
req, err := retryablehttp.NewRequest(http.MethodHead, u.String(), nil)
198+
if err != nil {
199+
return false, err
200+
}
201+
202+
resp, err := c.httpClient.Do(req.WithContext(ctx))
203+
if err != nil {
204+
return false, err
205+
}
206+
207+
if err := transport.CheckError(resp, http.StatusOK); err != nil {
208+
return false, err
209+
}
210+
211+
if rangeUnit := resp.Header.Get("Accept-Ranges"); rangeUnit == "bytes" {
212+
return true, nil
213+
}
214+
for k, v := range resp.Header {
215+
fmt.Printf("Header: %s, Value: %s\n", k, v)
216+
}
217+
return false, nil
218+
}
219+
220+
func (c *Client) concurrentExtractLayer(ctx context.Context, u url.URL, layer v1.Layer, path string, digest v1.Hash, size int64, concurrency int) error {
221+
chunkSize := size / int64(concurrency)
222+
chunks := make([][]byte, concurrency+1)
223+
diff := size % int64(concurrency)
224+
225+
g, ctx := errgroup.WithContext(ctx)
226+
for i := 0; i < concurrency; i++ {
227+
i := i
228+
g.Go(func() (err error) {
229+
start, end := int64(i)*chunkSize, int64(i+1)*chunkSize
230+
if i == concurrency-1 {
231+
end += diff
232+
}
233+
req, err := retryablehttp.NewRequest(http.MethodGet, u.String(), nil)
234+
if err != nil {
235+
return fmt.Errorf("failed to create a new request: %w", err)
236+
}
237+
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", start, end-1))
238+
resp, err := c.httpClient.Do(req.WithContext(ctx))
239+
if err != nil {
240+
return fmt.Errorf("failed to download archive: %w", err)
241+
}
242+
defer resp.Body.Close()
243+
244+
if err := transport.CheckError(resp, http.StatusPartialContent); err != nil {
245+
return fmt.Errorf("failed to download archive from %s (status: %s)", u.String(), resp.Status)
246+
}
247+
248+
c, err := io.ReadAll(io.LimitReader(resp.Body, end-start))
249+
if err != nil {
250+
return fmt.Errorf("failed to read response body: %w", err)
251+
}
252+
chunks[i] = c
253+
return nil
254+
})
255+
}
256+
err := g.Wait()
257+
if err != nil {
258+
return err
259+
}
260+
261+
content := bufio.NewReader(bytes.NewReader(bytes.Join(chunks, nil)))
262+
d, s, err := v1.SHA256(content)
263+
if err != nil {
264+
return err
265+
}
266+
if d != digest {
267+
return fmt.Errorf("digest mismatch: expected %s, got %s", digest, d)
268+
}
269+
if s != size {
270+
return fmt.Errorf("size mismatch: expected %d, got %d", size, size)
271+
}
272+
273+
f, err := os.Create(path)
274+
if err != nil {
275+
return err
276+
}
277+
278+
_, err = io.Copy(f, content)
279+
if err != nil {
280+
return fmt.Errorf("error copying layer content: %s", err)
281+
}
282+
return nil
283+
}
284+
117285
// extractLayer extracts the Layer to the path
118-
func extractLayer(layer gcrv1.Layer, path string, layerType LayerType) error {
286+
func extractLayer(layer v1.Layer, path string, layerType LayerType) error {
119287
var blob io.Reader
120288
blob, err := layer.Compressed()
121289
if err != nil {
@@ -173,3 +341,40 @@ func isGzipBlob(buf *bufio.Reader) (bool, error) {
173341
}
174342
return bytes.Equal(b, gzipMagicHeader), nil
175343
}
344+
345+
type resource interface {
346+
Scheme() string
347+
RegistryStr() string
348+
Scope(string) string
349+
350+
authn.Resource
351+
}
352+
353+
func makeHttpClient(ctx context.Context, target resource, o PullOptions) (*retryablehttp.Client, error) {
354+
auth := o.auth
355+
if o.keychain != nil {
356+
kauth, err := o.keychain.Resolve(target)
357+
if err != nil {
358+
return nil, err
359+
}
360+
auth = kauth
361+
}
362+
363+
reg, ok := target.(name.Registry)
364+
if !ok {
365+
repo, ok := target.(name.Repository)
366+
if !ok {
367+
return nil, fmt.Errorf("unexpected resource: %T", target)
368+
}
369+
reg = repo.Registry
370+
}
371+
372+
tr, err := transport.NewWithContext(ctx, reg, auth, o.transport, []string{target.Scope(transport.PullScope)})
373+
if err != nil {
374+
return nil, err
375+
}
376+
377+
h := retryablehttp.NewClient()
378+
h.HTTPClient = &http.Client{Transport: tr}
379+
return h, nil
380+
}

oci/client/pull_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func Test_PullAnyTarball(t *testing.T) {
4141
repo := "test-no-annotations" + randStringRunes(5)
4242

4343
dst := fmt.Sprintf("%s/%s:%s", dockerReg, repo, tag)
44+
fmt.Println("Pulling from:", dst)
4445

4546
artifact := filepath.Join(t.TempDir(), "artifact.tgz")
4647
g.Expect(build(artifact, testDir, nil)).To(Succeed())
@@ -82,3 +83,23 @@ func Test_PullAnyTarball(t *testing.T) {
8283
g.Expect(extractTo + "/" + entry).To(Or(BeAnExistingFile(), BeADirectory()))
8384
}
8485
}
86+
87+
func Test_PullLargeTarball(t *testing.T) {
88+
g := NewWithT(t)
89+
ctx := context.Background()
90+
c := NewClient(DefaultOptions())
91+
dst := "vnp505/zephyr-7b-alpha:alpha"
92+
extractTo := filepath.Join(t.TempDir(), "artifact")
93+
m, err := c.Pull(ctx, dst, extractTo, WithPullLayerIndex(19))
94+
fmt.Println("Pulled from:", dst)
95+
g.Expect(err).ToNot(HaveOccurred())
96+
g.Expect(err).ToNot(HaveOccurred())
97+
g.Expect(m).ToNot(BeNil())
98+
g.Expect(m.Annotations).To(BeEmpty())
99+
g.Expect(m.Created).To(BeEmpty())
100+
g.Expect(m.Revision).To(BeEmpty())
101+
g.Expect(m.Source).To(BeEmpty())
102+
g.Expect(m.URL).To(Equal(dst))
103+
g.Expect(m.Digest).ToNot(BeEmpty())
104+
g.Expect(extractTo).ToNot(BeEmpty())
105+
}

oci/client/push_pull_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ func Test_Push_Pull(t *testing.T) {
305305
g.Expect(err).ToNot(HaveOccurred())
306306

307307
fileInfo, err := os.Stat(tt.sourcePath)
308+
g.Expect(err).ToNot(HaveOccurred())
308309
// if a directory was pushed, then the created file should be a gzipped archive
309310
if fileInfo.IsDir() {
310311
bufReader := bufio.NewReader(bytes.NewReader(got))

oci/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ require (
2121
github.com/fluxcd/pkg/tar v0.4.0
2222
github.com/fluxcd/pkg/version v0.2.2
2323
github.com/google/go-containerregistry v0.18.0
24+
github.com/hashicorp/go-retryablehttp v0.7.5
2425
github.com/onsi/gomega v1.31.1
2526
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
2627
github.com/sirupsen/logrus v1.9.3
28+
golang.org/x/sync v0.6.0
2729
sigs.k8s.io/controller-runtime v0.16.3
2830
)
2931

@@ -80,6 +82,7 @@ require (
8082
github.com/gorilla/handlers v1.5.1 // indirect
8183
github.com/gorilla/mux v1.8.1 // indirect
8284
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
85+
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
8386
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 // indirect
8487
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
8588
github.com/imdario/mergo v0.3.15 // indirect
@@ -130,7 +133,6 @@ require (
130133
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
131134
golang.org/x/net v0.20.0 // indirect
132135
golang.org/x/oauth2 v0.16.0 // indirect
133-
golang.org/x/sync v0.6.0 // indirect
134136
golang.org/x/sys v0.16.0 // indirect
135137
golang.org/x/term v0.16.0 // indirect
136138
golang.org/x/text v0.14.0 // indirect

oci/go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
155155
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
156156
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
157157
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
158+
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
159+
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
160+
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
161+
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
162+
github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M=
163+
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
158164
github.com/hashicorp/golang-lru/arc/v2 v2.0.5 h1:l2zaLDubNhW4XO3LnliVj0GXO3+/CGNJAg1dcN2Fpfw=
159165
github.com/hashicorp/golang-lru/arc/v2 v2.0.5/go.mod h1:ny6zBSQZi2JxIeYcv7kt2sH2PXJtirBN7RDhRpxPkxU=
160166
github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=

0 commit comments

Comments
 (0)