Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/backend/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,16 @@ func (b *backend) Extract(ctx context.Context, target string, cfg *config.Extrac

// exportModelArtifact exports the target model artifact to the output directory, which will open the artifact and extract to restore the original repo structure.
func exportModelArtifact(ctx context.Context, store storage.Storage, manifest ocispec.Manifest, repo string, cfg *config.Extract) error {
g := &errgroup.Group{}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(cfg.Concurrency)

for _, layer := range manifest.Layers {
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// pull the blob from the storage.
reader, err := store.PullBlob(ctx, repo, layer.Digest.String())
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/backend/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,17 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e
pb.Start()
defer pb.Stop()

g := &errgroup.Group{}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(cfg.Concurrency)

for _, layer := range layers {
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

return pullAndExtractFromRemote(ctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer)
})
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/backend/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err

// copy the layers.
dst := b.store
g := &errgroup.Group{}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(cfg.Concurrency)

var fn func(desc ocispec.Descriptor) error
Expand All @@ -98,6 +98,12 @@ func (b *backend) Pull(ctx context.Context, target string, cfg *config.Pull) err

for _, layer := range manifest.Layers {
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

return retry.Do(func() error {
// call the before hook.
cfg.Hooks.BeforePullLayer(layer, manifest)
Expand Down
8 changes: 7 additions & 1 deletion pkg/backend/pull_by_d7y.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,17 @@ func (b *backend) pullByDragonfly(ctx context.Context, target string, cfg *confi
defer pb.Stop()

// Process layers concurrently.
g := &errgroup.Group{}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(cfg.Concurrency)

for _, layer := range manifest.Layers {
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

return processLayer(ctx, pb, dfdaemon.NewDfdaemonDownloadClient(conn), ref, manifest, layer, authToken, cfg)
})
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/backend/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ func (b *backend) Push(ctx context.Context, target string, cfg *config.Push) err
// note: the order is important, manifest should be pushed at last.

// copy the layers.
g := &errgroup.Group{}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(cfg.Concurrency)

for _, layer := range manifest.Layers {
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

return retry.Do(func() error {
return pushIfNotExist(ctx, pb, internalpb.NormalizePrompt("Copying blob"), src, dst, layer, repo, tag)
}, append(defaultRetryOpts, retry.Context(ctx))...)
Expand Down